Saturday, October 18, 2025

BUILDING AN AGENTIC AI SYSTEM FOR AUTOMATED POWERPOINT CREATION




INTRODUCTION TO AGENTIC AI FOR DOCUMENT CREATION

An Agentic AI system represents a sophisticated approach to artificial intelligence where autonomous agents can perform complex, multi-step tasks with minimal human intervention. In the context of PowerPoint creation, such a system transforms a simple user prompt about a topic into a comprehensive presentation by orchestrating multiple AI components working in concert.

The core concept revolves around creating an intelligent agent that can understand user requirements, conduct research, process information, and generate professional presentations automatically. This system goes beyond simple template filling by incorporating real-time research capabilities, advanced content processing, and intelligent design decisions.

The agent operates through a series of coordinated steps that mirror how a human researcher and presenter would approach the task. It begins by understanding the user's topic, searches for relevant information across the internet, downloads and processes documents, extracts meaningful content, organizes information logically, and finally creates visually appealing slides that follow established UX principles.


SYSTEM ARCHITECTURE OVERVIEW

The architecture of our Agentic AI system follows a modular design where each component has specific responsibilities while maintaining clear interfaces for communication. The system consists of several interconnected modules that work together to transform user input into polished presentations.

At the highest level, we have the Orchestrator Agent that coordinates all activities. This agent receives user prompts and manages the workflow through various specialized components. The Web Search Agent handles internet research and document discovery. The Content Processor extracts and cleans text from downloaded documents. The RAG System provides intelligent information retrieval and synthesis. The GraphRAG component creates knowledge graphs and ontologies. The Visual Processor extracts images and charts using Vision Language Models. Finally, the Presentation Generator creates the actual PowerPoint files.

Each component operates independently but communicates through well-defined APIs. This modular approach allows for easy maintenance, testing, and future enhancements. The system also includes a Configuration Manager that handles user preferences such as PowerPoint themes and presentation styles.
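
To make the configuration concrete, here is a minimal sketch of the load_configuration helper that the orchestrator below relies on. The key names and default values are illustrative assumptions; only the keys that later components actually read via config.get(...) appear in this article's code.

import json
from pathlib import Path

# Hypothetical defaults for illustration; the keys mirror the config.get(...)
# calls in the components shown later in this article
DEFAULT_CONFIG = {
    "theme": "corporate_blue",            # hypothetical theme name
    "min_content_length": 500,
    "max_content_length": 50000,
    "embedding_model": "all-MiniLM-L6-v2",
    "chunk_size": 512,
    "chunk_overlap": 50,
    "max_chunks_per_query": 10,
    "rag_storage_path": "./rag_storage",
    "graphrag_storage_path": "./graphrag_storage",
    "min_entity_confidence": 0.7,
    "max_entities_per_chunk": 20,
    "concept_cluster_count": 10,
}

def load_configuration(config_path: str) -> dict:
    """Merge a user-supplied JSON config file over the defaults."""
    config = dict(DEFAULT_CONFIG)
    path = Path(config_path)
    if path.exists():
        config.update(json.loads(path.read_text()))
    return config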

Here's a foundational code example that demonstrates the basic orchestrator structure. This class serves as the central hub that coordinates all system components and manages the workflow from user input to final presentation generation.


import asyncio
import logging
from typing import Dict, List, Optional
from dataclasses import dataclass
from pathlib import Path


@dataclass
class UserRequest:
    topic: str
    theme: Optional[str] = None
    max_slides: int = 10
    output_folder: str = "./presentations"


class AgenticPowerPointSystem:
    def __init__(self, config_path: str):
        self.config = self.load_configuration(config_path)
        self.web_searcher = WebSearchAgent(self.config)
        self.content_processor = ContentProcessor(self.config)
        self.rag_system = RAGSystem(self.config)
        self.graph_rag = GraphRAGSystem(self.config)
        self.visual_processor = VisualProcessor(self.config)
        self.presentation_generator = PresentationGenerator(self.config)
        self.logger = logging.getLogger(__name__)

    async def create_presentation(self, request: UserRequest) -> str:
        """Main orchestration method for presentation creation"""
        self.logger.info(f"Starting presentation creation for topic: {request.topic}")

        # Step 1: Search and download documents
        documents = await self.web_searcher.search_and_download(
            request.topic,
            max_documents=20
        )

        # Step 2: Extract and process content
        processed_content = await self.content_processor.process_documents(documents)

        # Step 3: Build RAG system with processed content
        await self.rag_system.index_content(processed_content)

        # Step 4: Create knowledge graph and ontology
        ontology = await self.graph_rag.create_ontology(processed_content)

        # Step 5: Extract visual elements
        visual_elements = await self.visual_processor.extract_visuals(documents)

        # Step 6: Generate presentation structure
        presentation_structure = await self.generate_presentation_structure(
            request, processed_content, ontology
        )

        # Step 7: Create PowerPoint file
        output_path = await self.presentation_generator.create_presentation(
            presentation_structure,
            visual_elements,
            request.theme,
            request.output_folder
        )

        self.logger.info(f"Presentation created successfully: {output_path}")
        return output_path


This orchestrator demonstrates how the system coordinates multiple agents to accomplish the complex task of presentation creation. Each method call represents a significant subsystem that we'll explore in detail throughout this article.
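
To see the orchestrator in action, here is a hedged usage sketch. The config file name and topic are placeholder values, and asyncio.run is used because create_presentation is a coroutine.

import asyncio

async def main():
    # "config.json" and the topic below are placeholder values
    system = AgenticPowerPointSystem(config_path="config.json")
    request = UserRequest(
        topic="Renewable energy storage technologies",
        theme="corporate_blue",  # hypothetical theme name
        max_slides=12,
        output_folder="./presentations"
    )
    output_path = await system.create_presentation(request)
    print(f"Presentation written to: {output_path}")

if __name__ == "__main__":
    asyncio.run(main())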


WEB SEARCH AND DOCUMENT DISCOVERY COMPONENT

The Web Search Agent serves as the research arm of our system, responsible for finding relevant documents across the internet based on user-specified topics. This component must be intelligent enough to formulate effective search queries, evaluate result relevance, and make decisions about which documents to download for further processing.

The search strategy involves multiple approaches to ensure comprehensive coverage of the topic. The agent starts with direct keyword searches but also employs semantic search techniques to find related concepts and alternative perspectives. It uses various search engines and academic databases to gather diverse sources of information.

The agent implements a scoring system to evaluate document relevance based on multiple factors including title relevance, content preview analysis, source credibility, and document type. This scoring helps prioritize which documents to download when dealing with large result sets.

Here's a detailed implementation of the web search component. It combines multiple search strategies with relevance scoring so that high-quality documents are selected for download.


import aiohttp
import asyncio
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse, quote_plus
import hashlib
from typing import List, Dict, Tuple
import re


class DocumentMetadata:
    def __init__(self, url: str, title: str, content_type: str, relevance_score: float):
        self.url = url
        self.title = title
        self.content_type = content_type
        self.relevance_score = relevance_score
        self.local_path = None
        self.download_timestamp = None


class WebSearchAgent:
    def __init__(self, config: Dict):
        self.config = config
        self.session = None
        self.downloaded_documents = []
        self.search_engines = {
            'google': self.search_google,
            'bing': self.search_bing,
            'academic': self.search_academic_sources
        }

    async def search_and_download(self, topic: str, max_documents: int = 20) -> List[DocumentMetadata]:
        """Main method to search for and download relevant documents"""
        async with aiohttp.ClientSession() as session:
            self.session = session

            # Generate search queries with different strategies
            search_queries = self.generate_search_queries(topic)

            # Search across multiple engines
            all_results = []
            for query in search_queries:
                for engine_name, search_func in self.search_engines.items():
                    try:
                        results = await search_func(query)
                        all_results.extend(results)
                    except Exception as e:
                        print(f"Error searching with {engine_name}: {e}")

            # Remove duplicates and score results
            unique_results = self.deduplicate_results(all_results)
            scored_results = self.score_relevance(unique_results, topic)

            # Select top results for download
            selected_results = sorted(scored_results,
                                      key=lambda x: x.relevance_score,
                                      reverse=True)[:max_documents]

            # Download selected documents
            downloaded_docs = []
            for result in selected_results:
                try:
                    doc = await self.download_document(result)
                    if doc:
                        downloaded_docs.append(doc)
                except Exception as e:
                    print(f"Error downloading {result.url}: {e}")

            return downloaded_docs

    def generate_search_queries(self, topic: str) -> List[str]:
        """Generate multiple search query variations for comprehensive coverage"""
        base_queries = [
            topic,
            f"{topic} overview",
            f"{topic} introduction",
            f"{topic} fundamentals",
            f"{topic} guide",
            f"{topic} tutorial",
            f"{topic} research",
            f"{topic} analysis"
        ]

        # Add domain-specific variations
        domain_variations = [
            f"{topic} PDF",
            f"{topic} whitepaper",
            f"{topic} documentation",
            f"{topic} case study"
        ]

        return base_queries + domain_variations

    async def search_google(self, query: str) -> List[DocumentMetadata]:
        """Search Google for relevant documents"""
        # Note: In a real implementation, you would use the Google Custom Search API.
        # This simplified example scrapes the results page to show the structure.
        encoded_query = quote_plus(f"{query} filetype:pdf OR filetype:html")
        search_url = f"https://www.google.com/search?q={encoded_query}"

        try:
            async with self.session.get(search_url, headers=self.get_headers()) as response:
                html = await response.text()
                soup = BeautifulSoup(html, 'html.parser')

                results = []
                for result_div in soup.find_all('div', class_='g'):
                    title_elem = result_div.find('h3')
                    link_elem = result_div.find('a')

                    if title_elem and link_elem:
                        title = title_elem.get_text()
                        url = link_elem.get('href')

                        if url and url.startswith('http'):
                            content_type = self.detect_content_type(url)
                            if content_type in ['pdf', 'html']:
                                doc = DocumentMetadata(url, title, content_type, 0.0)
                                results.append(doc)

                return results
        except Exception as e:
            print(f"Google search error: {e}")
            return []

    def score_relevance(self, documents: List[DocumentMetadata], topic: str) -> List[DocumentMetadata]:
        """Score document relevance based on multiple factors"""
        topic_keywords = topic.lower().split()

        for doc in documents:
            score = 0.0
            title_lower = doc.title.lower()

            # Title relevance scoring
            for keyword in topic_keywords:
                if keyword in title_lower.split():  # Exact word match
                    score += 2.0
                elif keyword in title_lower:  # Partial (substring) match
                    score += 1.0

            # Content type preference
            if doc.content_type == 'pdf':
                score += 1.5  # PDFs often contain more comprehensive content
            elif doc.content_type == 'html':
                score += 1.0

            # URL structure analysis
            url_lower = doc.url.lower()
            if any(indicator in url_lower for indicator in ['research', 'academic', 'paper', 'study']):
                score += 1.0

            doc.relevance_score = score

        return documents

This web search implementation demonstrates how to create a comprehensive document discovery system that can intelligently find and evaluate relevant content across the internet. The scoring system ensures that the most relevant documents are prioritized for download and processing.
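
The agent above calls several helpers that are not shown. Here is one plausible sketch of them, assuming URL-based heuristics; the method names match the calls above, but the bodies are illustrative. The Bing and academic search methods are left as placeholders that would mirror search_google with their respective APIs.

    def get_headers(self) -> Dict[str, str]:
        """Browser-like headers to reduce the chance of being blocked."""
        return {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'text/html,application/xhtml+xml,*/*;q=0.8'
        }

    def detect_content_type(self, url: str) -> str:
        """Guess the content type from the URL path; default to HTML."""
        path = urlparse(url).path.lower()
        return 'pdf' if path.endswith('.pdf') else 'html'

    def deduplicate_results(self, results: List[DocumentMetadata]) -> List[DocumentMetadata]:
        """Drop results whose normalized URL has already been seen."""
        seen = set()
        unique = []
        for doc in results:
            key = hashlib.md5(doc.url.rstrip('/').lower().encode()).hexdigest()
            if key not in seen:
                seen.add(key)
                unique.append(doc)
        return unique

    async def search_bing(self, query: str) -> List[DocumentMetadata]:
        """Placeholder: would mirror search_google using the Bing Web Search API."""
        return []

    async def search_academic_sources(self, query: str) -> List[DocumentMetadata]:
        """Placeholder: would query academic sources such as arXiv or Semantic Scholar."""
        return []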


DOCUMENT DOWNLOAD AND STORAGE MANAGEMENT

Once relevant documents are identified, the system must efficiently download and store them in a local folder structure. This component handles various file types, manages download failures, implements retry mechanisms, and organizes files in a logical directory structure that facilitates later processing.

The download manager must handle different content types gracefully, including PDF documents that may require special handling and HTML pages that need to be saved with their associated resources. It also implements intelligent naming schemes to avoid conflicts and ensure easy identification of downloaded content.

Storage management includes creating appropriate folder structures, handling file naming conflicts, and maintaining metadata about downloaded documents. The system also implements cleanup mechanisms to manage disk space and remove outdated or irrelevant files.

Here's a comprehensive implementation of the document download and storage system. It handles various file types, retries failed downloads with exponential backoff, and maintains an organized file structure for subsequent processing.


import aiofiles
import aiohttp
from pathlib import Path
import hashlib
import json
import re
from datetime import datetime
import mimetypes
from urllib.parse import urlparse
from typing import Optional
import asyncio


class DocumentDownloader:
    def __init__(self, base_storage_path: str = "./downloaded_documents"):
        self.base_storage_path = Path(base_storage_path)
        self.base_storage_path.mkdir(exist_ok=True)
        self.metadata_file = self.base_storage_path / "download_metadata.json"
        self.download_metadata = self.load_metadata()

    async def download_document(self, doc_metadata: DocumentMetadata) -> Optional[DocumentMetadata]:
        """Download a single document with retry logic and proper storage"""

        # Check if already downloaded
        url_hash = self.generate_url_hash(doc_metadata.url)
        if url_hash in self.download_metadata:
            existing_path = Path(self.download_metadata[url_hash]['local_path'])
            if existing_path.exists():
                doc_metadata.local_path = str(existing_path)
                return doc_metadata

        # Create session-specific folder
        session_folder = self.create_session_folder()

        # Attempt download with retries
        max_retries = 3
        for attempt in range(max_retries):
            try:
                success = await self.attempt_download(doc_metadata, session_folder)
                if success:
                    self.update_metadata(doc_metadata, url_hash)
                    return doc_metadata
            except Exception as e:
                print(f"Download attempt {attempt + 1} failed for {doc_metadata.url}: {e}")
                if attempt == max_retries - 1:
                    print(f"Failed to download after {max_retries} attempts: {doc_metadata.url}")
                else:
                    await asyncio.sleep(2 ** attempt)  # Exponential backoff

        return None

    async def attempt_download(self, doc_metadata: DocumentMetadata, session_folder: Path) -> bool:
        """Attempt to download a single document"""
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
        }

        timeout = aiohttp.ClientTimeout(total=30)

        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get(doc_metadata.url, headers=headers) as response:
                if response.status == 200:
                    content = await response.read()

                    # Determine file extension and name
                    file_extension = self.determine_file_extension(doc_metadata, response)
                    safe_filename = self.create_safe_filename(doc_metadata.title, file_extension)
                    file_path = session_folder / safe_filename

                    # Ensure unique filename
                    file_path = self.ensure_unique_filename(file_path)

                    # Write file
                    async with aiofiles.open(file_path, 'wb') as f:
                        await f.write(content)

                    # Update document metadata
                    doc_metadata.local_path = str(file_path)
                    doc_metadata.download_timestamp = datetime.now().isoformat()

                    # Validate download
                    if await self.validate_download(file_path, doc_metadata.content_type):
                        print(f"Successfully downloaded: {file_path}")
                        return True
                    else:
                        file_path.unlink()  # Remove invalid file
                        return False
                else:
                    print(f"HTTP {response.status} for {doc_metadata.url}")
                    return False

    def create_session_folder(self) -> Path:
        """Create a timestamped folder for this download session"""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        session_folder = self.base_storage_path / f"session_{timestamp}"
        session_folder.mkdir(exist_ok=True)
        return session_folder

    def create_safe_filename(self, title: str, extension: str) -> str:
        """Create a safe filename from document title"""
        # Remove or replace unsafe characters
        safe_title = re.sub(r'[<>:"/\\|?*]', '_', title)
        safe_title = re.sub(r'\s+', '_', safe_title)
        safe_title = safe_title[:100]  # Limit length

        if not safe_title:
            safe_title = "document"

        return f"{safe_title}.{extension}"

    def determine_file_extension(self, doc_metadata: DocumentMetadata, response) -> str:
        """Determine appropriate file extension"""
        # First try content type from response headers
        content_type = response.headers.get('content-type', '').lower()
        if 'pdf' in content_type:
            return 'pdf'
        elif 'html' in content_type:
            return 'html'

        # Try URL extension
        parsed_url = urlparse(doc_metadata.url)
        path_extension = Path(parsed_url.path).suffix.lower()
        if path_extension in ['.pdf', '.html', '.htm']:
            return path_extension[1:]  # Remove the dot

        # Default based on detected content type
        if doc_metadata.content_type == 'pdf':
            return 'pdf'
        else:
            return 'html'

    async def validate_download(self, file_path: Path, expected_type: str) -> bool:
        """Validate that downloaded file is of expected type"""
        if not file_path.exists() or file_path.stat().st_size == 0:
            return False

        # Read first few bytes to check file signature
        async with aiofiles.open(file_path, 'rb') as f:
            header = await f.read(8)

        if expected_type == 'pdf':
            return header.startswith(b'%PDF')
        elif expected_type == 'html':
            # For HTML, check if it contains HTML tags
            try:
                async with aiofiles.open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                    content = await f.read(1000)
                return '<html' in content.lower() or '<!doctype html' in content.lower()
            except Exception:
                return False

        return True

    def update_metadata(self, doc_metadata: DocumentMetadata, url_hash: str):
        """Update download metadata"""
        self.download_metadata[url_hash] = {
            'url': doc_metadata.url,
            'title': doc_metadata.title,
            'local_path': doc_metadata.local_path,
            'download_timestamp': doc_metadata.download_timestamp,
            'content_type': doc_metadata.content_type,
            'relevance_score': doc_metadata.relevance_score
        }

        # Save metadata to file
        with open(self.metadata_file, 'w') as f:
            json.dump(self.download_metadata, f, indent=2)

This download and storage system provides robust handling of document acquisition with proper error handling, retry mechanisms, and organized file management. The metadata tracking ensures that the system can efficiently manage downloaded content and avoid unnecessary re-downloads.
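
The downloader also relies on a few helpers that are not shown above: loading persisted metadata, hashing URLs, and resolving filename collisions. Here is one plausible sketch of them, plus a simple cleanup routine for the disk-space management mentioned earlier; the method names match the calls above, but the bodies are illustrative.

    def load_metadata(self) -> dict:
        """Load persisted download metadata, or start fresh if none exists."""
        if self.metadata_file.exists():
            with open(self.metadata_file, 'r') as f:
                return json.load(f)
        return {}

    def generate_url_hash(self, url: str) -> str:
        """Stable key for a URL, used to detect previously downloaded documents."""
        return hashlib.md5(url.encode('utf-8')).hexdigest()

    def ensure_unique_filename(self, file_path: Path) -> Path:
        """Append a numeric suffix until the filename no longer collides."""
        candidate = file_path
        counter = 1
        while candidate.exists():
            candidate = file_path.with_name(f"{file_path.stem}_{counter}{file_path.suffix}")
            counter += 1
        return candidate

    def cleanup_old_sessions(self, keep_latest: int = 5):
        """Remove all but the most recent session folders to reclaim disk space."""
        import shutil
        sessions = sorted(self.base_storage_path.glob("session_*"))
        for folder in sessions[:-keep_latest]:
            shutil.rmtree(folder, ignore_errors=True)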


CONTENT EXTRACTION FROM PDF AND HTML DOCUMENTS

After documents are successfully downloaded, the system must extract meaningful text content from various file formats. This component handles the complexity of parsing PDF documents and HTML pages, cleaning extracted text, and preparing content for further processing by the RAG system.

PDF extraction requires sophisticated handling of different PDF structures, including scanned documents that may require OCR processing, multi-column layouts, and documents with embedded images and tables. The system must preserve important structural information while extracting clean, readable text.

HTML processing involves parsing web page structures, removing navigation elements and advertisements, extracting main content areas, and handling various encoding issues. The processor must be intelligent enough to identify and extract the primary content while discarding irrelevant page elements.

Here's a comprehensive implementation of the content extraction system. It handles both PDF and HTML documents and includes advanced text cleaning, structure preservation, and quality validation to ensure high-quality input for subsequent processing.


import PyPDF2
import pdfplumber
from bs4 import BeautifulSoup
import re
from typing import Dict, List, Optional
import aiofiles
from dataclasses import dataclass
import asyncio
from pathlib import Path


@dataclass
class ExtractedContent:
    source_url: str
    source_file: str
    title: str
    content_type: str
    raw_text: str
    cleaned_text: str
    metadata: Dict
    extraction_quality_score: float


class ContentProcessor:
    def __init__(self, config: Dict):
        self.config = config
        self.min_content_length = config.get('min_content_length', 500)
        self.max_content_length = config.get('max_content_length', 50000)

    async def process_documents(self, documents: List[DocumentMetadata]) -> List[ExtractedContent]:
        """Process all downloaded documents and extract content"""
        extracted_contents = []

        for doc in documents:
            if not doc.local_path or not Path(doc.local_path).exists():
                continue

            try:
                if doc.content_type == 'pdf':
                    content = await self.extract_pdf_content(doc)
                elif doc.content_type == 'html':
                    content = await self.extract_html_content(doc)
                else:
                    continue

                if content and self.validate_content_quality(content):
                    extracted_contents.append(content)

            except Exception as e:
                print(f"Error processing {doc.local_path}: {e}")

        return extracted_contents

    async def extract_pdf_content(self, doc: DocumentMetadata) -> Optional[ExtractedContent]:
        """Extract content from PDF documents using multiple methods"""
        file_path = Path(doc.local_path)

        # Try pdfplumber first (better for complex layouts)
        try:
            content = await self.extract_pdf_with_pdfplumber(file_path)
            if content and len(content.strip()) > self.min_content_length:
                return self.create_extracted_content(doc, content, 'pdfplumber')
        except Exception as e:
            print(f"pdfplumber extraction failed for {file_path}: {e}")

        # Fallback to PyPDF2
        try:
            content = await self.extract_pdf_with_pypdf2(file_path)
            if content and len(content.strip()) > self.min_content_length:
                return self.create_extracted_content(doc, content, 'pypdf2')
        except Exception as e:
            print(f"PyPDF2 extraction failed for {file_path}: {e}")

        return None

    async def extract_pdf_with_pdfplumber(self, file_path: Path) -> str:
        """Extract PDF content using pdfplumber for better layout handling"""
        extracted_text = []

        with pdfplumber.open(file_path) as pdf:
            for page_num, page in enumerate(pdf.pages):
                try:
                    # Extract text with layout preservation
                    text = page.extract_text(layout=True)
                    if text:
                        # Clean up the text while preserving structure
                        cleaned_text = self.clean_pdf_text(text)
                        if cleaned_text.strip():
                            extracted_text.append(f"--- Page {page_num + 1} ---\n{cleaned_text}")

                    # Also extract tables if present
                    tables = page.extract_tables()
                    for table_num, table in enumerate(tables):
                        if table:
                            table_text = self.format_table_as_text(table)
                            extracted_text.append(f"--- Table {table_num + 1} on Page {page_num + 1} ---\n{table_text}")

                except Exception as e:
                    print(f"Error extracting page {page_num + 1}: {e}")
                    continue

        return "\n\n".join(extracted_text)

    async def extract_pdf_with_pypdf2(self, file_path: Path) -> str:
        """Fallback PDF extraction using PyPDF2"""
        extracted_text = []

        with open(file_path, 'rb') as file:
            pdf_reader = PyPDF2.PdfReader(file)

            for page_num, page in enumerate(pdf_reader.pages):
                try:
                    text = page.extract_text()
                    if text:
                        cleaned_text = self.clean_pdf_text(text)
                        if cleaned_text.strip():
                            extracted_text.append(f"--- Page {page_num + 1} ---\n{cleaned_text}")
                except Exception as e:
                    print(f"Error extracting page {page_num + 1}: {e}")
                    continue

        return "\n\n".join(extracted_text)

    def clean_pdf_text(self, text: str) -> str:
        """Clean extracted PDF text while preserving important structure"""
        # Remove excessive whitespace but preserve paragraph breaks
        text = re.sub(r'\n\s*\n\s*\n+', '\n\n', text)

        # Fix common PDF extraction issues
        text = re.sub(r'([a-z])([A-Z])', r'\1 \2', text)  # Re-insert spaces where words ran together
        text = re.sub(r'(\w)-\s*\n\s*(\w)', r'\1\2', text)  # Rejoin words hyphenated across line breaks
        # Normalize horizontal whitespace only, keeping newlines for the line-based filter below
        text = re.sub(r'[ \t]+', ' ', text)

        # Remove page headers/footers (common patterns)
        lines = text.split('\n')
        cleaned_lines = []

        for line in lines:
            line = line.strip()
            # Skip likely headers/footers
            if (len(line) < 10 or
                re.match(r'^\d+$', line) or  # Page numbers
                re.match(r'^Page \d+', line) or
                line.lower().startswith('copyright') or
                line.count('.') > len(line) / 3):  # Likely table-of-contents dot leaders
                continue
            cleaned_lines.append(line)

        return '\n'.join(cleaned_lines)

    def format_table_as_text(self, table: List[List[str]]) -> str:
        """Convert extracted table to readable text format"""
        if not table:
            return ""

        formatted_rows = []
        for row in table:
            if row and any(cell for cell in row if cell):  # Skip empty rows
                clean_row = [str(cell).strip() if cell else "" for cell in row]
                formatted_rows.append(" | ".join(clean_row))

        return "\n".join(formatted_rows)

    async def extract_html_content(self, doc: DocumentMetadata) -> Optional[ExtractedContent]:
        """Extract content from HTML documents"""
        file_path = Path(doc.local_path)

        try:
            async with aiofiles.open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                html_content = await f.read()

            soup = BeautifulSoup(html_content, 'html.parser')

            # Remove unwanted elements
            self.remove_unwanted_elements(soup)

            # Extract main content
            main_content = self.extract_main_content(soup)

            if main_content and len(main_content.strip()) > self.min_content_length:
                return self.create_extracted_content(doc, main_content, 'beautifulsoup')

        except Exception as e:
            print(f"HTML extraction failed for {file_path}: {e}")

        return None

    def remove_unwanted_elements(self, soup: BeautifulSoup):
        """Remove navigation, ads, and other non-content elements"""
        # Remove script and style elements
        for element in soup(['script', 'style', 'nav', 'header', 'footer', 'aside']):
            element.decompose()

        # Remove elements with common non-content classes/ids
        unwanted_selectors = [
            '[class*="nav"]', '[class*="menu"]', '[class*="sidebar"]',
            '[class*="ad"]', '[class*="advertisement"]', '[class*="banner"]',
            '[id*="nav"]', '[id*="menu"]', '[id*="sidebar"]',
            '[id*="ad"]', '[id*="advertisement"]', '[id*="banner"]'
        ]

        for selector in unwanted_selectors:
            for element in soup.select(selector):
                element.decompose()

    def extract_main_content(self, soup: BeautifulSoup) -> str:
        """Extract main content from cleaned HTML"""
        # Try to find main content area
        main_selectors = [
            'main', 'article', '[role="main"]',
            '.content', '.main-content', '.article-content',
            '#content', '#main-content', '#article-content'
        ]

        for selector in main_selectors:
            main_element = soup.select_one(selector)
            if main_element:
                return self.clean_html_text(main_element.get_text())

        # Fallback: extract from body
        body = soup.find('body')
        if body:
            return self.clean_html_text(body.get_text())

        # Last resort: entire document
        return self.clean_html_text(soup.get_text())

    def clean_html_text(self, text: str) -> str:
        """Clean extracted HTML text"""
        # Normalize horizontal whitespace, keeping line breaks
        text = re.sub(r'[ \t]+', ' ', text)
        text = re.sub(r'\n\s*\n\s*\n+', '\n\n', text)

        # Remove excessive punctuation
        text = re.sub(r'[.]{3,}', '...', text)
        text = re.sub(r'[-]{3,}', '---', text)

        return text.strip()

    def create_extracted_content(self, doc: DocumentMetadata, text: str, extraction_method: str) -> ExtractedContent:
        """Create ExtractedContent object with quality scoring"""
        cleaned_text = self.final_text_cleanup(text)
        quality_score = self.calculate_quality_score(cleaned_text)

        metadata = {
            'extraction_method': extraction_method,
            'original_length': len(text),
            'cleaned_length': len(cleaned_text),
            'source_type': doc.content_type
        }

        return ExtractedContent(
            source_url=doc.url,
            source_file=doc.local_path,
            title=doc.title,
            content_type=doc.content_type,
            raw_text=text,
            cleaned_text=cleaned_text,
            metadata=metadata,
            extraction_quality_score=quality_score
        )

    def final_text_cleanup(self, text: str) -> str:
        """Final cleanup of extracted text"""
        # Remove very short lines (likely artifacts)
        lines = text.split('\n')
        cleaned_lines = [line for line in lines if len(line.strip()) > 10 or line.strip() == '']

        # Rejoin and normalize
        text = '\n'.join(cleaned_lines)
        text = re.sub(r'\n{3,}', '\n\n', text)

        return text.strip()

    def calculate_quality_score(self, text: str) -> float:
        """Calculate content quality score based on various factors"""
        if not text:
            return 0.0

        score = 0.0

        # Length factor
        length = len(text)
        if length > self.min_content_length:
            score += min(1.0, length / 5000)  # Max 1.0 for 5000+ chars

        # Sentence structure
        sentences = re.split(r'[.!?]+', text)
        avg_sentence_length = sum(len(s.split()) for s in sentences) / max(len(sentences), 1)
        if 10 <= avg_sentence_length <= 30:  # Good sentence length
            score += 0.5

        # Vocabulary diversity
        words = re.findall(r'\b\w+\b', text.lower())
        unique_words = set(words)
        if words:
            diversity = len(unique_words) / len(words)
            score += min(0.5, diversity * 2)  # Max 0.5 for high diversity

        return min(1.0, score)

    def validate_content_quality(self, content: ExtractedContent) -> bool:
        """Validate if extracted content meets quality standards"""
        return (content.extraction_quality_score >= 0.3 and
                len(content.cleaned_text) >= self.min_content_length and
                len(content.cleaned_text) <= self.max_content_length)


This content extraction system provides comprehensive handling of both PDF and HTML documents with sophisticated text cleaning and quality validation. The multi-method approach ensures maximum content recovery while maintaining text quality for subsequent processing steps.
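
One gap worth flagging: the extractors above only recover embedded text, so scanned PDFs will fail both methods. Below is a sketch of an OCR fallback; it assumes the pdf2image and pytesseract packages (plus a local Tesseract installation), which are not part of the implementation above.

from pdf2image import convert_from_path
import pytesseract

def extract_pdf_with_ocr(file_path: str, max_pages: int = 20) -> str:
    """Rasterize each page and run Tesseract OCR; slow, but handles scans."""
    pages = convert_from_path(file_path, dpi=300)
    extracted = []
    for page_num, image in enumerate(pages[:max_pages]):
        text = pytesseract.image_to_string(image)
        if text.strip():
            extracted.append(f"--- Page {page_num + 1} ---\n{text.strip()}")
    return "\n\n".join(extracted)

This could be wired into extract_pdf_content as a final fallback after PyPDF2.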


RAG SYSTEM IMPLEMENTATION

The Retrieval-Augmented Generation system forms the core intelligence of our presentation creation agent. This component indexes the extracted content, enables semantic search capabilities, and provides contextually relevant information retrieval for slide generation. The RAG system must efficiently handle large volumes of text while maintaining fast query response times.

The implementation involves creating vector embeddings of the extracted content, building efficient search indices, and implementing sophisticated retrieval strategies that can find relevant information based on semantic similarity rather than just keyword matching. The system also needs to handle content chunking to ensure optimal retrieval granularity.

The RAG system maintains context awareness across multiple documents and can synthesize information from various sources to provide comprehensive answers to queries about the presentation topic. It implements advanced ranking algorithms to ensure the most relevant content is prioritized in retrieval results.

Here's a detailed implementation of the RAG system. It includes overlapping chunking, embedding generation, semantic search over a FAISS index, and retrieval re-ranking optimized for presentation content generation.


import numpy as np
from sentence_transformers import SentenceTransformer
import faiss
from typing import List, Dict, Tuple, Optional
import pickle
import json
from dataclasses import dataclass, asdict
import re
from pathlib import Path
import asyncio


@dataclass
class ContentChunk:
    chunk_id: str
    source_url: str
    source_file: str
    content: str
    chunk_index: int
    total_chunks: int
    embedding: Optional[np.ndarray] = None
    metadata: Dict = None


@dataclass
class RetrievalResult:
    chunk: ContentChunk
    similarity_score: float
    relevance_rank: int


class RAGSystem:
    def __init__(self, config: Dict):
        self.config = config
        self.embedding_model_name = config.get('embedding_model', 'all-MiniLM-L6-v2')
        self.chunk_size = config.get('chunk_size', 512)
        self.chunk_overlap = config.get('chunk_overlap', 50)
        self.max_chunks_per_query = config.get('max_chunks_per_query', 10)

        # Initialize embedding model
        self.embedding_model = SentenceTransformer(self.embedding_model_name)
        self.embedding_dimension = self.embedding_model.get_sentence_embedding_dimension()

        # Initialize FAISS index (inner product on normalized vectors = cosine similarity)
        self.index = faiss.IndexFlatIP(self.embedding_dimension)
        self.chunks = []
        self.chunk_id_to_index = {}

        # Storage paths
        self.storage_path = Path(config.get('rag_storage_path', './rag_storage'))
        self.storage_path.mkdir(exist_ok=True)

    async def index_content(self, extracted_contents: List[ExtractedContent]):
        """Index all extracted content for retrieval"""
        print("Starting content indexing...")

        # Clear existing index
        self.index.reset()
        self.chunks.clear()
        self.chunk_id_to_index.clear()

        all_chunks = []

        # Process each document
        for content in extracted_contents:
            document_chunks = await self.create_chunks(content)
            all_chunks.extend(document_chunks)

        # Generate embeddings for all chunks
        print(f"Generating embeddings for {len(all_chunks)} chunks...")
        chunk_texts = [chunk.content for chunk in all_chunks]
        embeddings = self.embedding_model.encode(chunk_texts, show_progress_bar=True)

        # Normalize embeddings for cosine similarity
        embeddings = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)

        # Add to FAISS index
        self.index.add(embeddings.astype('float32'))

        # Store chunks with embeddings
        for i, (chunk, embedding) in enumerate(zip(all_chunks, embeddings)):
            chunk.embedding = embedding
            self.chunks.append(chunk)
            self.chunk_id_to_index[chunk.chunk_id] = i

        # Save index and chunks
        await self.save_index()

        print(f"Successfully indexed {len(all_chunks)} chunks from {len(extracted_contents)} documents")

    async def create_chunks(self, content: ExtractedContent) -> List[ContentChunk]:
        """Create overlapping chunks from extracted content"""
        text = content.cleaned_text

        # Split into sentences for better chunk boundaries
        sentences = self.split_into_sentences(text)

        chunks = []
        current_chunk = []
        current_length = 0
        chunk_index = 0

        for sentence in sentences:
            sentence_length = len(sentence.split())

            # Check if adding this sentence would exceed chunk size
            if current_length + sentence_length > self.chunk_size and current_chunk:
                # Create chunk from current sentences
                chunk_text = ' '.join(current_chunk)
                chunk = self.create_chunk_object(content, chunk_text, chunk_index)
                chunks.append(chunk)

                # Start new chunk with overlap
                overlap_sentences = self.get_overlap_sentences(current_chunk)
                current_chunk = overlap_sentences + [sentence]
                current_length = sum(len(s.split()) for s in current_chunk)
                chunk_index += 1
            else:
                current_chunk.append(sentence)
                current_length += sentence_length

        # Add final chunk if it has content
        if current_chunk:
            chunk_text = ' '.join(current_chunk)
            chunk = self.create_chunk_object(content, chunk_text, chunk_index)
            chunks.append(chunk)

        # Update total chunks count
        for chunk in chunks:
            chunk.total_chunks = len(chunks)

        return chunks

    def split_into_sentences(self, text: str) -> List[str]:
        """Split text into sentences using regex"""
        # Split on sentence endings, but preserve the punctuation
        sentences = re.split(r'(?<=[.!?])\s+', text)

        # Filter out very short sentences and clean up
        cleaned_sentences = []
        for sentence in sentences:
            sentence = sentence.strip()
            if len(sentence) > 10:  # Minimum sentence length
                cleaned_sentences.append(sentence)

        return cleaned_sentences

    def get_overlap_sentences(self, sentences: List[str]) -> List[str]:
        """Get sentences for overlap between chunks"""
        overlap_words = 0
        overlap_sentences = []

        # Take sentences from the end until we reach the overlap word count
        for sentence in reversed(sentences):
            word_count = len(sentence.split())
            if overlap_words + word_count <= self.chunk_overlap:
                overlap_sentences.insert(0, sentence)
                overlap_words += word_count
            else:
                break

        return overlap_sentences

    def create_chunk_object(self, content: ExtractedContent, chunk_text: str, chunk_index: int) -> ContentChunk:
        """Create a ContentChunk object"""
        # Note: hash() is salted per interpreter run; a stable digest would be safer for persisted IDs
        chunk_id = f"{hash(content.source_url)}_{chunk_index}"

        metadata = {
            'source_title': content.title,
            'source_type': content.content_type,
            'extraction_quality': content.extraction_quality_score,
            'chunk_word_count': len(chunk_text.split())
        }

        return ContentChunk(
            chunk_id=chunk_id,
            source_url=content.source_url,
            source_file=content.source_file,
            content=chunk_text,
            chunk_index=chunk_index,
            total_chunks=0,  # Will be updated later
            metadata=metadata
        )

    async def retrieve_relevant_content(self, query: str, max_results: int = None) -> List[RetrievalResult]:
        """Retrieve relevant content chunks for a given query"""
        if max_results is None:
            max_results = self.max_chunks_per_query

        if not self.chunks:
            print("No content indexed. Please index content first.")
            return []

        # Generate query embedding
        query_embedding = self.embedding_model.encode([query])
        query_embedding = query_embedding / np.linalg.norm(query_embedding, axis=1, keepdims=True)

        # Search in FAISS index
        similarities, indices = self.index.search(query_embedding.astype('float32'), max_results)

        # Create retrieval results
        results = []
        for rank, (similarity, index) in enumerate(zip(similarities[0], indices[0])):
            if 0 <= index < len(self.chunks):  # FAISS pads with -1 when fewer results exist
                chunk = self.chunks[index]
                result = RetrievalResult(
                    chunk=chunk,
                    similarity_score=float(similarity),
                    relevance_rank=rank
                )
                results.append(result)

        # Apply additional ranking based on content quality and diversity
        results = self.rerank_results(results, query)

        return results

    def rerank_results(self, results: List[RetrievalResult], query: str) -> List[RetrievalResult]:
        """Apply additional ranking to improve result quality and diversity"""
        # Calculate additional relevance factors
        for result in results:
            chunk = result.chunk

            # Keyword overlap bonus
            query_words = set(query.lower().split())
            chunk_words = set(chunk.content.lower().split())
            keyword_overlap = len(query_words.intersection(chunk_words)) / len(query_words)

            # Content quality bonus
            quality_bonus = chunk.metadata.get('extraction_quality', 0.5)

            # Combine scores
            combined_score = (result.similarity_score * 0.7 +
                              keyword_overlap * 0.2 +
                              quality_bonus * 0.1)

            result.similarity_score = combined_score

        # Re-sort by combined score
        results.sort(key=lambda x: x.similarity_score, reverse=True)

        # Update ranks
        for i, result in enumerate(results):
            result.relevance_rank = i

        # Apply diversity filtering to avoid too many chunks from the same source
        diverse_results = self.apply_diversity_filter(results)

        return diverse_results

    def apply_diversity_filter(self, results: List[RetrievalResult], max_per_source: int = 3) -> List[RetrievalResult]:
        """Filter results to ensure diversity across sources"""
        source_counts = {}
        filtered_results = []

        for result in results:
            source_url = result.chunk.source_url
            current_count = source_counts.get(source_url, 0)

            if current_count < max_per_source:
                filtered_results.append(result)
                source_counts[source_url] = current_count + 1

        return filtered_results

    async def generate_context_for_slide(self, slide_topic: str, slide_context: str = "") -> str:
        """Generate contextual information for a specific slide topic"""
        # Combine slide topic with any additional context
        query = f"{slide_topic} {slide_context}".strip()

        # Retrieve relevant content
        results = await self.retrieve_relevant_content(query, max_results=5)

        if not results:
            return f"No relevant content found for: {slide_topic}"

        # Synthesize content from top results
        context_parts = []
        for result in results:
            chunk = result.chunk
            context_parts.append(f"From {chunk.metadata['source_title']}:\n{chunk.content}")

        synthesized_context = "\n\n---\n\n".join(context_parts)

        return synthesized_context

    async def save_index(self):
        """Save FAISS index and chunks to disk"""
        # Save FAISS index
        index_path = self.storage_path / "faiss_index.bin"
        faiss.write_index(self.index, str(index_path))

        # Save chunks (without embeddings to save space)
        chunks_data = []
        for chunk in self.chunks:
            chunk_dict = asdict(chunk)
            chunk_dict['embedding'] = None  # Embeddings are already stored in the FAISS index
            chunks_data.append(chunk_dict)

        chunks_path = self.storage_path / "chunks.json"
        with open(chunks_path, 'w') as f:
            json.dump(chunks_data, f, indent=2)

        # Save metadata
        metadata = {
            'embedding_model': self.embedding_model_name,
            'embedding_dimension': self.embedding_dimension,
            'chunk_count': len(self.chunks),
            'chunk_size': self.chunk_size,
            'chunk_overlap': self.chunk_overlap
        }

        metadata_path = self.storage_path / "index_metadata.json"
        with open(metadata_path, 'w') as f:
            json.dump(metadata, f, indent=2)

        print(f"Index saved to {self.storage_path}")

    async def load_index(self) -> bool:
        """Load FAISS index and chunks from disk"""
        try:
            # Load FAISS index
            index_path = self.storage_path / "faiss_index.bin"
            if not index_path.exists():
                return False

            self.index = faiss.read_index(str(index_path))

            # Load chunks
            chunks_path = self.storage_path / "chunks.json"
            with open(chunks_path, 'r') as f:
                chunks_data = json.load(f)

            self.chunks = []
            self.chunk_id_to_index = {}

            for i, chunk_dict in enumerate(chunks_data):
                chunk = ContentChunk(**chunk_dict)
                self.chunks.append(chunk)
                self.chunk_id_to_index[chunk.chunk_id] = i

            print(f"Loaded index with {len(self.chunks)} chunks")
            return True

        except Exception as e:
            print(f"Error loading index: {e}")
            return False


This RAG system implementation provides sophisticated content indexing and retrieval capabilities with semantic search, intelligent chunking, and advanced ranking algorithms. The system ensures that the most relevant and high-quality content is available for presentation generation while maintaining fast query performance.
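
As a quick sanity check of the retrieval pipeline, here is a hedged usage sketch. It assumes index_content has already been called, and the query strings are placeholders.

async def demo_retrieval(rag: RAGSystem):
    # Retrieve ranked chunks for a sub-topic of the presentation
    results = await rag.retrieve_relevant_content("key benefits and limitations", max_results=5)
    for result in results:
        print(f"[{result.relevance_rank}] score={result.similarity_score:.3f} "
              f"source={result.chunk.metadata['source_title']}")

    # Synthesize context for a single slide
    context = await rag.generate_context_for_slide("implementation challenges")
    print(context[:500])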


GRAPHRAG AND ONTOLOGY CREATION

The GraphRAG component extends traditional RAG capabilities by creating knowledge graphs and ontologies from the extracted content. This system identifies entities, relationships, and concepts within the documents, building a structured representation of knowledge that enables more sophisticated reasoning and content organization for presentation creation.

The ontology creation process involves named entity recognition, relationship extraction, and concept clustering to build a comprehensive knowledge graph. This structured representation allows the system to understand connections between different concepts and generate more coherent and logically organized presentations.

The GraphRAG system also implements graph-based retrieval algorithms that can traverse relationships to find related concepts and supporting information. This capability enables the creation of presentations with better narrative flow and logical progression between slides.
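
To illustrate the traversal idea, here is a simplified networkx sketch, not the full implementation: starting from entities matched in a query, it walks edges in both directions to collect related entities whose supporting chunks can then be retrieved.

import networkx as nx
from typing import List, Set

def expand_query_entities(graph: nx.DiGraph, seed_entities: List[str], max_hops: int = 2) -> Set[str]:
    """Collect entities reachable within max_hops of the seeds, following edges in either direction."""
    frontier = {e for e in seed_entities if e in graph}
    related = set(frontier)
    for _ in range(max_hops):
        next_frontier = set()
        for entity in frontier:
            next_frontier.update(graph.successors(entity))
            next_frontier.update(graph.predecessors(entity))
        next_frontier -= related
        related.update(next_frontier)
        frontier = next_frontier
    return related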

Here's a comprehensive implementation of the GraphRAG and ontology system. It includes entity recognition, relationship extraction, and concept clustering for enhanced presentation generation.


import spacy
import networkx as nx
from collections import defaultdict, Counter
import json
from typing import Dict, List, Tuple, Set, Optional
from dataclasses import dataclass, asdict
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.cluster import KMeans
from sklearn.metrics.pairwise import cosine_similarity
from pathlib import Path
import pickle


@dataclass
class Entity:
    text: str
    label: str
    confidence: float
    mentions: List[str]
    context_chunks: List[str]


@dataclass
class Relationship:
    source_entity: str
    target_entity: str
    relation_type: str
    confidence: float
    evidence_text: str
    source_chunk: str


@dataclass
class Concept:
    name: str
    keywords: List[str]
    related_entities: List[str]
    importance_score: float
    chunk_ids: List[str]


class GraphRAGSystem:
    def __init__(self, config: Dict):
        self.config = config
        self.nlp = spacy.load("en_core_web_sm")

        # Knowledge graph
        self.knowledge_graph = nx.DiGraph()
        self.entities = {}
        self.relationships = []
        self.concepts = {}

        # Configuration parameters
        self.min_entity_confidence = config.get('min_entity_confidence', 0.7)
        self.max_entities_per_chunk = config.get('max_entities_per_chunk', 20)
        self.concept_cluster_count = config.get('concept_cluster_count', 10)

        # Storage
        self.storage_path = Path(config.get('graphrag_storage_path', './graphrag_storage'))
        self.storage_path.mkdir(exist_ok=True)

    async def create_ontology(self, extracted_contents: List[ExtractedContent]) -> Dict:
        """Create comprehensive ontology from extracted content"""
        print("Creating ontology from extracted content...")

        # Step 1: Extract entities from all content
        all_chunks = []
        for content in extracted_contents:
            chunks = await self.extract_entities_from_content(content)
            all_chunks.extend(chunks)

        # Step 2: Extract relationships between entities
        await self.extract_relationships(all_chunks)

        # Step 3: Identify key concepts and themes
        await self.identify_concepts(all_chunks)

        # Step 4: Build knowledge graph
        await self.build_knowledge_graph()

        # Step 5: Calculate entity and concept importance
        await self.calculate_importance_scores()

        # Step 6: Create ontology structure
        ontology = await self.create_ontology_structure()

        # Step 7: Save ontology
        await self.save_ontology(ontology)

        print(f"Ontology created with {len(self.entities)} entities, {len(self.relationships)} relationships, and {len(self.concepts)} concepts")

        return ontology

    async def extract_entities_from_content(self, content: ExtractedContent) -> List[Dict]:
        """Extract named entities from content chunks"""
        # Split content into chunks for processing
        chunks = self.split_content_for_ner(content.cleaned_text)
        processed_chunks = []

        for i, chunk_text in enumerate(chunks):
            chunk_id = f"{hash(content.source_url)}_{i}"

            # Process with spaCy
            doc = self.nlp(chunk_text)

            chunk_entities = []
            for ent in doc.ents:
                # Filter entities by confidence and relevance
                if (len(ent.text.strip()) > 2 and
                    ent.label_ in ['PERSON', 'ORG', 'GPE', 'PRODUCT', 'EVENT', 'WORK_OF_ART', 'LAW', 'LANGUAGE'] and
                    self.is_valid_entity(ent.text)):

                    entity_data = {
                        'text': ent.text.strip(),
                        'label': ent.label_,
                        'start': ent.start_char,
                        'end': ent.end_char,
                        'confidence': self.calculate_entity_confidence(ent, doc)
                    }
                    chunk_entities.append(entity_data)

            # Limit entities per chunk to avoid noise
            chunk_entities = sorted(chunk_entities, key=lambda x: x['confidence'], reverse=True)[:self.max_entities_per_chunk]

            processed_chunk = {
                'chunk_id': chunk_id,
                'text': chunk_text,
                'entities': chunk_entities,
                'source_url': content.source_url,
                'source_title': content.title
            }
            processed_chunks.append(processed_chunk)

            # Update global entities dictionary
            for entity_data in chunk_entities:
                self.update_global_entity(entity_data, chunk_id, chunk_text)

        return processed_chunks

    def split_content_for_ner(self, text: str, max_chunk_size: int = 1000000) -> List[str]:
        """Split content into chunks suitable for NER processing"""
        # spaCy has limits on text length, so we split long texts
        if len(text) <= max_chunk_size:
            return [text]

        # Split on paragraph boundaries
        paragraphs = text.split('\n\n')
        chunks = []
        current_chunk = []
        current_length = 0

        for paragraph in paragraphs:
            para_length = len(paragraph)

            if current_length + para_length > max_chunk_size and current_chunk:
                chunks.append('\n\n'.join(current_chunk))
                current_chunk = [paragraph]
                current_length = para_length
            else:
                current_chunk.append(paragraph)
                current_length += para_length

        if current_chunk:
            chunks.append('\n\n'.join(current_chunk))

        return chunks

    def is_valid_entity(self, entity_text: str) -> bool:
        """Check if entity text is valid and meaningful"""
        entity_text = entity_text.strip()

        # Filter out common false positives
        if (len(entity_text) < 3 or
            entity_text.isdigit() or
            entity_text.lower() in ['the', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by'] or
            entity_text.count(' ') > 5):  # Very long entities are often errors
            return False

        return True

    def calculate_entity_confidence(self, ent, doc) -> float:
        """Calculate confidence score for an entity"""
        base_confidence = 0.8  # Base confidence for spaCy entities

        # Adjust based on entity characteristics
        if ent.text.istitle():  # Proper capitalization
            base_confidence += 0.1

        if len(ent.text.split()) > 1:  # Multi-word entities often more reliable

            base_confidence += 0.05

        

        # Check if entity appears multiple times (higher confidence);

        # count the full entity string so multi-word entities are also matched

        entity_count = doc.text.lower().count(ent.text.lower())

        if entity_count > 1:

            base_confidence += min(0.1, entity_count * 0.02)

        

        return min(1.0, base_confidence)

    

    def update_global_entity(self, entity_data: Dict, chunk_id: str, chunk_text: str):

        """Update global entities dictionary with new entity occurrence"""

        

        entity_text = entity_data['text'].lower()

        

        if entity_text not in self.entities:

            self.entities[entity_text] = Entity(

                text=entity_data['text'],

                label=entity_data['label'],

                confidence=entity_data['confidence'],

                mentions=[entity_data['text']],

                context_chunks=[chunk_id]

            )

        else:

            # Update existing entity

            existing_entity = self.entities[entity_text]

            existing_entity.mentions.append(entity_data['text'])

            existing_entity.context_chunks.append(chunk_id)

            

            # Update confidence (weighted average)

            total_mentions = len(existing_entity.mentions)

            existing_entity.confidence = ((existing_entity.confidence * (total_mentions - 1) + 

                                         entity_data['confidence']) / total_mentions)

    

    async def extract_relationships(self, processed_chunks: List[Dict]):

        """Extract relationships between entities"""

        

        print("Extracting relationships between entities...")

        

        for chunk in processed_chunks:

            chunk_text = chunk['text']

            entities = chunk['entities']

            

            # Extract relationships within each chunk

            for i, entity1 in enumerate(entities):

                for j, entity2 in enumerate(entities[i+1:], i+1):

                    relationship = self.identify_relationship(entity1, entity2, chunk_text)

                    if relationship:

                        relationship.source_chunk = chunk['chunk_id']

                        self.relationships.append(relationship)

    

    def identify_relationship(self, entity1: Dict, entity2: Dict, text: str) -> Optional[Relationship]:

        """Identify relationship between two entities in text"""

        

        # Simple pattern-based relationship extraction

        e1_text = entity1['text']

        e2_text = entity2['text']

        

        # Find sentences containing both entities

        sentences = text.split('.')

        for sentence in sentences:

            if e1_text in sentence and e2_text in sentence:

                # Look for relationship patterns

                sentence_lower = sentence.lower()

                

                relationship_patterns = {

                    'is_part_of': ['part of', 'component of', 'element of', 'belongs to'],

                    'related_to': ['related to', 'associated with', 'connected to', 'linked to'],

                    'used_by': ['used by', 'utilized by', 'employed by'],

                    'created_by': ['created by', 'developed by', 'made by', 'built by'],

                    'located_in': ['located in', 'situated in', 'found in', 'based in'],

                    'works_for': ['works for', 'employed by', 'member of']

                }

                

                for relation_type, patterns in relationship_patterns.items():

                    for pattern in patterns:

                        if pattern in sentence_lower:

                            # Determine direction based on entity positions

                            e1_pos = sentence.find(e1_text)

                            e2_pos = sentence.find(e2_text)

                            pattern_pos = sentence_lower.find(pattern)

                            

                            if e1_pos < pattern_pos < e2_pos:

                                source, target = e1_text, e2_text

                            elif e2_pos < pattern_pos < e1_pos:

                                source, target = e2_text, e1_text

                            else:

                                continue

                            

                            return Relationship(

                                source_entity=source,

                                target_entity=target,

                                relation_type=relation_type,

                                confidence=0.7,

                                evidence_text=sentence.strip(),

                                source_chunk=""

                            )

        

        # If no explicit relationship found, create a general co-occurrence relationship

        if abs(entity1['start'] - entity2['start']) < 200:  # Entities are close in text

            return Relationship(

                source_entity=e1_text,

                target_entity=e2_text,

                relation_type='co_occurs_with',

                confidence=0.4,

                evidence_text=text[max(0, min(entity1['start'], entity2['start'])-50):

                              max(entity1['end'], entity2['end'])+50],

                source_chunk=""

            )

        

        return None

    

    async def identify_concepts(self, processed_chunks: List[Dict]):

        """Identify key concepts and themes using clustering"""

        

        print("Identifying key concepts and themes...")

        

        # Prepare text data for clustering

        chunk_texts = [chunk['text'] for chunk in processed_chunks]

        

        # Use TF-IDF to vectorize text

        vectorizer = TfidfVectorizer(

            max_features=1000,

            stop_words='english',

            ngram_range=(1, 3),

            min_df=2,

            max_df=0.8

        )

        

        tfidf_matrix = vectorizer.fit_transform(chunk_texts)

        feature_names = vectorizer.get_feature_names_out()

        

        # Perform clustering

        n_clusters = min(self.concept_cluster_count, len(chunk_texts))

        if n_clusters > 1:

            kmeans = KMeans(n_clusters=n_clusters, random_state=42)

            cluster_labels = kmeans.fit_predict(tfidf_matrix)

            

            # Extract concepts from clusters

            for cluster_id in range(n_clusters):

                cluster_chunks = [chunk for i, chunk in enumerate(processed_chunks) if cluster_labels[i] == cluster_id]

                

                if cluster_chunks:

                    concept = self.create_concept_from_cluster(cluster_id, cluster_chunks, tfidf_matrix, feature_names, cluster_labels)

                    self.concepts[concept.name] = concept

    

    def create_concept_from_cluster(self, cluster_id: int, cluster_chunks: List[Dict], 

                                  tfidf_matrix, feature_names: List[str], cluster_labels: np.ndarray) -> Concept:

        """Create a concept from a cluster of chunks"""

        

        # Get cluster indices

        cluster_indices = [i for i, label in enumerate(cluster_labels) if label == cluster_id]

        

        # Calculate cluster centroid

        cluster_tfidf = tfidf_matrix[cluster_indices]

        centroid = np.mean(cluster_tfidf.toarray(), axis=0)

        

        # Get top keywords for this cluster

        top_indices = np.argsort(centroid)[-10:][::-1]

        top_keywords = [feature_names[i] for i in top_indices if centroid[i] > 0]

        

        # Extract entities from cluster chunks

        cluster_entities = set()

        chunk_ids = []

        for chunk in cluster_chunks:

            chunk_ids.append(chunk['chunk_id'])

            for entity in chunk['entities']:

                cluster_entities.add(entity['text'])

        

        # Create concept name from top keywords

        concept_name = f"concept_{cluster_id}_{top_keywords[0] if top_keywords else 'unknown'}"

        

        # Calculate importance score based on cluster size and keyword strength;

        # cast to a plain float so the ontology stays JSON-serializable

        importance_score = float(len(cluster_chunks) / len(cluster_labels) * np.max(centroid))

        

        return Concept(

            name=concept_name,

            keywords=top_keywords,

            related_entities=list(cluster_entities),

            importance_score=importance_score,

            chunk_ids=chunk_ids

        )

    

    async def build_knowledge_graph(self):

        """Build NetworkX knowledge graph from entities and relationships"""

        

        print("Building knowledge graph...")

        

        # Add entity nodes

        for entity_text, entity in self.entities.items():

            self.knowledge_graph.add_node(

                entity_text,

                label=entity.label,

                confidence=entity.confidence,

                mention_count=len(entity.mentions),

                type='entity'

            )

        

        # Add concept nodes

        for concept_name, concept in self.concepts.items():

            self.knowledge_graph.add_node(

                concept_name,

                keywords=concept.keywords,

                importance=concept.importance_score,

                type='concept'

            )

            

            # Connect concepts to related entities

            for entity_text in concept.related_entities:

                if entity_text.lower() in self.entities:

                    self.knowledge_graph.add_edge(

                        concept_name,

                        entity_text.lower(),

                        relation_type='contains_entity',

                        weight=0.5

                    )

        

        # Add relationship edges

        for relationship in self.relationships:

            source = relationship.source_entity.lower()

            target = relationship.target_entity.lower()

            

            if source in self.entities and target in self.entities:

                self.knowledge_graph.add_edge(

                    source,

                    target,

                    relation_type=relationship.relation_type,

                    confidence=relationship.confidence,

                    evidence=relationship.evidence_text

                )

    

    async def calculate_importance_scores(self):

        """Calculate importance scores using graph centrality measures"""

        

        print("Calculating importance scores...")

        

        if len(self.knowledge_graph.nodes()) > 0:

            # Calculate various centrality measures

            pagerank_scores = nx.pagerank(self.knowledge_graph)

            betweenness_scores = nx.betweenness_centrality(self.knowledge_graph)

            degree_scores = dict(self.knowledge_graph.degree())

            max_degree = max(degree_scores.values()) or 1  # avoid division by zero in an edgeless graph

            

            # Update entity importance scores

            for entity_text, entity in self.entities.items():

                if entity_text in pagerank_scores:

                    # Combine different centrality measures

                    combined_score = (pagerank_scores[entity_text] * 0.4 +

                                      betweenness_scores[entity_text] * 0.3 +

                                      degree_scores[entity_text] / max_degree * 0.3)

                    

                    # Also factor in mention frequency

                    mention_bonus = min(0.3, len(entity.mentions) / 10)

                    entity.confidence = min(1.0, entity.confidence + combined_score + mention_bonus)

    

    async def create_ontology_structure(self) -> Dict:

        """Create structured ontology representation"""

        

        # Sort entities by importance

        sorted_entities = sorted(self.entities.items(), 

                               key=lambda x: x[1].confidence, 

                               reverse=True)

        

        # Sort concepts by importance

        sorted_concepts = sorted(self.concepts.items(),

                               key=lambda x: x[1].importance_score,

                               reverse=True)

        

        ontology = {

            'entities': {

                entity_text: asdict(entity) for entity_text, entity in sorted_entities[:50]  # Top 50 entities

            },

            'concepts': {

                concept_name: asdict(concept) for concept_name, concept in sorted_concepts[:20]  # Top 20 concepts

            },

            'relationships': [asdict(rel) for rel in self.relationships if rel.confidence > 0.5],

            'graph_statistics': {

                'total_nodes': len(self.knowledge_graph.nodes()),

                'total_edges': len(self.knowledge_graph.edges()),

                'average_degree': sum(dict(self.knowledge_graph.degree()).values()) / len(self.knowledge_graph.nodes()) if len(self.knowledge_graph.nodes()) > 0 else 0

            }

        }

        

        return ontology

    

    async def get_related_concepts(self, entity_or_concept: str, max_results: int = 5) -> List[Tuple[str, float]]:

        """Get concepts related to a given entity or concept"""

        

        if entity_or_concept.lower() not in self.knowledge_graph:

            return []

        

        # Use graph traversal to find related nodes

        related_nodes = []

        

        # Direct neighbors

        for neighbor in self.knowledge_graph.neighbors(entity_or_concept.lower()):

            edge_data = self.knowledge_graph.get_edge_data(entity_or_concept.lower(), neighbor)

            weight = edge_data.get('confidence', 0.5) if edge_data else 0.5

            related_nodes.append((neighbor, weight))

        

        # Sort by weight and return top results

        related_nodes.sort(key=lambda x: x[1], reverse=True)

        return related_nodes[:max_results]

    

    async def save_ontology(self, ontology: Dict):

        """Save ontology and knowledge graph to disk"""

        

        # Save ontology JSON

        ontology_path = self.storage_path / "ontology.json"

        with open(ontology_path, 'w') as f:

            json.dump(ontology, f, indent=2)

        

        # Save knowledge graph

        graph_path = self.storage_path / "knowledge_graph.pickle"

        with open(graph_path, 'wb') as f:

            pickle.dump(self.knowledge_graph, f)

        

        print(f"Ontology saved to {self.storage_path}")


This GraphRAG implementation creates sophisticated knowledge representations that enable the system to understand relationships between concepts and generate more coherent, well-structured presentations. The ontology provides a foundation for intelligent content organization and narrative flow in the generated slides.
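
To make the hand-off concrete, the sketch below shows how an orchestrator might consume the ontology. It is a minimal illustration, not part of the implementation above: build_and_query_ontology is a hypothetical helper, and extracted_contents is assumed to come from the content-processing stage described earlier.


async def build_and_query_ontology(extracted_contents, config: dict) -> dict:
    """Sketch: build the ontology, then gather related nodes for slide planning."""
    graph_rag = GraphRAGSystem(config)
    ontology = await graph_rag.create_ontology(extracted_contents)

    # The saved structure already ranks concepts by importance, so the top
    # few concepts become candidate slide topics; their graph neighbours
    # suggest supporting bullet points and visuals to look for
    slide_seeds = {}
    for concept_name in list(ontology['concepts'])[:5]:
        related = await graph_rag.get_related_concepts(concept_name)
        slide_seeds[concept_name] = [node for node, _weight in related]

    return slide_seeds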


VISUAL CONTENT EXTRACTION WITH VLM

The Vision Language Model (VLM) component enhances the presentation creation system by extracting and analyzing visual elements from the downloaded documents. It identifies charts, diagrams, images, and figures that can improve the presentation's visual appeal and the clarity of its information delivery.

The VLM system processes both PDF and HTML documents to locate visual content, extract relevant images and charts, analyze their purpose and relevance, and prepare them for inclusion in the generated presentations. It also generates captions and descriptions for accessibility and context.

The visual processor implements intelligent filtering to ensure only high-quality, relevant visual content is selected for inclusion in presentations. It also handles image format conversion and sizing optimization to ensure compatibility with PowerPoint requirements.
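
Caption generation is a natural place to plug in an actual model. The generate_image_caption method in the code below is deliberately a placeholder; as one hedged illustration, the sketch that follows backs it with the Hugging Face transformers image-to-text pipeline and the open BLIP base captioning model (an assumption on our part; any captioning model or hosted VLM API could be substituted).


from PIL import Image
from transformers import pipeline

# Assumption: the open BLIP base captioning model from Hugging Face;
# the first call downloads roughly 1 GB of model weights
captioner = pipeline("image-to-text", model="Salesforce/blip-image-captioning-base")

def caption_image(img: Image.Image) -> str:
    """Generate a short descriptive caption for a PIL image."""
    result = captioner(img)
    # The pipeline returns a list like [{"generated_text": "..."}]
    return result[0]["generated_text"].strip() if result else ""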

Here's a detailed implementation of the visual content extraction system:

The following code example shows how to implement a sophisticated visual content extraction system using Vision Language Models to identify, extract, and analyze visual elements from documents. This implementation includes image quality assessment, relevance scoring, and content analysis for intelligent visual content selection.


import cv2

import numpy as np

from PIL import Image, ImageEnhance

import fitz  # PyMuPDF

from bs4 import BeautifulSoup

import requests

from urllib.parse import urljoin, urlparse

import base64

import io

from typing import List, Dict, Tuple, Optional

from dataclasses import dataclass

import asyncio

import aiohttp

import aiofiles

from pathlib import Path

import hashlib


@dataclass

class VisualElement:

    element_id: str

    source_url: str

    source_file: str

    element_type: str  # 'image', 'chart', 'diagram', 'table'

    local_path: str

    original_caption: str

    generated_caption: str

    relevance_score: float

    quality_score: float

    dimensions: Tuple[int, int]

    file_size: int

    format: str

    page_number: Optional[int] = None

    extraction_method: str = ""


class VisualProcessor:

    def __init__(self, config: Dict):

        self.config = config

        self.min_image_size = config.get('min_image_size', (100, 100))

        self.max_image_size = config.get('max_image_size', (2000, 2000))

        self.min_quality_score = config.get('min_quality_score', 0.5)

        self.supported_formats = config.get('supported_formats', ['png', 'jpg', 'jpeg', 'gif', 'bmp'])

        

        # Storage setup

        self.visual_storage_path = Path(config.get('visual_storage_path', './visual_content'))

        self.visual_storage_path.mkdir(parents=True, exist_ok=True)

        

        # VLM setup (placeholder for actual VLM integration)

        self.vlm_enabled = config.get('vlm_enabled', False)

        

    async def extract_visuals(self, documents: List[DocumentMetadata]) -> List[VisualElement]:

        """Extract visual elements from all documents"""

        

        print("Extracting visual elements from documents...")

        

        all_visuals = []

        

        for doc in documents:

            if not doc.local_path or not Path(doc.local_path).exists():

                continue

            

            try:

                if doc.content_type == 'pdf':

                    visuals = await self.extract_pdf_visuals(doc)

                elif doc.content_type == 'html':

                    visuals = await self.extract_html_visuals(doc)

                else:

                    continue

                

                # Filter and validate visuals

                validated_visuals = await self.validate_and_filter_visuals(visuals)

                all_visuals.extend(validated_visuals)

                

            except Exception as e:

                print(f"Error extracting visuals from {doc.local_path}: {e}")

        

        # Sort by relevance and quality

        all_visuals.sort(key=lambda x: (x.relevance_score + x.quality_score) / 2, reverse=True)

        

        print(f"Extracted {len(all_visuals)} visual elements")

        return all_visuals

    

    async def extract_pdf_visuals(self, doc: DocumentMetadata) -> List[VisualElement]:

        """Extract visual elements from PDF documents"""

        

        visuals = []

        pdf_path = Path(doc.local_path)

        

        try:

            # Open PDF with PyMuPDF

            pdf_document = fitz.open(pdf_path)

            

            for page_num in range(len(pdf_document)):

                page = pdf_document.load_page(page_num)

                

                # Extract images from page

                image_list = page.get_images()

                

                for img_index, img in enumerate(image_list):

                    try:

                        # Get image data

                        xref = img[0]

                        pix = fitz.Pixmap(pdf_document, xref)

                        

                        # Skip if image is too small or in unsupported format

                        if pix.width < self.min_image_size[0] or pix.height < self.min_image_size[1]:

                            pix = None

                            continue

                        

                        # Convert to PIL Image

                        if pix.n - pix.alpha < 4:  # GRAY or RGB

                            img_data = pix.tobytes("png")

                            img_pil = Image.open(io.BytesIO(img_data))

                        else:  # CMYK

                            pix1 = fitz.Pixmap(fitz.csRGB, pix)

                            img_data = pix1.tobytes("png")

                            img_pil = Image.open(io.BytesIO(img_data))

                            pix1 = None

                        

                        pix = None

                        

                        # Save image and create visual element

                        visual_element = await self.create_visual_element_from_image(

                            img_pil, doc, page_num, img_index, 'pdf_extraction'

                        )

                        

                        if visual_element:

                            visuals.append(visual_element)

                            

                    except Exception as e:

                        print(f"Error extracting image {img_index} from page {page_num}: {e}")

                        continue

                

                # Extract vector graphics and charts (simplified approach)

                # This would require more sophisticated analysis in a real implementation

                drawings = page.get_drawings()

                if drawings:

                    # Create a rendered image of the page for chart detection

                    mat = fitz.Matrix(2, 2)  # 2x zoom

                    pix = page.get_pixmap(matrix=mat)

                    img_data = pix.tobytes("png")

                    page_image = Image.open(io.BytesIO(img_data))

                    

                    # Analyze for chart-like content

                    if await self.detect_chart_content(page_image):

                        visual_element = await self.create_visual_element_from_image(

                            page_image, doc, page_num, 0, 'chart_detection'

                        )

                        if visual_element:

                            visual_element.element_type = 'chart'

                            visuals.append(visual_element)

            

            pdf_document.close()

            

        except Exception as e:

            print(f"Error processing PDF {pdf_path}: {e}")

        

        return visuals

    

    async def extract_html_visuals(self, doc: DocumentMetadata) -> List[VisualElement]:

        """Extract visual elements from HTML documents"""

        

        visuals = []

        html_path = Path(doc.local_path)

        

        try:

            async with aiofiles.open(html_path, 'r', encoding='utf-8', errors='ignore') as f:

                html_content = await f.read()

            

            soup = BeautifulSoup(html_content, 'html.parser')

            

            # Find all image elements

            img_tags = soup.find_all('img')

            

            for img_index, img_tag in enumerate(img_tags):

                try:

                    img_src = img_tag.get('src')

                    if not img_src:

                        continue

                    

                    # Decode inline base64 data URIs directly

                    if img_src.startswith('data:'):

                        try:

                            img_data = base64.b64decode(img_src.split(',', 1)[1])

                            img_pil = Image.open(io.BytesIO(img_data))

                        except Exception:

                            continue

                    else:

                        # Handle relative URLs

                        if not img_src.startswith('http'):

                            img_src = urljoin(doc.url, img_src)

                        

                        # Download and process image

                        img_pil = await self.download_image_from_url(img_src)

                    if img_pil:

                        # Get caption from alt text or nearby text

                        caption = img_tag.get('alt', '') or img_tag.get('title', '')

                        if not caption:

                            caption = self.extract_nearby_text(img_tag)

                        

                        visual_element = await self.create_visual_element_from_image(

                            img_pil, doc, None, img_index, 'html_extraction'

                        )

                        

                        if visual_element:

                            visual_element.original_caption = caption

                            visuals.append(visual_element)

                

                except Exception as e:

                    print(f"Error processing HTML image {img_index}: {e}")

                    continue

            

            # Look for SVG elements (charts/diagrams)

            svg_tags = soup.find_all('svg')

            for svg_index, svg_tag in enumerate(svg_tags):

                try:

                    # Convert SVG to image (simplified approach)

                    svg_content = str(svg_tag)

                    if len(svg_content) > 100:  # Skip very small SVGs

                        # In a real implementation, you would use a library like cairosvg

                        # to convert SVG to PNG

                        pass

                

                except Exception as e:

                    print(f"Error processing SVG {svg_index}: {e}")

                    continue

        

        except Exception as e:

            print(f"Error processing HTML {html_path}: {e}")

        

        return visuals

    

    async def download_image_from_url(self, url: str) -> Optional[Image.Image]:

        """Download image from URL and return PIL Image"""

        

        try:

            timeout = aiohttp.ClientTimeout(total=10)

            headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'}

            

            async with aiohttp.ClientSession(timeout=timeout) as session:

                async with session.get(url, headers=headers) as response:

                    if response.status == 200:

                        image_data = await response.read()

                        return Image.open(io.BytesIO(image_data))

        

        except Exception as e:

            print(f"Error downloading image from {url}: {e}")

        

        return None

    

    def extract_nearby_text(self, img_tag) -> str:

        """Extract text near an image tag for caption generation"""

        

        # Look for captions in nearby elements

        parent = img_tag.parent

        if parent:

            # Check for figure caption

            figcaption = parent.find('figcaption')

            if figcaption:

                return figcaption.get_text().strip()

            

            # Check for surrounding text within the same parent element

            text_nodes = parent.find_all(string=True)

            nearby_text = ' '.join(text.strip() for text in text_nodes if text.strip())

            if len(nearby_text) > 10:

                return nearby_text[:200]  # Limit caption length

        

        return ""

    

    async def create_visual_element_from_image(self, img_pil: Image.Image, doc: DocumentMetadata, 

                                             page_num: Optional[int], img_index: int, 

                                             extraction_method: str) -> Optional[VisualElement]:

        """Create VisualElement from PIL Image"""

        

        try:

            # Generate unique ID

            element_id = hashlib.md5(f"{doc.url}_{page_num}_{img_index}".encode()).hexdigest()

            

            # Check image dimensions and quality

            width, height = img_pil.size

            if width < self.min_image_size[0] or height < self.min_image_size[1]:

                return None

            

            # Calculate quality score

            quality_score = await self.calculate_image_quality(img_pil)

            if quality_score < self.min_quality_score:

                return None

            

            # Optimize image for presentation use

            optimized_img = await self.optimize_image_for_presentation(img_pil)

            

            # Save image

            filename = f"{element_id}.png"

            local_path = self.visual_storage_path / filename

            optimized_img.save(local_path, 'PNG', optimize=True)

            

            # Determine element type

            element_type = await self.classify_visual_element(optimized_img)

            

            # Calculate relevance score (placeholder - would use VLM in real implementation)

            relevance_score = await self.calculate_relevance_score(optimized_img, doc.title)

            

            # Generate caption using VLM (placeholder)

            generated_caption = await self.generate_image_caption(optimized_img)

            

            return VisualElement(

                element_id=element_id,

                source_url=doc.url,

                source_file=doc.local_path,

                element_type=element_type,

                local_path=str(local_path),

                original_caption="",

                generated_caption=generated_caption,

                relevance_score=relevance_score,

                quality_score=quality_score,

                dimensions=(optimized_img.width, optimized_img.height),

                file_size=local_path.stat().st_size,

                format='png',

                page_number=page_num,

                extraction_method=extraction_method

            )

        

        except Exception as e:

            print(f"Error creating visual element: {e}")

            return None

    

    async def calculate_image_quality(self, img: Image.Image) -> float:

        """Calculate image quality score based on various factors"""

        

        # Convert to numpy array for analysis

        img_array = np.array(img.convert('RGB'))

        

        quality_score = 0.0

        

        # Check resolution

        width, height = img.size

        pixel_count = width * height

        resolution_score = min(1.0, pixel_count / (500 * 500))  # Normalize to 500x500

        quality_score += resolution_score * 0.3

        

        # Check for blur (using Laplacian variance)

        gray = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY)

        blur_score = cv2.Laplacian(gray, cv2.CV_64F).var()

        normalized_blur = min(1.0, blur_score / 1000)  # Normalize

        quality_score += normalized_blur * 0.3

        

        # Check contrast

        contrast = np.std(gray)

        normalized_contrast = min(1.0, contrast / 64)  # Normalize

        quality_score += normalized_contrast * 0.2

        

        # Check for mostly white/empty images

        mean_brightness = np.mean(gray)

        if mean_brightness > 240:  # Very bright (likely empty)

            quality_score *= 0.5

        

        # Check aspect ratio (prefer reasonable ratios)

        aspect_ratio = max(width, height) / min(width, height)

        if aspect_ratio > 5:  # Very wide or tall images

            quality_score *= 0.7

        

        quality_score += 0.2  # Base score

        

        return min(1.0, quality_score)

    

    async def optimize_image_for_presentation(self, img: Image.Image) -> Image.Image:

        """Optimize image for presentation use"""

        

        # Convert to RGB if necessary

        if img.mode != 'RGB':

            img = img.convert('RGB')

        

        # Resize if too large

        width, height = img.size

        if width > self.max_image_size[0] or height > self.max_image_size[1]:

            img.thumbnail(self.max_image_size, Image.Resampling.LANCZOS)

        

        # Enhance contrast and sharpness slightly

        enhancer = ImageEnhance.Contrast(img)

        img = enhancer.enhance(1.1)

        

        enhancer = ImageEnhance.Sharpness(img)

        img = enhancer.enhance(1.05)

        

        return img

    

    async def detect_chart_content(self, img: Image.Image) -> bool:

        """Detect if image contains chart or diagram content"""

        

        # Convert to numpy array

        img_array = np.array(img.convert('RGB'))

        gray = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY)

        

        # Look for geometric shapes (lines, rectangles) typical in charts

        edges = cv2.Canny(gray, 50, 150)

        lines = cv2.HoughLinesP(edges, 1, np.pi/180, threshold=50, minLineLength=30, maxLineGap=10)

        

        if lines is not None and len(lines) > 10:

            # Many lines suggest structured content like charts

            return True

        

        # Look for text (charts often have labels)

        # This is a simplified check: count external contours as a rough proxy

        # for labels and shapes; a real implementation would use OCR

        contours, _ = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

        if len(contours) > 5:

            return True

        

        return False

    

    async def classify_visual_element(self, img: Image.Image) -> str:

        """Classify the type of visual element"""

        

        # This is a placeholder implementation

        # Real implementation would use a trained classifier or VLM

        

        if await self.detect_chart_content(img):

            return 'chart'

        

        # Check for table-like structure

        img_array = np.array(img.convert('RGB'))

        gray = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY)

        

        # Look for grid patterns: detect all line segments once, then classify

        # each by orientation (HoughLinesP's theta argument is the angular

        # resolution of the accumulator, not an orientation filter, so it

        # cannot be used to select vertical lines directly)

        edges = cv2.Canny(gray, 50, 150)

        lines = cv2.HoughLinesP(edges, 1, np.pi/180, threshold=30,

                                minLineLength=min(img.width, img.height) // 4, maxLineGap=5)

        

        horizontal_count = 0

        vertical_count = 0

        if lines is not None:

            for x1, y1, x2, y2 in lines[:, 0]:

                if abs(x2 - x1) > abs(y2 - y1):

                    horizontal_count += 1

                else:

                    vertical_count += 1

        

        if horizontal_count > 3 and vertical_count > 3:

            return 'table'

        

        # Default to image

        return 'image'

    

    async def calculate_relevance_score(self, img: Image.Image, topic: str) -> float:

        """Calculate relevance score of image to presentation topic"""

        

        # Placeholder implementation

        # Real implementation would use VLM to analyze image content and compare to topic

        

        base_score = 0.5

        

        # For now, return base score with some random variation

        # In real implementation, this would analyze image content

        return min(1.0, base_score + np.random.random() * 0.3)

    

    async def generate_image_caption(self, img: Image.Image) -> str:

        """Generate descriptive caption for image using VLM"""

        

        # Placeholder implementation

        # Real implementation would use a Vision Language Model

        

        if self.vlm_enabled:

            # This would call an actual VLM API

            return "AI-generated caption describing the visual content"

        else:

            return "Visual element extracted from source document"

    

    async def validate_and_filter_visuals(self, visuals: List[VisualElement]) -> List[VisualElement]:

        """Validate and filter visual elements based on quality and relevance"""

        

        validated_visuals = []

        

        for visual in visuals:

            # Check quality threshold

            if visual.quality_score < self.min_quality_score:

                continue

            

            # Check file exists and is readable

            if not Path(visual.local_path).exists():

                continue

            

            # Check file size (avoid very large files)

            if visual.file_size > 10 * 1024 * 1024:  # 10MB limit

                continue

            

            validated_visuals.append(visual)

        

        return validated_visuals


This visual content extraction system provides comprehensive capabilities for identifying, extracting, and analyzing visual elements from documents. The system ensures that only high-quality, relevant visual content is selected for inclusion in presentations, enhancing the overall quality and effectiveness of the generated slides.
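
As a usage sketch, the snippet below shows one plausible way to wire the VisualProcessor into the pipeline. The per-type cap of three elements is an illustrative choice, and documents is assumed to be the DocumentMetadata list produced by the web search stage.


async def collect_presentation_visuals(documents, config: dict) -> dict:
    """Sketch: extract visuals and keep the best few of each type."""
    processor = VisualProcessor(config)
    visuals = await processor.extract_visuals(documents)

    # extract_visuals already sorts by combined quality and relevance,
    # so a simple first-come fill keeps the strongest candidates
    selected = {'chart': [], 'table': [], 'image': []}
    for visual in visuals:
        bucket = selected.get(visual.element_type)
        if bucket is not None and len(bucket) < 3:
            bucket.append(visual)

    return selected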


POWERPOINT GENERATION ENGINE

The PowerPoint Generation Engine serves as the culmination of all previous processing steps, transforming the extracted content, structured knowledge, and visual elements into professional presentation slides. This component handles the complex task of organizing information logically, applying consistent design themes, and creating slides that follow UX best practices.

The generation engine implements sophisticated algorithms for content organization, determining optimal slide structures, balancing text and visual content, and ensuring consistent formatting throughout the presentation. It also handles the technical aspects of PowerPoint file creation, including proper XML structure, theme application, and multimedia integration.
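
Before the full engine, here is a hedged sketch of that organization step: a hypothetical plan_slide_structure helper (not part of the implementation below) that maps ranked ontology concepts onto the SlideContent and PresentationStructure dataclasses defined in the code that follows, assuming roughly two minutes of speaking time per slide.


def plan_slide_structure(title: str, ontology: dict, max_slides: int = 10) -> 'PresentationStructure':
    """Hypothetical planner: turn ranked ontology concepts into a slide outline."""
    slides = []
    for name, concept in list(ontology['concepts'].items())[:max_slides]:
        slides.append(SlideContent(
            title=name.replace('_', ' ').title(),
            content_points=list(concept['keywords'][:5]),  # top keywords become bullets
            visual_elements=[],                            # attached later by the generator
            notes="Discuss " + name + " and its related entities: "
                  + ', '.join(concept['related_entities'][:5]),
            slide_type='content',
            importance_score=concept['importance_score'],
        ))

    return PresentationStructure(
        title=title,
        subtitle="An AI-generated overview",
        slides=slides,
        theme_name='professional_blue',
        total_estimated_duration=2 * (len(slides) + 2),  # ~2 minutes per slide, plus title and conclusion
    )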

The system creates presentations that not only contain relevant information but also follow professional presentation standards with appropriate slide transitions, consistent typography, and effective use of white space. Each slide includes comprehensive notes sections that provide additional context and speaking points.

Here's a comprehensive implementation of the PowerPoint generation system:

The following code example demonstrates how to implement a sophisticated PowerPoint generation engine that creates professional presentations from processed content. This implementation includes slide structure optimization, theme application, content organization, and comprehensive notes generation.


from pptx import Presentation

from pptx.util import Inches, Pt, Emu

from pptx.enum.text import PP_ALIGN, MSO_ANCHOR

from pptx.enum.shapes import MSO_SHAPE

from pptx.dml.color import RGBColor

from pptx.enum.dml import MSO_THEME_COLOR

import json

from typing import List, Dict, Tuple, Optional

from dataclasses import dataclass

from pathlib import Path

import re

import asyncio


@dataclass

class SlideContent:

    title: str

    content_points: List[str]

    visual_elements: List[VisualElement]

    notes: str

    slide_type: str  # 'title', 'content', 'visual', 'conclusion'

    importance_score: float


@dataclass

class PresentationStructure:

    title: str

    subtitle: str

    slides: List[SlideContent]

    theme_name: str

    total_estimated_duration: int  # minutes


class PresentationGenerator:

    def __init__(self, config: Dict):

        self.config = config

        self.max_slides = config.get('max_slides', 15)

        self.max_points_per_slide = config.get('max_points_per_slide', 5)

        self.min_words_per_point = config.get('min_words_per_point', 3)

        self.max_words_per_point = config.get('max_words_per_point', 20)

        

        # Theme configurations

        self.themes = self.load_theme_configurations()

        self.default_theme = config.get('default_theme', 'professional_blue')

        

        # UX guidelines

        self.ux_guidelines = {

            'max_text_per_slide': 50,  # words

            'min_font_size': 18,

            'title_font_size': 32,

            'max_bullet_levels': 2,

            'visual_text_ratio': 0.6  # 60% visual, 40% text for visual slides

        }

    

    async def create_presentation(self, presentation_structure: PresentationStructure, 

                                visual_elements: List[VisualElement], 

                                theme_name: Optional[str], 

                                output_folder: str) -> str:

        """Create PowerPoint presentation from structured content"""

        

        print(f"Creating presentation: {presentation_structure.title}")

        

        # Initialize presentation with theme

        prs = self.initialize_presentation_with_theme(theme_name or self.default_theme)

        

        # Create title slide

        await self.create_title_slide(prs, presentation_structure)

        

        # Create content slides

        for slide_content in presentation_structure.slides:

            await self.create_content_slide(prs, slide_content, visual_elements)

        

        # Create conclusion slide

        await self.create_conclusion_slide(prs, presentation_structure)

        

        # Apply final formatting and validation

        await self.apply_final_formatting(prs)

        

        # Save presentation

        output_path = await self.save_presentation(prs, presentation_structure.title, output_folder)

        

        print(f"Presentation created successfully: {output_path}")

        return output_path

    

    def initialize_presentation_with_theme(self, theme_name: str) -> Presentation:

        """Initialize presentation with specified theme"""

        

        prs = Presentation()

        

        # Apply theme settings

        theme_config = self.themes.get(theme_name, self.themes[self.default_theme])

        

        # Set slide master properties

        slide_master = prs.slide_master

        

        # Apply theme colors and fonts

        self.apply_theme_to_master(slide_master, theme_config)

        

        return prs

    

    def apply_theme_to_master(self, slide_master, theme_config: Dict):

        """Apply theme configuration to slide master"""

        

        # This is a simplified implementation

        # Real implementation would modify the slide master XML

        pass

    

    async def create_title_slide(self, prs: Presentation, structure: PresentationStructure):

        """Create title slide"""

        

        # Use title slide layout

        title_slide_layout = prs.slide_layouts[0]  # Title slide layout

        slide = prs.slides.add_slide(title_slide_layout)

        

        # Set title and subtitle

        title_shape = slide.shapes.title

        subtitle_shape = slide.placeholders[1]

        

        title_shape.text = structure.title

        subtitle_shape.text = structure.subtitle

        

        # Apply title slide formatting

        self.format_title_slide(title_shape, subtitle_shape)

        

        # Add notes

        notes_slide = slide.notes_slide

        notes_text_frame = notes_slide.notes_text_frame

        notes_text_frame.text = self.generate_title_slide_notes(structure)

    

    def format_title_slide(self, title_shape, subtitle_shape):

        """Apply formatting to title slide elements"""

        

        # Title formatting

        title_paragraph = title_shape.text_frame.paragraphs[0]

        title_paragraph.font.size = Pt(self.ux_guidelines['title_font_size'])

        title_paragraph.font.bold = True

        title_paragraph.alignment = PP_ALIGN.CENTER

        

        # Subtitle formatting

        subtitle_paragraph = subtitle_shape.text_frame.paragraphs[0]

        subtitle_paragraph.font.size = Pt(24)

        subtitle_paragraph.alignment = PP_ALIGN.CENTER

    

    async def create_content_slide(self, prs: Presentation, slide_content: SlideContent, 

                                 visual_elements: List[VisualElement]):

        """Create content slide based on slide type"""

        

        if slide_content.slide_type == 'visual':

            await self.create_visual_slide(prs, slide_content, visual_elements)

        else:

            await self.create_text_slide(prs, slide_content, visual_elements)

    

    async def create_text_slide(self, prs: Presentation, slide_content: SlideContent, 

                              visual_elements: List[VisualElement]):

        """Create text-based content slide"""

        

        # Choose appropriate layout

        if slide_content.visual_elements:

            slide_layout = prs.slide_layouts[8]  # Picture with Caption layout in the default template

        else:

            slide_layout = prs.slide_layouts[1]  # Title and content layout

        

        slide = prs.slides.add_slide(slide_layout)

        

        # Set title

        title_shape = slide.shapes.title

        title_shape.text = slide_content.title

        self.format_slide_title(title_shape)

        

        # Add content

        if len(slide.placeholders) > 1:

            content_placeholder = slide.placeholders[1]

            self.populate_content_placeholder(content_placeholder, slide_content.content_points)

        

        # Add visual elements if present

        if slide_content.visual_elements:

            await self.add_visual_elements_to_slide(slide, slide_content.visual_elements[:1])  # Max 1 per text slide

        

        # Add notes

        self.add_slide_notes(slide, slide_content.notes)

    

    async def create_visual_slide(self, prs: Presentation, slide_content: SlideContent, 

                                visual_elements: List[VisualElement]):

        """Create visual-focused slide"""

        

        slide_layout = prs.slide_layouts[6]  # Blank layout for custom arrangement

        slide = prs.slides.add_slide(slide_layout)

        

        # Add title

        title_shape = slide.shapes.add_textbox(Inches(0.5), Inches(0.2), Inches(9), Inches(1))

        title_frame = title_shape.text_frame

        title_frame.text = slide_content.title

        self.format_slide_title(title_shape)

        

        # Add visual elements

        visual_top = Inches(1.5)

        if slide_content.visual_elements:

            await self.add_visual_elements_to_slide(slide, slide_content.visual_elements[:2], visual_top)

        

        # Add minimal text content

        if slide_content.content_points:

            text_points = slide_content.content_points[:3]  # Limit text on visual slides

            text_shape = slide.shapes.add_textbox(Inches(0.5), Inches(6.5), Inches(9), Inches(1.5))

            self.populate_content_placeholder(text_shape, text_points)

        

        # Add notes

        self.add_slide_notes(slide, slide_content.notes)

    

    def format_slide_title(self, title_shape):

        """Format slide title"""

        

        title_paragraph = title_shape.text_frame.paragraphs[0]

        title_paragraph.font.size = Pt(28)

        title_paragraph.font.bold = True

        title_paragraph.alignment = PP_ALIGN.LEFT

    

    def populate_content_placeholder(self, placeholder, content_points: List[str]):

        """Populate content placeholder with bullet points"""

        

        text_frame = placeholder.text_frame

        text_frame.clear()

        

        for i, point in enumerate(content_points[:self.max_points_per_slide]):

            if i == 0:

                p = text_frame.paragraphs[0]

            else:

                p = text_frame.add_paragraph()

            

            # Clean and format point

            clean_point = self.clean_bullet_point(point)

            p.text = clean_point

            p.level = 0

            p.font.size = Pt(self.ux_guidelines['min_font_size'])

            

            # Add sub-points if the point is complex

            sub_points = self.extract_sub_points(point)

            for sub_point in sub_points[:2]:  # Max 2 sub-points

                sub_p = text_frame.add_paragraph()

                sub_p.text = self.clean_bullet_point(sub_point)

                sub_p.level = 1

                sub_p.font.size = Pt(16)

    

    def clean_bullet_point(self, point: str) -> str:

        """Clean and format bullet point text"""

        

        # Remove excessive whitespace

        point = re.sub(r'\s+', ' ', point.strip())

        

        # Ensure proper sentence structure

        if not point.endswith(('.', '!', '?', ':')):

            point += '.'

        

        # Capitalize first letter

        if point:

            point = point[0].upper() + point[1:]

        

        # Limit length

        words = point.split()

        if len(words) > self.max_words_per_point:

            point = ' '.join(words[:self.max_words_per_point]) + '...'

        

        return point

    

    def extract_sub_points(self, main_point: str) -> List[str]:

        """Extract sub-points from a complex main point"""

        

        # Look for common sub-point indicators

        sub_point_patterns = [

            r'including:?\s*(.+)',

            r'such as:?\s*(.+)',

            r'for example:?\s*(.+)',

            r'specifically:?\s*(.+)'

        ]

        

        sub_points = []

        for pattern in sub_point_patterns:

            match = re.search(pattern, main_point, re.IGNORECASE)

            if match:

                sub_text = match.group(1)

                # Split on common delimiters

                parts = re.split(r'[,;]', sub_text)

                sub_points.extend([part.strip() for part in parts if len(part.strip()) > 5])

                break

        

        return sub_points[:2]  # Limit sub-points

    

    async def add_visual_elements_to_slide(self, slide, visual_elements: List[VisualElement], 

                                         top_position: Inches = Inches(1.5)):

        """Add visual elements to slide"""

        

        if not visual_elements:

            return

        

        # Calculate positioning for visual elements

        slide_width = Inches(10)

        slide_height = Inches(7.5)

        available_width = slide_width - Inches(1)  # Margins

        available_height = slide_height - top_position - Inches(0.5)

        

        if len(visual_elements) == 1:

            # Single visual element - center it

            visual = visual_elements[0]

            await self.add_single_visual_element(slide, visual, Inches(0.5), top_position, 

                                               available_width, available_height)

        else:

            # Multiple visual elements - arrange side by side

            element_width = available_width / len(visual_elements) - Inches(0.2)

            

            for i, visual in enumerate(visual_elements):

                left_position = Inches(0.5) + i * (element_width + Inches(0.2))

                await self.add_single_visual_element(slide, visual, left_position, top_position,

                                                   element_width, available_height)

    

    async def add_single_visual_element(self, slide, visual: VisualElement, 

                                      left: Inches, top: Inches, 

                                      max_width: Inches, max_height: Inches):

        """Add single visual element to slide"""

        

        try:

            visual_path = Path(visual.local_path)

            if not visual_path.exists():

                return

            

            # Calculate optimal size maintaining aspect ratio

            original_width, original_height = visual.dimensions

            aspect_ratio = original_width / original_height

            

            # Determine final size (assumes roughly 100 pixels per inch as an initial scale)

            if aspect_ratio > 1:  # Wider than tall

                final_width = min(max_width, Inches(original_width / 100))

                final_height = final_width / aspect_ratio

            else:  # Taller than wide

                final_height = min(max_height, Inches(original_height / 100))

                final_width = final_height * aspect_ratio

            

            # Ensure it fits within constraints

            if final_width > max_width:

                final_width = max_width

                final_height = final_width / aspect_ratio

            if final_height > max_height:

                final_height = max_height

                final_width = final_height * aspect_ratio

            

            # python-pptx requires integer EMU lengths, so round the computed floats

            left, top = Emu(int(left)), Emu(int(top))

            final_width, final_height = Emu(int(final_width)), Emu(int(final_height))

            

            # Add image to slide

            picture = slide.shapes.add_picture(str(visual_path), left, top, final_width, final_height)

            

            # Add caption if available

            if visual.generated_caption or visual.original_caption:

                caption_text = visual.generated_caption or visual.original_caption

                caption_top = top + final_height + Inches(0.1)

                caption_shape = slide.shapes.add_textbox(left, caption_top, final_width, Inches(0.5))

                caption_frame = caption_shape.text_frame

                caption_frame.text = caption_text[:100]  # Limit caption length

                

                # Format caption

                caption_paragraph = caption_frame.paragraphs[0]

                caption_paragraph.font.size = Pt(12)

                caption_paragraph.font.italic = True

                caption_paragraph.alignment = PP_ALIGN.CENTER

        

        except Exception as e:

            print(f"Error adding visual element {visual.element_id}: {e}")

    

    async def create_conclusion_slide(self, prs: Presentation, structure: PresentationStructure):

        """Create conclusion slide"""

        

        slide_layout = prs.slide_layouts[1]  # Title and content layout

        slide = prs.slides.add_slide(slide_layout)

        

        # Set title

        title_shape = slide.shapes.title

        title_shape.text = "Conclusion"

        self.format_slide_title(title_shape)

        

        # Generate conclusion content

        conclusion_points = self.generate_conclusion_points(structure)

        

        # Add content

        content_placeholder = slide.placeholders[1]

        self.populate_content_placeholder(content_placeholder, conclusion_points)

        

        # Add notes

        conclusion_notes = self.generate_conclusion_notes(structure)

        self.add_slide_notes(slide, conclusion_notes)

    

    def generate_conclusion_points(self, structure: PresentationStructure) -> List[str]:

        """Generate conclusion points from presentation structure"""

        

        # Extract key themes from slides

        key_themes = []

        for slide in structure.slides:

            if slide.importance_score > 0.7:  # High importance slides

                key_themes.append(slide.title)

        

        conclusion_points = [f"We explored the key aspects of {structure.title}"]

        if key_themes:

            conclusion_points.append(f"Main topics included: {', '.join(key_themes[:3])}")

        conclusion_points.extend([

            "These insights provide a foundation for further exploration",

            "Thank you for your attention"

        ])

        

        return conclusion_points

    

    def add_slide_notes(self, slide, notes_text: str):

        """Add notes to slide"""

        

        notes_slide = slide.notes_slide

        notes_text_frame = notes_slide.notes_text_frame

        

        # Clean and format notes

        clean_notes = self.format_notes_text(notes_text)

        notes_text_frame.text = clean_notes

    

    def format_notes_text(self, notes_text: str) -> str:

        """Format notes text for speaker notes"""

        

        if not notes_text:

            return "No additional notes for this slide."

        

        # Clean up text

        notes_text = re.sub(r'\s+', ' ', notes_text.strip())

        

        # Add structure

        if not notes_text.startswith("Speaker Notes:"):

            notes_text = f"Speaker Notes:\n\n{notes_text}"

        

        # Add speaking tips

        notes_text += "\n\nSpeaking Tips:\n- Maintain eye contact with audience\n- Allow time for questions\n- Use gestures to emphasize key points"

        

        return notes_text

    

    def generate_title_slide_notes(self, structure: PresentationStructure) -> str:

        """Generate notes for title slide"""

        

        notes = f"""Speaker Notes for Title Slide:


Welcome the audience and introduce the topic: {structure.title}


This presentation will cover:

- Overview of the subject matter

- Key concepts and insights

- Practical applications and implications


Estimated duration: {structure.total_estimated_duration} minutes


Speaking Tips:

- Start with a compelling hook or question

- Briefly outline what the audience will learn

- Set expectations for interaction and questions"""

        

        return notes

    

    def generate_conclusion_notes(self, structure: PresentationStructure) -> str:

        """Generate notes for conclusion slide"""

        

        notes = f"""Speaker Notes for Conclusion:


Summarize the key points covered in this presentation about {structure.title}


Recap the main themes:

- Reinforce the most important concepts

- Highlight practical applications

- Connect back to the opening objectives


Next steps:

- Encourage questions and discussion

- Provide additional resources if available

- Thank the audience for their attention


Speaking Tips:

- End on a strong, memorable note

- Allow ample time for Q&A

- Be prepared to elaborate on any topic covered"""

        

        return notes

    

    async def apply_final_formatting(self, prs: Presentation):

        """Apply final formatting and validation to presentation"""

        

        # Validate slide count

        if len(prs.slides) > self.max_slides:

            print(f"Warning: Presentation has {len(prs.slides)} slides, which exceeds recommended maximum of {self.max_slides}")

        

        # Apply consistent formatting across all slides

        for slide in prs.slides:

            self.validate_slide_content(slide)

    

    def validate_slide_content(self, slide):

        """Validate individual slide content"""

        

        # Check for overly long text

        for shape in slide.shapes:

            if shape.has_text_frame:

                text = shape.text_frame.text

                word_count = len(text.split())

                if word_count > self.ux_guidelines['max_text_per_slide']:

                    print(f"Warning: Slide has {word_count} words, exceeding recommended maximum")

    

    async def save_presentation(self, prs: Presentation, title: str, output_folder: str) -> str:

        """Save presentation to file"""

        

        # Create output folder if it doesn't exist

        output_path = Path(output_folder)

        output_path.mkdir(parents=True, exist_ok=True)  # create nested folders as needed

        

        # Generate safe filename

        safe_title = re.sub(r'[<>:"/\\|?*]', '_', title)

        safe_title = re.sub(r'\s+', '_', safe_title)

        filename = f"{safe_title}.pptx"

        

        # Ensure unique filename

        full_path = output_path / filename

        counter = 1

        while full_path.exists():

            full_path = output_path / f"{safe_title}_{counter}.pptx"

            counter += 1

        

        # Save presentation

        prs.save(str(full_path))

        

        return str(full_path)

    

    def load_theme_configurations(self) -> Dict:

        """Load theme configurations"""

        

        themes = {

            'professional_blue': {

                'primary_color': RGBColor(0, 51, 102),

                'secondary_color': RGBColor(255, 255, 255),

                'accent_color': RGBColor(0, 102, 204),

                'font_family': 'Calibri',

                'background_style': 'solid'

            },

            'modern_gray': {

                'primary_color': RGBColor(64, 64, 64),

                'secondary_color': RGBColor(255, 255, 255),

                'accent_color': RGBColor(128, 128, 128),

                'font_family': 'Arial',

                'background_style': 'gradient'

            },

            'corporate_green': {

                'primary_color': RGBColor(0, 102, 51),

                'secondary_color': RGBColor(255, 255, 255),

                'accent_color': RGBColor(51, 153, 102),

                'font_family': 'Calibri',

                'background_style': 'solid'

            }

        }

        

        return themes


This PowerPoint generation engine creates professional presentations that follow UX best practices, maintain consistent formatting, and include comprehensive speaker notes. The system handles complex content organization while ensuring visual appeal and readability.
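

As a quick illustration of how these theme dictionaries might be consumed, here is a hedged sketch of a hypothetical helper (apply_theme_to_title is an assumption, not part of the engine above) that styles a slide title from one of the entries returned by load_theme_configurations:


def apply_theme_to_title(title_shape, theme: Dict):

    """Hypothetical helper: style a slide title using a theme dictionary"""

    paragraph = title_shape.text_frame.paragraphs[0]

    # Apply the theme's font family, a standard title size, and the primary color

    paragraph.font.name = theme['font_family']

    paragraph.font.size = Pt(32)

    paragraph.font.color.rgb = theme['primary_color']


A call such as apply_theme_to_title(slide.shapes.title, themes['professional_blue']) would then restyle an existing title in place.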


COMPLETE WORKING EXAMPLE

Now I'll provide a complete working example that demonstrates how all the components work together to create a functional Agentic AI system for PowerPoint generation. This example includes the main orchestration logic and shows how to use the system from start to finish.

The implementation below covers the full workflow from user input to final presentation generation, including configuration management, error handling, and a command-line interface.


import asyncio

import logging

import json

from pathlib import Path

from datetime import datetime

from typing import Dict, List, Optional, Tuple

import argparse


# Configure logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

logger = logging.getLogger(__name__)


class AgenticPowerPointSystem:

    """Complete Agentic AI system for PowerPoint creation"""

    

    def __init__(self, config_file: str = "config.json"):

        self.config = self.load_configuration(config_file)

        self.initialize_components()

        

    def load_configuration(self, config_file: str) -> Dict:

        """Load system configuration"""

        

        default_config = {

            "web_search": {

                "max_documents": 15,

                "search_engines": ["google", "bing"],

                "timeout_seconds": 30

            },

            "content_processing": {

                "min_content_length": 500,

                "max_content_length": 50000,

                "chunk_size": 512,

                "chunk_overlap": 50

            },

            "rag_system": {

                "embedding_model": "all-MiniLM-L6-v2",

                "max_chunks_per_query": 10,

                "rag_storage_path": "./rag_storage"

            },

            "graph_rag": {

                "min_entity_confidence": 0.7,

                "concept_cluster_count": 10,

                "graphrag_storage_path": "./graphrag_storage"

            },

            "visual_processing": {

                "min_image_size": [100, 100],

                "max_image_size": [1920, 1080],

                "min_quality_score": 0.5,

                "visual_storage_path": "./visual_content",

                "vlm_enabled": False

            },

            "presentation": {

                "max_slides": 12,

                "max_points_per_slide": 5,

                "default_theme": "professional_blue",

                "output_folder": "./presentations"

            },

            "llm": {

                "provider": "openai",  # or "local"

                "model": "gpt-3.5-turbo",

                "api_key": "",

                "local_model_path": ""

            }

        }

        

        config_path = Path(config_file)

        if config_path.exists():

            try:

                with open(config_path, 'r') as f:

                    user_config = json.load(f)

                # Merge configurations

                self.merge_configs(default_config, user_config)

            except Exception as e:

                logger.warning(f"Error loading config file: {e}. Using default configuration.")

        else:

            logger.info("Config file not found. Creating default configuration.")

            with open(config_path, 'w') as f:

                json.dump(default_config, f, indent=2)

        

        return default_config

    

    def merge_configs(self, default: Dict, user: Dict):

        """Recursively merge user config into default config"""

        for key, value in user.items():

            if isinstance(default.get(key), dict) and isinstance(value, dict):

                self.merge_configs(default[key], value)

            else:

                # Scalar override; also keeps keys that exist only in the user config

                default[key] = value

    

    def initialize_components(self):

        """Initialize all system components"""

        

        logger.info("Initializing Agentic PowerPoint System components...")

        

        # Initialize components with configuration

        self.web_searcher = WebSearchAgent(self.config["web_search"])

        self.content_processor = ContentProcessor(self.config["content_processing"])

        self.rag_system = RAGSystem(self.config["rag_system"])

        self.graph_rag = GraphRAGSystem(self.config["graph_rag"])

        self.visual_processor = VisualProcessor(self.config["visual_processing"])

        self.presentation_generator = PresentationGenerator(self.config["presentation"])

        self.llm_interface = LLMInterface(self.config["llm"])

        

        # Create necessary directories

        self.create_directories()

        

        logger.info("All components initialized successfully")

    

    def create_directories(self):

        """Create necessary directories for the system"""

        

        directories = [

            self.config["rag_system"]["rag_storage_path"],

            self.config["graph_rag"]["graphrag_storage_path"],

            self.config["visual_processing"]["visual_storage_path"],

            self.config["presentation"]["output_folder"],

            "./downloaded_documents"

        ]

        

        for directory in directories:

            Path(directory).mkdir(parents=True, exist_ok=True)

    

    async def create_presentation_from_topic(self, topic: str, theme: Optional[str] = None, 

                                           max_slides: Optional[int] = None) -> str:

        """Main method to create presentation from topic"""

        

        logger.info(f"Starting presentation creation for topic: {topic}")

        

        try:

            # Step 1: Search and download documents

            logger.info("Step 1: Searching and downloading relevant documents...")

            documents = await self.web_searcher.search_and_download(

                topic, 

                max_documents=self.config["web_search"]["max_documents"]

            )

            

            if not documents:

                raise Exception("No relevant documents found for the topic")

            

            logger.info(f"Downloaded {len(documents)} documents")

            

            # Step 2: Extract and process content

            logger.info("Step 2: Extracting and processing content...")

            processed_content = await self.content_processor.process_documents(documents)

            

            if not processed_content:

                raise Exception("No content could be extracted from downloaded documents")

            

            logger.info(f"Processed {len(processed_content)} content pieces")

            

            # Step 3: Build RAG system

            logger.info("Step 3: Building RAG system...")

            await self.rag_system.index_content(processed_content)

            

            # Step 4: Create knowledge graph and ontology

            logger.info("Step 4: Creating knowledge graph and ontology...")

            ontology = await self.graph_rag.create_ontology(processed_content)

            

            # Step 5: Extract visual elements

            logger.info("Step 5: Extracting visual elements...")

            visual_elements = await self.visual_processor.extract_visuals(documents)

            logger.info(f"Extracted {len(visual_elements)} visual elements")

            

            # Step 6: Generate presentation structure

            logger.info("Step 6: Generating presentation structure...")

            presentation_structure = await self.generate_presentation_structure(

                topic, processed_content, ontology, max_slides or self.config["presentation"]["max_slides"]

            )

            

            # Step 7: Create PowerPoint file

            logger.info("Step 7: Creating PowerPoint presentation...")

            output_path = await self.presentation_generator.create_presentation(

                presentation_structure,

                visual_elements,

                theme or self.config["presentation"]["default_theme"],

                self.config["presentation"]["output_folder"]

            )

            

            logger.info(f"Presentation created successfully: {output_path}")

            return output_path

            

        except Exception as e:

            logger.error(f"Error creating presentation: {e}")

            raise

    

    async def generate_presentation_structure(self, topic: str, processed_content: List[ExtractedContent], 

                                            ontology: Dict, max_slides: int) -> PresentationStructure:

        """Generate presentation structure using LLM and processed content"""

        

        # Extract key concepts and entities from ontology

        key_concepts = list(ontology.get("concepts", {}).keys())[:10]

        key_entities = list(ontology.get("entities", {}).keys())[:15]

        

        # Generate presentation outline using LLM

        outline_prompt = self.create_outline_prompt(topic, key_concepts, key_entities)

        outline_response = await self.llm_interface.generate_response(outline_prompt)

        

        # Parse outline and create slide structure

        slides = await self.create_slides_from_outline(outline_response, processed_content, max_slides)

        

        # Generate title and subtitle

        title = await self.generate_presentation_title(topic)

        subtitle = await self.generate_presentation_subtitle(topic, key_concepts)

        

        return PresentationStructure(

            title=title,

            subtitle=subtitle,

            slides=slides,

            theme_name=self.config["presentation"]["default_theme"],

            total_estimated_duration=len(slides) * 2  # 2 minutes per slide estimate

        )

    

    def create_outline_prompt(self, topic: str, key_concepts: List[str], key_entities: List[str]) -> str:

        """Create prompt for LLM to generate presentation outline"""

        

        prompt = f"""Create a presentation outline for the topic: "{topic}"


Based on the research, the following key concepts were identified:

{', '.join(key_concepts[:5])}


Key entities mentioned include:

{', '.join(key_entities[:8])}


Please create a logical presentation structure with 8-12 slides that:

1. Introduces the topic clearly

2. Covers the main concepts in a logical order

3. Includes practical examples or applications

4. Concludes with key takeaways


For each slide, provide:

- Slide title

- 3-5 main points to cover

- Slide type (introduction, content, visual, conclusion)

- Importance score (1-10)


Format your response as a structured outline."""

        

        return prompt

    

    async def create_slides_from_outline(self, outline_response: str, 

                                       processed_content: List[ExtractedContent], 

                                       max_slides: int) -> List[SlideContent]:

        """Create slide content from LLM outline response"""

        

        slides = []

        

        # Parse outline (simplified parsing - real implementation would be more sophisticated)

        slide_sections = self.parse_outline_response(outline_response)

        

        for section in slide_sections[:max_slides]:

            # Generate detailed content for each slide

            slide_content = await self.generate_slide_content(section, processed_content)

            slides.append(slide_content)

        

        return slides

    

    def parse_outline_response(self, response: str) -> List[Dict]:

        """Parse LLM outline response into structured sections"""

        

        # Simplified parsing - real implementation would use more sophisticated NLP

        sections = []

        lines = response.split('\n')

        

        current_section = {}

        for line in lines:

            line = line.strip()

            if line.startswith('Slide') or line.startswith('#'):

                if current_section:

                    sections.append(current_section)

                current_section = {'title': line, 'points': [], 'type': 'content', 'importance': 5}

            elif (line.startswith('-') or line.startswith('•')) and current_section:

                # Guard: ignore bullet lines that appear before any slide header

                current_section['points'].append(line[1:].strip())

        

        if current_section:

            sections.append(current_section)

        

        return sections

    

    async def generate_slide_content(self, section: Dict, 

                                   processed_content: List[ExtractedContent]) -> SlideContent:

        """Generate detailed slide content"""

        

        title = section.get('title', 'Untitled Slide').replace('#', '').strip()

        

        # Get relevant content from RAG system

        context = await self.rag_system.generate_context_for_slide(title)

        

        # Generate detailed points using LLM

        content_prompt = f"""Create detailed content for a slide titled: "{title}"


Context from research:

{context[:1000]}


Original outline points:

{chr(10).join(section.get('points', []))}


Generate 3-5 clear, concise bullet points that:

- Are informative and accurate

- Use accessible language

- Include specific examples when possible

- Are suitable for a presentation slide


Also generate comprehensive speaker notes for this slide."""

        

        content_response = await self.llm_interface.generate_response(content_prompt)

        

        # Parse response to extract points and notes

        points, notes = self.parse_content_response(content_response)

        

        return SlideContent(

            title=title,

            content_points=points,

            visual_elements=[],  # Will be populated later based on visual matching

            notes=notes,

            slide_type=section.get('type', 'content'),

            importance_score=section.get('importance', 5) / 10

        )

    

    def parse_content_response(self, response: str) -> Tuple[List[str], str]:

        """Parse LLM content response into points and notes"""

        

        lines = response.split('\n')

        points = []

        notes_started = False

        notes_lines = []

        

        for line in lines:

            line = line.strip()

            if line.lower().startswith('notes:') or line.lower().startswith('speaker notes:'):

                notes_started = True

                continue

            

            if notes_started:

                notes_lines.append(line)

            elif line.startswith('-') or line.startswith('•'):

                points.append(line[1:].strip())

        

        notes = '\n'.join(notes_lines) if notes_lines else "No additional notes provided."

        

        return points[:5], notes  # Limit to 5 points

    

    async def generate_presentation_title(self, topic: str) -> str:

        """Generate presentation title using LLM"""

        

        prompt = f"Generate a clear, professional presentation title for the topic: {topic}. The title should be engaging but not overly creative. Respond with just the title."

        

        response = await self.llm_interface.generate_response(prompt)

        return response.strip().replace('"', '')

    

    async def generate_presentation_subtitle(self, topic: str, key_concepts: List[str]) -> str:

        """Generate presentation subtitle"""

        

        if key_concepts:

            return f"An overview of {topic} covering {', '.join(key_concepts[:2])}"

        else:

            return f"A comprehensive overview of {topic}"

    

    async def extend_presentation(self, presentation_path: str, additional_topic: str) -> str:

        """Extend existing presentation with additional content"""

        

        logger.info(f"Extending presentation with topic: {additional_topic}")

        

        # This would load the existing presentation and add new slides

        # Implementation would involve loading the PPTX file, analyzing existing content,

        # and generating additional slides that complement the existing structure
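
        # A starting point (sketch): Presentation(presentation_path) reloads the

        # existing file; new slides could then be appended via

        # prs.slides.add_slide(...) and the result written with prs.save(...)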

        

        # For now, return a placeholder

        return f"Extended presentation would be saved to: {presentation_path}"

    

    async def modify_presentation(self, presentation_path: str, modifications: Dict) -> str:

        """Modify existing presentation based on user requests"""

        

        logger.info(f"Modifying presentation: {modifications}")

        

        # This would load the existing presentation and apply requested modifications

        # such as changing themes, updating content, or reorganizing slides

        

        # For now, return a placeholder

        return f"Modified presentation would be saved to: {presentation_path}"


class LLMInterface:

    """Interface for both commercial and local LLMs"""

    

    def __init__(self, config: Dict):

        self.config = config

        self.provider = config.get("provider", "openai")

        

        if self.provider == "openai":

            self.initialize_openai()

        elif self.provider == "local":

            self.initialize_local_llm()

    

    def initialize_openai(self):

        """Initialize OpenAI client"""

        try:

            import openai

            self.client = openai.OpenAI(api_key=self.config.get("api_key"))

        except ImportError:

            logger.error("OpenAI library not installed. Install with: pip install openai")

            raise

    

    def initialize_local_llm(self):

        """Initialize local LLM"""

        # Placeholder for local LLM initialization

        # Would use libraries like transformers, llama-cpp-python, etc.

        logger.info("Local LLM initialization not implemented in this example")

        self.client = None

    

    async def generate_response(self, prompt: str) -> str:

        """Generate response using configured LLM"""

        

        if self.provider == "openai":

            return await self.generate_openai_response(prompt)

        elif self.provider == "local":

            return await self.generate_local_response(prompt)

        else:

            return "LLM not properly configured"

    

    async def generate_openai_response(self, prompt: str) -> str:

        """Generate response using OpenAI"""

        

        try:

            # The OpenAI client call is synchronous, so run it in a worker

            # thread to avoid blocking the event loop

            response = await asyncio.to_thread(

                self.client.chat.completions.create,

                model=self.config.get("model", "gpt-3.5-turbo"),

                messages=[

                    {"role": "system", "content": "You are a helpful assistant that creates professional presentation content."},

                    {"role": "user", "content": prompt}

                ],

                max_tokens=1000,

                temperature=0.7

            )

            return response.choices[0].message.content

        except Exception as e:

            logger.error(f"Error generating OpenAI response: {e}")

            return "Error generating response"

    

    async def generate_local_response(self, prompt: str) -> str:

        """Generate response using local LLM"""

        

        # Placeholder for local LLM response generation

        return "Local LLM response generation not implemented in this example"


def main():

    """Main function to run the Agentic PowerPoint System"""

    

    parser = argparse.ArgumentParser(description="Agentic AI PowerPoint Generator")

    parser.add_argument("topic", help="Topic for presentation generation")

    parser.add_argument("--theme", help="PowerPoint theme to use", default=None)

    parser.add_argument("--max-slides", type=int, help="Maximum number of slides", default=None)

    parser.add_argument("--config", help="Configuration file path", default="config.json")

    

    args = parser.parse_args()

    

    async def run_system():

        try:

            # Initialize system

            system = AgenticPowerPointSystem(args.config)

            

            # Create presentation

            output_path = await system.create_presentation_from_topic(

                args.topic,

                theme=args.theme,

                max_slides=args.max_slides

            )

            

            print(f"Presentation created successfully: {output_path}")

            

        except Exception as e:

            logger.error(f"System error: {e}")

            print(f"Error: {e}")

    

    # Run the async function

    asyncio.run(run_system())


if __name__ == "__main__":

    main()


To use this complete system, you would run it from the command line like this:


python agentic_powerpoint_system.py "Artificial Intelligence in Healthcare" --theme professional_blue --max-slides 10
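

To override any of these defaults, a minimal config.json might look like the following sketch (the keys mirror default_config above; the API key is a placeholder you must replace):


{

  "presentation": {

    "max_slides": 8,

    "default_theme": "modern_gray"

  },

  "llm": {

    "api_key": "YOUR_OPENAI_API_KEY"

  }

}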


This complete implementation demonstrates how all the components work together to create a sophisticated Agentic AI system that can automatically generate professional PowerPoint presentations from user-specified topics. The system handles the entire workflow from research to final presentation creation while maintaining high quality standards and following UX best practices.


CONCLUSION AND FUTURE CONSIDERATIONS

The Agentic AI system for PowerPoint creation represents a significant advancement in automated content generation, combining multiple AI technologies into a comprehensive solution that can transform simple user prompts into professional presentations. The system demonstrates how various AI components can work in concert to accomplish complex, multi-step tasks that traditionally require significant human effort and expertise.

The modular architecture ensures that each component can be independently improved and updated without affecting the entire system. This design approach allows for easy integration of new technologies as they become available, such as more advanced Vision Language Models, improved embedding techniques, or more sophisticated knowledge graph algorithms.

Future enhancements could include real-time collaboration features, integration with enterprise knowledge bases, support for multiple languages, and advanced customization options for different industries or presentation styles. The system could also be extended to support other document formats beyond PowerPoint, such as interactive web presentations or PDF reports.

The implementation demonstrates the practical application of cutting-edge AI technologies in solving real-world business problems, showing how autonomous agents can augment human capabilities while maintaining quality and consistency standards. As AI technologies continue to evolve, systems like this will become increasingly sophisticated and capable of handling even more complex content creation tasks.
