Friday, October 17, 2025

BUILDING AN LLM-BASED CALENDAR APPLICATION




Introduction and Overview


Creating a calendar application that leverages Large Language Models represents a significant shift from traditional calendar interfaces. Instead of clicking through menus and forms, users can interact with their calendar using natural language commands such as "Schedule a meeting with John next Tuesday at 2 PM" or "Show me all my free slots this week." This approach transforms the calendar from a passive display tool into an intelligent assistant that understands context, intent, and user preferences.


The core challenge in building such an application lies in bridging the gap between human language and structured calendar data. When a user says "Move my dentist appointment to next week," the system must understand that this refers to a specific existing appointment, determine what "next week" means in the current context, and execute the appropriate database operations. This requires sophisticated natural language processing, robust data management, and intelligent reasoning capabilities.


The application we will design supports comprehensive calendar functionality including meeting creation, modification, and deletion through conversational interfaces. It automatically integrates holiday information based on user location, supports recurring meeting series, and provides intelligent scheduling suggestions. The system maintains rich metadata for each calendar entry, supporting notes, images, color coding, and categorization while offering multiple viewing modes from daily to yearly perspectives.


Architecture and Core Components


The foundation of our LLM-based calendar application rests on a modular architecture that separates concerns while enabling seamless integration between components. The system consists of several key layers: the natural language processing layer that interprets user commands, the business logic layer that manages calendar operations, the data persistence layer that handles storage and retrieval, and the presentation layer that renders calendar views and manages user interactions.


At the heart of the system lies the Language Model Integration Service, which processes user input and translates natural language commands into structured actions. This service must handle the inherent ambiguity of human language while maintaining context across conversation turns. For example, when a user says "Cancel it" after discussing a specific meeting, the system must remember which meeting was referenced and execute the cancellation accordingly.


The Calendar Engine serves as the central orchestrator, managing all calendar-related operations including event creation, modification, deletion, and querying. This component maintains the business rules for scheduling, handles conflict detection, and manages recurring event series. It interfaces with the Location Service to determine user timezone and regional holidays, and with the Notification Service to handle reminders and alerts.


Data persistence is handled through a flexible database layer that supports both relational and document storage patterns. Calendar events require structured fields for dates, times, and participants, while also supporting unstructured data such as notes and attached images. The storage layer must efficiently handle queries across different time ranges while supporting real-time updates and synchronization.


Here is a foundational code example that demonstrates the core architecture setup. This example shows how the main application components are initialized and how they interact with each other. The CalendarApplication class serves as the central coordinator, managing the lifecycle of all services and providing the main entry point for user interactions.



import asyncio

from datetime import datetime, timedelta

from typing import List, Optional, Dict, Any

import json


class CalendarApplication:

    def __init__(self):

        self.llm_service = LLMService()

        self.calendar_engine = CalendarEngine()

        self.location_service = LocationService()

        self.notification_service = NotificationService()

        self.user_context = UserContext()

        

    async def process_user_command(self, user_input: str) -> str:

        # Parse the natural language input

        intent = await self.llm_service.parse_intent(user_input, self.user_context)

        

        # Execute the appropriate calendar operation

        result = await self.calendar_engine.execute_command(intent)

        

        # Generate a natural language response

        response = await self.llm_service.generate_response(result, self.user_context)

        

        return response


class LLMService:

    def __init__(self):

        self.conversation_history = []

        

    async def parse_intent(self, user_input: str, context: 'UserContext') -> Dict[str, Any]:

        # This would integrate with an actual LLM service

        # For demonstration, we'll simulate intent parsing

        

        prompt = f"""

        Parse the following calendar command and extract the intent and parameters:

        User input: "{user_input}"

        Current date: {datetime.now().isoformat()}

        User timezone: {context.timezone}

        

        Return a JSON object with:

        - action: create/update/delete/query/find_slots

        - parameters: relevant details extracted from the input

        """

        

        # Simulated LLM response parsing

        if "schedule" in user_input.lower() or "meeting" in user_input.lower():

            return {

                "action": "create",

                "parameters": self._extract_meeting_details(user_input)

            }

        elif "show" in user_input.lower() or "what" in user_input.lower():

            return {

                "action": "query",

                "parameters": self._extract_query_details(user_input)

            }

        

        return {"action": "unknown", "parameters": {}}

    

    def _extract_meeting_details(self, text: str) -> Dict[str, Any]:

        # Simplified extraction logic

        return {

            "title": "Extracted meeting title",

            "datetime": datetime.now() + timedelta(days=1),

            "duration": 60,

            "participants": []

        }

    

    def _extract_query_details(self, text: str) -> Dict[str, Any]:

        return {

            "date_range": {

                "start": datetime.now().date(),

                "end": datetime.now().date() + timedelta(days=7)

            }

        }


class UserContext:

    def __init__(self):

        self.timezone = "UTC"

        self.location = None

        self.preferences = {}

        self.last_referenced_event = None



This architectural foundation provides the structure needed for natural language calendar interactions. The LLMService handles the complex task of understanding user intent, while the CalendarEngine manages the actual calendar operations. The UserContext maintains important state information that helps the system understand references and maintain conversation continuity.


Natural Language Processing Integration


The natural language processing component represents the most sophisticated aspect of our calendar application. Unlike traditional calendar interfaces that rely on forms and dropdown menus, our system must interpret the full spectrum of how users naturally express temporal concepts and scheduling intentions. This includes handling relative time references like "next Tuesday," "in two weeks," or "tomorrow morning," as well as understanding implicit context such as "move it to Friday" when referring to a previously mentioned meeting.


The LLM integration must handle several categories of calendar-related natural language understanding. Temporal expression parsing involves converting phrases like "next Monday at 3 PM" into specific datetime objects, accounting for the user's current timezone and calendar context. Entity extraction identifies key components such as meeting titles, participant names, locations, and duration specifications from free-form text. Intent classification determines whether the user wants to create, modify, delete, or query calendar information.


Context management proves particularly crucial for maintaining conversation flow. When a user says "Schedule a meeting with Sarah about the project review next Tuesday at 2 PM" followed by "Actually, make it 3 PM instead," the system must understand that "it" refers to the meeting just discussed and that only the time should be modified. This requires maintaining conversation state and entity resolution across multiple interaction turns.


Here is a detailed implementation of the natural language processing pipeline. This example demonstrates how user input is processed through multiple stages to extract structured calendar information. The NLPPipeline class coordinates the various processing steps, while specialized components handle specific aspects of language understanding.



import re

from datetime import datetime, timedelta

from typing import Dict, List, Optional, Tuple

import dateutil.parser

from dateutil.relativedelta import relativedelta


class NLPPipeline:

    def __init__(self):

        self.temporal_parser = TemporalParser()

        self.entity_extractor = EntityExtractor()

        self.intent_classifier = IntentClassifier()

        self.context_manager = ContextManager()

        

    async def process_input(self, user_input: str, context: UserContext) -> Dict[str, Any]:

        # Clean and normalize the input

        normalized_input = self._normalize_input(user_input)

        

        # Extract temporal expressions

        temporal_info = self.temporal_parser.parse_temporal_expressions(

            normalized_input, context.timezone

        )

        

        # Extract entities (people, places, topics)

        entities = self.entity_extractor.extract_entities(normalized_input)

        

        # Classify the intent

        intent = self.intent_classifier.classify_intent(

            normalized_input, temporal_info, entities

        )

        

        # Resolve references using context

        resolved_intent = self.context_manager.resolve_references(

            intent, context

        )

        

        return resolved_intent

    

    def _normalize_input(self, text: str) -> str:

        # Convert to lowercase and handle common abbreviations

        text = text.lower()

        text = re.sub(r'\btmrw\b', 'tomorrow', text)

        text = re.sub(r'\btues\b', 'tuesday', text)

        text = re.sub(r'\bwed\b', 'wednesday', text)

        text = re.sub(r'\bthurs\b', 'thursday', text)

        text = re.sub(r'\bfri\b', 'friday', text)

        text = re.sub(r'\bsat\b', 'saturday', text)

        text = re.sub(r'\bsun\b', 'sunday', text)

        text = re.sub(r'\bmon\b', 'monday', text)

        return text


class TemporalParser:

    def __init__(self):

        self.relative_patterns = {

            r'next (\w+)': self._parse_next_weekday,

            r'tomorrow': self._parse_tomorrow,

            r'today': self._parse_today,

            r'in (\d+) (days?|weeks?|months?)': self._parse_relative_duration,

            r'(\d{1,2}):(\d{2})\s*(am|pm)?': self._parse_time,

        }

    

    def parse_temporal_expressions(self, text: str, timezone: str) -> Dict[str, Any]:

        temporal_info = {

            'date': None,

            'time': None,

            'duration': None,

            'end_date': None,

            'recurrence': None

        }

        

        # Try to parse absolute dates first

        try:

            parsed_dt = dateutil.parser.parse(text, fuzzy=True)

            temporal_info['date'] = parsed_dt.date()

            temporal_info['time'] = parsed_dt.time()

        except:

            # Fall back to pattern matching for relative expressions

            for pattern, parser_func in self.relative_patterns.items():

                match = re.search(pattern, text, re.IGNORECASE)

                if match:

                    result = parser_func(match, timezone)

                    temporal_info.update(result)

                    break

        

        # Parse duration if specified

        duration_match = re.search(r'for (\d+) (hours?|minutes?)', text)

        if duration_match:

            amount = int(duration_match.group(1))

            unit = duration_match.group(2)

            if 'hour' in unit:

                temporal_info['duration'] = amount * 60

            else:

                temporal_info['duration'] = amount

        

        # Check for recurrence patterns

        if re.search(r'every (week|day|month)', text):

            temporal_info['recurrence'] = self._parse_recurrence(text)

        

        return temporal_info

    

    def _parse_next_weekday(self, match, timezone: str) -> Dict[str, Any]:

        weekday_name = match.group(1).lower()

        weekdays = {

            'monday': 0, 'tuesday': 1, 'wednesday': 2, 'thursday': 3,

            'friday': 4, 'saturday': 5, 'sunday': 6

        }

        

        if weekday_name in weekdays:

            target_weekday = weekdays[weekday_name]

            today = datetime.now().date()

            days_ahead = target_weekday - today.weekday()

            if days_ahead <= 0:  # Target day already happened this week

                days_ahead += 7

            target_date = today + timedelta(days=days_ahead)

            return {'date': target_date}

        

        return {}

    

    def _parse_tomorrow(self, match, timezone: str) -> Dict[str, Any]:

        tomorrow = datetime.now().date() + timedelta(days=1)

        return {'date': tomorrow}

    

    def _parse_today(self, match, timezone: str) -> Dict[str, Any]:

        today = datetime.now().date()

        return {'date': today}

    

    def _parse_relative_duration(self, match, timezone: str) -> Dict[str, Any]:

        amount = int(match.group(1))

        unit = match.group(2).lower()

        

        today = datetime.now().date()

        if 'day' in unit:

            target_date = today + timedelta(days=amount)

        elif 'week' in unit:

            target_date = today + timedelta(weeks=amount)

        elif 'month' in unit:

            target_date = today + relativedelta(months=amount)

        else:

            return {}

        

        return {'date': target_date}

    

    def _parse_time(self, match, timezone: str) -> Dict[str, Any]:

        hour = int(match.group(1))

        minute = int(match.group(2))

        ampm = match.group(3)

        

        if ampm and ampm.lower() == 'pm' and hour != 12:

            hour += 12

        elif ampm and ampm.lower() == 'am' and hour == 12:

            hour = 0

        

        from datetime import time

        parsed_time = time(hour, minute)

        return {'time': parsed_time}

    

    def _parse_recurrence(self, text: str) -> Dict[str, Any]:

        if 'every week' in text:

            return {'type': 'weekly', 'interval': 1}

        elif 'every day' in text:

            return {'type': 'daily', 'interval': 1}

        elif 'every month' in text:

            return {'type': 'monthly', 'interval': 1}

        return {}


class EntityExtractor:

    def extract_entities(self, text: str) -> Dict[str, List[str]]:

        entities = {

            'people': [],

            'locations': [],

            'topics': [],

            'organizations': []

        }

        

        # Simple pattern-based extraction (in production, use NER models)

        # Extract people mentioned with "with"

        people_pattern = r'with ([A-Z][a-z]+ ?[A-Z]?[a-z]*)'

        people_matches = re.findall(people_pattern, text)

        entities['people'].extend(people_matches)

        

        # Extract locations mentioned with "at" or "in"

        location_pattern = r'(?:at|in) ([A-Z][a-z]+(?: [A-Z][a-z]+)*)'

        location_matches = re.findall(location_pattern, text)

        entities['locations'].extend(location_matches)

        

        # Extract topics mentioned with "about"

        topic_pattern = r'about ([a-z]+(?: [a-z]+)*)'

        topic_matches = re.findall(topic_pattern, text)

        entities['topics'].extend(topic_matches)

        

        return entities


class IntentClassifier:

    def classify_intent(self, text: str, temporal_info: Dict, entities: Dict) -> Dict[str, Any]:

        intent = {

            'action': 'unknown',

            'confidence': 0.0,

            'parameters': {}

        }

        

        # Classify based on action verbs and context

        if any(word in text for word in ['schedule', 'book', 'plan', 'arrange']):

            intent['action'] = 'create'

            intent['confidence'] = 0.9

        elif any(word in text for word in ['cancel', 'delete', 'remove']):

            intent['action'] = 'delete'

            intent['confidence'] = 0.9

        elif any(word in text for word in ['move', 'reschedule', 'change']):

            intent['action'] = 'update'

            intent['confidence'] = 0.9

        elif any(word in text for word in ['show', 'list', 'what', 'when']):

            intent['action'] = 'query'

            intent['confidence'] = 0.8

        elif any(word in text for word in ['free', 'available', 'open']):

            intent['action'] = 'find_slots'

            intent['confidence'] = 0.8

        

        # Combine temporal and entity information

        intent['parameters'] = {

            'temporal': temporal_info,

            'entities': entities,

            'raw_text': text

        }

        

        return intent


class ContextManager:

    def __init__(self):

        self.conversation_history = []

        self.entity_references = {}

        

    def resolve_references(self, intent: Dict, context: UserContext) -> Dict[str, Any]:

        # Resolve pronouns and references like "it", "that meeting", etc.

        text = intent['parameters']['raw_text']

        

        if any(ref in text for ref in ['it', 'that', 'the meeting']):

            if context.last_referenced_event:

                intent['parameters']['target_event_id'] = context.last_referenced_event

        

        # Update conversation history

        self.conversation_history.append({

            'timestamp': datetime.now(),

            'intent': intent,

            'context': context

        })

        

        return intent



This natural language processing pipeline demonstrates how complex user input is systematically broken down into structured information that the calendar system can act upon. The TemporalParser handles the challenging task of converting human time expressions into precise datetime objects, while the EntityExtractor identifies relevant people, places, and topics. The IntentClassifier determines what action the user wants to perform, and the ContextManager maintains conversation state to handle references and follow-up commands.


Database Design and Data Models


The database design for an LLM-based calendar application must balance the structured nature of calendar data with the flexibility required for natural language interactions. Traditional calendar applications can rely on rigid schemas with predefined fields, but our system must accommodate the rich variety of information that users might express through natural language commands. This includes supporting arbitrary metadata, flexible categorization, and efficient querying across multiple dimensions.


The core data model centers around Events, which represent individual calendar entries, and EventSeries, which handle recurring meetings and appointment patterns. Each Event contains essential temporal information including start and end times, timezone data, and duration specifications. The model also supports rich metadata such as descriptions, notes, attached files, and color coding for visual organization.


User management requires careful consideration of privacy and access control. The system must support multiple users while maintaining strict data isolation. Each user has associated preferences, timezone settings, and location information that influences how calendar data is interpreted and displayed. The database must efficiently handle queries that span different users and time ranges while maintaining security boundaries.


Here is a comprehensive database model implementation that demonstrates the core data structures and relationships. This example uses SQLAlchemy to define the database schema, showing how calendar events, users, and related metadata are organized and connected. The model supports both simple events and complex recurring series while maintaining flexibility for future extensions.



from sqlalchemy import create_engine, Column, Integer, String, DateTime, Text, Boolean, ForeignKey, JSON, Enum

from sqlalchemy.ext.declarative import declarative_base

from sqlalchemy.orm import relationship, sessionmaker

from sqlalchemy.dialects.postgresql import UUID

import uuid

from datetime import datetime, timedelta

from typing import Optional, List, Dict, Any

import enum


Base = declarative_base()


class EventType(enum.Enum):

    MEETING = "meeting"

    APPOINTMENT = "appointment"

    REMINDER = "reminder"

    TASK = "task"

    PERSONAL = "personal"

    WORK = "work"


class RecurrenceType(enum.Enum):

    NONE = "none"

    DAILY = "daily"

    WEEKLY = "weekly"

    MONTHLY = "monthly"

    YEARLY = "yearly"

    CUSTOM = "custom"


class User(Base):

    __tablename__ = 'users'

    

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)

    username = Column(String(100), unique=True, nullable=False)

    email = Column(String(255), unique=True, nullable=False)

    timezone = Column(String(50), default='UTC')

    location_country = Column(String(100))

    location_region = Column(String(100))

    preferences = Column(JSON, default=dict)

    created_at = Column(DateTime, default=datetime.utcnow)

    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    

    # Relationships

    events = relationship("Event", back_populates="user", cascade="all, delete-orphan")

    event_series = relationship("EventSeries", back_populates="user", cascade="all, delete-orphan")

    

    def __repr__(self):

        return f"<User(username='{self.username}', email='{self.email}')>"


class EventSeries(Base):

    __tablename__ = 'event_series'

    

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)

    user_id = Column(UUID(as_uuid=True), ForeignKey('users.id'), nullable=False)

    title = Column(String(500), nullable=False)

    description = Column(Text)

    

    # Recurrence configuration

    recurrence_type = Column(Enum(RecurrenceType), default=RecurrenceType.NONE)

    recurrence_interval = Column(Integer, default=1)

    recurrence_end_date = Column(DateTime)

    recurrence_count = Column(Integer)

    recurrence_days_of_week = Column(JSON)  # For weekly recurrence: [0,1,2,3,4] for Mon-Fri

    recurrence_day_of_month = Column(Integer)  # For monthly recurrence

    

    # Series metadata

    event_type = Column(Enum(EventType), default=EventType.MEETING)

    color = Column(String(7), default='#3498db')  # Hex color code

    location = Column(String(500))

    

    created_at = Column(DateTime, default=datetime.utcnow)

    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    

    # Relationships

    user = relationship("User", back_populates="event_series")

    events = relationship("Event", back_populates="series", cascade="all, delete-orphan")

    

    def generate_events(self, start_date: datetime, end_date: datetime) -> List['Event']:

        """Generate individual events for this series within the given date range."""

        events = []

        current_date = start_date

        

        while current_date <= end_date:

            if self._should_create_event_on_date(current_date):

                event = Event(

                    series_id=self.id,

                    user_id=self.user_id,

                    title=self.title,

                    description=self.description,

                    start_time=current_date,

                    event_type=self.event_type,

                    color=self.color,

                    location=self.location

                )

                events.append(event)

            

            current_date += self._get_recurrence_delta()

            

            if self.recurrence_count and len(events) >= self.recurrence_count:

                break

        

        return events

    

    def _should_create_event_on_date(self, date: datetime) -> bool:

        """Check if an event should be created on the given date based on recurrence rules."""

        if self.recurrence_type == RecurrenceType.WEEKLY and self.recurrence_days_of_week:

            return date.weekday() in self.recurrence_days_of_week

        return True

    

    def _get_recurrence_delta(self) -> timedelta:

        """Get the time delta for the next occurrence."""

        if self.recurrence_type == RecurrenceType.DAILY:

            return timedelta(days=self.recurrence_interval)

        elif self.recurrence_type == RecurrenceType.WEEKLY:

            return timedelta(weeks=self.recurrence_interval)

        else:

            return timedelta(days=1)


class Event(Base):

    __tablename__ = 'events'

    

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)

    user_id = Column(UUID(as_uuid=True), ForeignKey('users.id'), nullable=False)

    series_id = Column(UUID(as_uuid=True), ForeignKey('event_series.id'), nullable=True)

    

    # Basic event information

    title = Column(String(500), nullable=False)

    description = Column(Text)

    notes = Column(Text)

    

    # Temporal information

    start_time = Column(DateTime, nullable=False)

    end_time = Column(DateTime)

    duration_minutes = Column(Integer)

    timezone = Column(String(50))

    all_day = Column(Boolean, default=False)

    

    # Event metadata

    event_type = Column(Enum(EventType), default=EventType.MEETING)

    color = Column(String(7), default='#3498db')

    location = Column(String(500))

    meeting_url = Column(String(1000))

    

    # Status and visibility

    status = Column(String(50), default='confirmed')  # confirmed, tentative, cancelled

    visibility = Column(String(50), default='private')  # private, public, confidential

    

    # Rich content

    attachments = Column(JSON, default=list)  # List of file references

    participants = Column(JSON, default=list)  # List of participant information

    tags = Column(JSON, default=list)  # User-defined tags

    

    # System metadata

    created_at = Column(DateTime, default=datetime.utcnow)

    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    created_from_nlp = Column(Boolean, default=False)

    original_nlp_input = Column(Text)  # Store the original user command

    

    # Relationships

    user = relationship("User", back_populates="events")

    series = relationship("EventSeries", back_populates="events")

    reminders = relationship("Reminder", back_populates="event", cascade="all, delete-orphan")

    

    def __repr__(self):

        return f"<Event(title='{self.title}', start_time='{self.start_time}')>"

    

    @property

    def calculated_end_time(self) -> datetime:

        """Calculate end time based on start time and duration if end_time is not set."""

        if self.end_time:

            return self.end_time

        elif self.duration_minutes:

            return self.start_time + timedelta(minutes=self.duration_minutes)

        else:

            return self.start_time + timedelta(hours=1)  # Default 1 hour duration

    

    def conflicts_with(self, other_event: 'Event') -> bool:

        """Check if this event conflicts with another event."""

        self_end = self.calculated_end_time

        other_end = other_event.calculated_end_time

        

        return (self.start_time < other_end and self_end > other_event.start_time)

    

    def to_dict(self) -> Dict[str, Any]:

        """Convert event to dictionary for API responses."""

        return {

            'id': str(self.id),

            'title': self.title,

            'description': self.description,

            'start_time': self.start_time.isoformat(),

            'end_time': self.calculated_end_time.isoformat(),

            'event_type': self.event_type.value,

            'color': self.color,

            'location': self.location,

            'participants': self.participants,

            'tags': self.tags

        }


class Reminder(Base):

    __tablename__ = 'reminders'

    

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)

    event_id = Column(UUID(as_uuid=True), ForeignKey('events.id'), nullable=False)

    

    # Reminder timing

    trigger_time = Column(DateTime, nullable=False)

    minutes_before = Column(Integer)  # Alternative to absolute trigger_time

    

    # Reminder content and delivery

    message = Column(Text)

    delivery_method = Column(String(50), default='notification')  # notification, email, sms

    sent = Column(Boolean, default=False)

    sent_at = Column(DateTime)

    

    created_at = Column(DateTime, default=datetime.utcnow)

    

    # Relationships

    event = relationship("Event", back_populates="reminders")


class Holiday(Base):

    __tablename__ = 'holidays'

    

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)

    name = Column(String(200), nullable=False)

    date = Column(DateTime, nullable=False)

    country = Column(String(100))

    region = Column(String(100))

    holiday_type = Column(String(50))  # national, regional, religious, etc.

    description = Column(Text)

    

    created_at = Column(DateTime, default=datetime.utcnow)


class CalendarDatabase:

    def __init__(self, database_url: str):

        self.engine = create_engine(database_url)

        self.SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=self.engine)

        Base.metadata.create_all(bind=self.engine)

    

    def get_session(self):

        """Get a database session."""

        return self.SessionLocal()

    

    def create_user(self, username: str, email: str, timezone: str = 'UTC') -> User:

        """Create a new user."""

        session = self.get_session()

        try:

            user = User(username=username, email=email, timezone=timezone)

            session.add(user)

            session.commit()

            session.refresh(user)

            return user

        finally:

            session.close()

    

    def create_event(self, user_id: str, event_data: Dict[str, Any]) -> Event:

        """Create a new event."""

        session = self.get_session()

        try:

            event = Event(

                user_id=user_id,

                title=event_data['title'],

                description=event_data.get('description'),

                start_time=event_data['start_time'],

                end_time=event_data.get('end_time'),

                duration_minutes=event_data.get('duration_minutes'),

                event_type=EventType(event_data.get('event_type', 'meeting')),

                location=event_data.get('location'),

                participants=event_data.get('participants', []),

                created_from_nlp=event_data.get('created_from_nlp', False),

                original_nlp_input=event_data.get('original_nlp_input')

            )

            session.add(event)

            session.commit()

            session.refresh(event)

            return event

        finally:

            session.close()

    

    def get_events_in_range(self, user_id: str, start_date: datetime, end_date: datetime) -> List[Event]:

        """Get all events for a user within a date range."""

        session = self.get_session()

        try:

            events = session.query(Event).filter(

                Event.user_id == user_id,

                Event.start_time >= start_date,

                Event.start_time <= end_date

            ).order_by(Event.start_time).all()

            return events

        finally:

            session.close()

    

    def find_free_slots(self, user_id: str, start_date: datetime, end_date: datetime, 

                       duration_minutes: int) -> List[Dict[str, datetime]]:

        """Find available time slots for a user within a date range."""

        session = self.get_session()

        try:

            existing_events = self.get_events_in_range(user_id, start_date, end_date)

            free_slots = []

            

            current_time = start_date

            while current_time + timedelta(minutes=duration_minutes) <= end_date:

                slot_end = current_time + timedelta(minutes=duration_minutes)

                

                # Check if this slot conflicts with any existing event

                conflicts = False

                for event in existing_events:

                    if (current_time < event.calculated_end_time and 

                        slot_end > event.start_time):

                        conflicts = True

                        break

                

                if not conflicts:

                    free_slots.append({

                        'start': current_time,

                        'end': slot_end

                    })

                

                current_time += timedelta(minutes=30)  # Check every 30 minutes

            

            return free_slots

        finally:

            session.close()



This database model provides a robust foundation for storing and managing calendar data while supporting the complex requirements of natural language interactions. The Event model captures both simple appointments and rich meeting information, while the EventSeries model handles recurring events efficiently. The inclusion of metadata fields like original_nlp_input allows the system to learn from user interactions and improve its natural language understanding over time.


Meeting Management System


The meeting management system serves as the operational core of our calendar application, translating natural language intents into concrete calendar operations. This system must handle the full lifecycle of calendar events, from initial creation through modification and eventual deletion, while maintaining data consistency and providing intelligent conflict resolution. The complexity arises from the need to support both simple one-time events and sophisticated recurring meeting series, all while preserving the context and relationships that users express through natural language.


Event creation involves more than simply storing date and time information. When a user says "Schedule a weekly team meeting every Tuesday at 10 AM starting next week," the system must create a recurring series, generate individual event instances, handle timezone calculations, and set up appropriate reminders. The system must also validate the request against existing calendar entries, detect potential conflicts, and suggest alternatives when necessary.


Modification operations present particular challenges because users often reference events indirectly. A command like "Move my dentist appointment to next Friday" requires the system to identify which specific appointment the user means, especially if there are multiple dental appointments scheduled. The system must maintain enough context to resolve such ambiguous references while providing clear confirmation of the intended changes.


Here is a comprehensive implementation of the meeting management system that demonstrates how natural language intents are translated into calendar operations. This example shows the CalendarEngine class that coordinates all calendar operations, along with specialized handlers for different types of calendar actions. The system includes conflict detection, recurring event management, and intelligent scheduling suggestions.



from datetime import datetime, timedelta, time

from typing import List, Dict, Any, Optional, Tuple

import asyncio

from dataclasses import dataclass

from enum import Enum


class ConflictResolution(Enum):

    REJECT = "reject"

    SUGGEST_ALTERNATIVE = "suggest_alternative"

    ALLOW_OVERLAP = "allow_overlap"

    AUTO_RESCHEDULE = "auto_reschedule"


@dataclass

class SchedulingConflict:

    conflicting_event: 'Event'

    proposed_event: Dict[str, Any]

    conflict_type: str

    suggested_alternatives: List[Dict[str, datetime]]


@dataclass

class OperationResult:

    success: bool

    message: str

    data: Optional[Any] = None

    conflicts: List[SchedulingConflict] = None

    suggestions: List[Dict[str, Any]] = None


class CalendarEngine:

    def __init__(self, database: CalendarDatabase):

        self.db = database

        self.conflict_resolver = ConflictResolver()

        self.recurrence_manager = RecurrenceManager()

        self.notification_scheduler = NotificationScheduler()

        

    async def execute_command(self, intent: Dict[str, Any]) -> OperationResult:

        """Execute a calendar command based on parsed intent."""

        action = intent.get('action')

        parameters = intent.get('parameters', {})

        

        try:

            if action == 'create':

                return await self._handle_create_event(parameters)

            elif action == 'update':

                return await self._handle_update_event(parameters)

            elif action == 'delete':

                return await self._handle_delete_event(parameters)

            elif action == 'query':

                return await self._handle_query_events(parameters)

            elif action == 'find_slots':

                return await self._handle_find_slots(parameters)

            else:

                return OperationResult(

                    success=False,

                    message=f"Unknown action: {action}"

                )

        except Exception as e:

            return OperationResult(

                success=False,

                message=f"Error executing command: {str(e)}"

            )

    

    async def _handle_create_event(self, parameters: Dict[str, Any]) -> OperationResult:

        """Handle event creation with conflict detection and resolution."""

        temporal_info = parameters.get('temporal', {})

        entities = parameters.get('entities', {})

        

        # Extract event details from parameters

        event_data = self._build_event_data(temporal_info, entities, parameters)

        

        # Validate the event data

        validation_result = self._validate_event_data(event_data)

        if not validation_result.success:

            return validation_result

        

        # Check for conflicts

        conflicts = await self._detect_conflicts(event_data)

        if conflicts:

            resolution = await self.conflict_resolver.resolve_conflicts(

                event_data, conflicts

            )

            if resolution.action == ConflictResolution.REJECT:

                return OperationResult(

                    success=False,

                    message="Event conflicts with existing appointments",

                    conflicts=conflicts

                )

            elif resolution.action == ConflictResolution.SUGGEST_ALTERNATIVE:

                return OperationResult(

                    success=False,

                    message="Time slot not available. Here are some alternatives:",

                    suggestions=resolution.alternatives

                )

        

        # Create the event or event series

        if temporal_info.get('recurrence'):

            result = await self._create_recurring_event(event_data, temporal_info['recurrence'])

        else:

            result = await self._create_single_event(event_data)

        

        # Schedule notifications

        if result.success and result.data:

            await self.notification_scheduler.schedule_reminders(result.data)

        

        return result

    

    async def _handle_update_event(self, parameters: Dict[str, Any]) -> OperationResult:

        """Handle event modification."""

        target_event_id = parameters.get('target_event_id')

        if not target_event_id:

            # Try to identify the event from context

            target_event_id = await self._identify_target_event(parameters)

        

        if not target_event_id:

            return OperationResult(

                success=False,

                message="Could not identify which event to modify"

            )

        

        # Get the existing event

        session = self.db.get_session()

        try:

            existing_event = session.query(Event).filter(Event.id == target_event_id).first()

            if not existing_event:

                return OperationResult(

                    success=False,

                    message="Event not found"

                )

            

            # Apply modifications

            modifications = self._extract_modifications(parameters)

            updated_event_data = self._apply_modifications(existing_event, modifications)

            

            # Validate the updated event

            validation_result = self._validate_event_data(updated_event_data)

            if not validation_result.success:

                return validation_result

            

            # Check for new conflicts

            conflicts = await self._detect_conflicts(updated_event_data, exclude_event_id=target_event_id)

            if conflicts:

                return OperationResult(

                    success=False,

                    message="Updated event would conflict with existing appointments",

                    conflicts=conflicts

                )

            

            # Update the event

            for key, value in updated_event_data.items():

                if hasattr(existing_event, key):

                    setattr(existing_event, key, value)

            

            existing_event.updated_at = datetime.utcnow()

            session.commit()

            

            return OperationResult(

                success=True,

                message=f"Event '{existing_event.title}' updated successfully",

                data=existing_event

            )

            

        finally:

            session.close()

    

    async def _handle_delete_event(self, parameters: Dict[str, Any]) -> OperationResult:

        """Handle event deletion."""

        target_event_id = parameters.get('target_event_id')

        if not target_event_id:

            target_event_id = await self._identify_target_event(parameters)

        

        if not target_event_id:

            return OperationResult(

                success=False,

                message="Could not identify which event to delete"

            )

        

        session = self.db.get_session()

        try:

            event = session.query(Event).filter(Event.id == target_event_id).first()

            if not event:

                return OperationResult(

                    success=False,

                    message="Event not found"

                )

            

            event_title = event.title

            

            # Handle series deletion if this is part of a recurring series

            if event.series_id:

                delete_series = parameters.get('delete_entire_series', False)

                if delete_series:

                    series = session.query(EventSeries).filter(EventSeries.id == event.series_id).first()

                    if series:

                        session.delete(series)  # Cascade will delete all events

                        message = f"Recurring series '{event_title}' deleted successfully"

                    else:

                        session.delete(event)

                        message = f"Event '{event_title}' deleted successfully"

                else:

                    session.delete(event)

                    message = f"Single occurrence of '{event_title}' deleted successfully"

            else:

                session.delete(event)

                message = f"Event '{event_title}' deleted successfully"

            

            session.commit()

            

            return OperationResult(

                success=True,

                message=message

            )

            

        finally:

            session.close()

    

    async def _handle_query_events(self, parameters: Dict[str, Any]) -> OperationResult:

        """Handle event queries and listing."""

        temporal_info = parameters.get('temporal', {})

        user_id = parameters.get('user_id')

        

        # Determine query date range

        if temporal_info.get('date'):

            start_date = datetime.combine(temporal_info['date'], time.min)

            end_date = datetime.combine(temporal_info['date'], time.max)

        elif temporal_info.get('date_range'):

            start_date = datetime.combine(temporal_info['date_range']['start'], time.min)

            end_date = datetime.combine(temporal_info['date_range']['end'], time.max)

        else:

            # Default to today

            today = datetime.now().date()

            start_date = datetime.combine(today, time.min)

            end_date = datetime.combine(today, time.max)

        

        # Query events

        events = self.db.get_events_in_range(user_id, start_date, end_date)

        

        # Format response

        if not events:

            message = f"No events found for the specified time period"

        else:

            event_list = []

            for event in events:

                event_list.append({

                    'title': event.title,

                    'start_time': event.start_time,

                    'end_time': event.calculated_end_time,

                    'location': event.location,

                    'type': event.event_type.value

                })

            message = f"Found {len(events)} event(s)"

        

        return OperationResult(

            success=True,

            message=message,

            data=events

        )

    

    async def _handle_find_slots(self, parameters: Dict[str, Any]) -> OperationResult:

        """Handle finding available time slots."""

        temporal_info = parameters.get('temporal', {})

        user_id = parameters.get('user_id')

        duration = parameters.get('duration', 60)  # Default 60 minutes

        

        # Determine search date range

        if temporal_info.get('date_range'):

            start_date = datetime.combine(temporal_info['date_range']['start'], time(9, 0))

            end_date = datetime.combine(temporal_info['date_range']['end'], time(17, 0))

        else:

            # Default to next 7 days, business hours

            today = datetime.now()

            start_date = datetime.combine(today.date(), time(9, 0))

            end_date = datetime.combine(today.date() + timedelta(days=7), time(17, 0))

        

        # Find available slots

        free_slots = self.db.find_free_slots(user_id, start_date, end_date, duration)

        

        # Limit to reasonable number of suggestions

        free_slots = free_slots[:10]

        

        if not free_slots:

            message = "No available time slots found in the specified period"

        else:

            message = f"Found {len(free_slots)} available time slot(s)"

        

        return OperationResult(

            success=True,

            message=message,

            data=free_slots

        )

    

    def _build_event_data(self, temporal_info: Dict, entities: Dict, parameters: Dict) -> Dict[str, Any]:

        """Build event data from parsed parameters."""

        event_data = {

            'title': self._extract_title(parameters, entities),

            'description': self._extract_description(parameters, entities),

            'start_time': self._extract_start_time(temporal_info),

            'duration_minutes': temporal_info.get('duration', 60),

            'location': entities.get('locations', [None])[0],

            'participants': entities.get('people', []),

            'event_type': self._determine_event_type(parameters, entities),

            'created_from_nlp': True,

            'original_nlp_input': parameters.get('raw_text')

        }

        

        # Set end time if duration is specified

        if event_data['start_time'] and event_data['duration_minutes']:

            event_data['end_time'] = event_data['start_time'] + timedelta(

                minutes=event_data['duration_minutes']

            )

        

        return event_data

    

    def _extract_title(self, parameters: Dict, entities: Dict) -> str:

        """Extract event title from parameters and entities."""

        raw_text = parameters.get('raw_text', '')

        

        # Try to extract a meaningful title

        if entities.get('topics'):

            topic = entities['topics'][0]

            if entities.get('people'):

                return f"Meeting with {entities['people'][0]} about {topic}"

            else:

                return f"Meeting about {topic}"

        elif entities.get('people'):

            return f"Meeting with {entities['people'][0]}"

        else:

            # Extract title from common patterns

            if 'meeting' in raw_text.lower():

                return "Meeting"

            elif 'appointment' in raw_text.lower():

                return "Appointment"

            elif 'call' in raw_text.lower():

                return "Call"

            else:

                return "Event"

    

    def _extract_start_time(self, temporal_info: Dict) -> Optional[datetime]:

        """Extract start time from temporal information."""

        date = temporal_info.get('date')

        time_obj = temporal_info.get('time')

        

        if date and time_obj:

            return datetime.combine(date, time_obj)

        elif date:

            return datetime.combine(date, time(9, 0))  # Default to 9 AM

        else:

            return None

    

    def _determine_event_type(self, parameters: Dict, entities: Dict) -> EventType:

        """Determine event type based on context."""

        raw_text = parameters.get('raw_text', '').lower()

        

        if any(word in raw_text for word in ['doctor', 'dentist', 'appointment']):

            return EventType.APPOINTMENT

        elif any(word in raw_text for word in ['personal', 'family', 'friend']):

            return EventType.PERSONAL

        elif any(word in raw_text for word in ['work', 'office', 'business', 'meeting']):

            return EventType.WORK

        else:

            return EventType.MEETING

    

    async def _detect_conflicts(self, event_data: Dict[str, Any], exclude_event_id: str = None) -> List[SchedulingConflict]:

        """Detect scheduling conflicts for a proposed event."""

        conflicts = []

        

        if not event_data.get('start_time'):

            return conflicts

        

        start_time = event_data['start_time']

        end_time = event_data.get('end_time') or start_time + timedelta(minutes=event_data.get('duration_minutes', 60))

        

        # Query existing events in the time range

        existing_events = self.db.get_events_in_range(

            event_data.get('user_id'),

            start_time - timedelta(hours=1),

            end_time + timedelta(hours=1)

        )

        

        for existing_event in existing_events:

            if exclude_event_id and str(existing_event.id) == exclude_event_id:

                continue

                

            if (start_time < existing_event.calculated_end_time and 

                end_time > existing_event.start_time):

                

                conflict = SchedulingConflict(

                    conflicting_event=existing_event,

                    proposed_event=event_data,

                    conflict_type="time_overlap",

                    suggested_alternatives=[]

                )

                conflicts.append(conflict)

        

        return conflicts

    

    def _validate_event_data(self, event_data: Dict[str, Any]) -> OperationResult:

        """Validate event data before creation or modification."""

        if not event_data.get('title'):

            return OperationResult(

                success=False,

                message="Event title is required"

            )

        

        if not event_data.get('start_time'):

            return OperationResult(

                success=False,

                message="Event start time is required"

            )

        

        # Check if start time is in the past

        if event_data['start_time'] < datetime.now() - timedelta(minutes=5):

            return OperationResult(

                success=False,

                message="Cannot schedule events in the past"

            )

        

        return OperationResult(success=True, message="Event data is valid")

    

    async def _create_single_event(self, event_data: Dict[str, Any]) -> OperationResult:

        """Create a single event."""

        try:

            event = self.db.create_event(event_data['user_id'], event_data)

            return OperationResult(

                success=True,

                message=f"Event '{event.title}' created successfully",

                data=event

            )

        except Exception as e:

            return OperationResult(

                success=False,

                message=f"Failed to create event: {str(e)}"

            )


class ConflictResolver:

    async def resolve_conflicts(self, proposed_event: Dict[str, Any], 

                              conflicts: List[SchedulingConflict]) -> 'ConflictResolution':

        """Resolve scheduling conflicts using various strategies."""

        # For now, suggest alternative times

        alternatives = await self._find_alternative_times(proposed_event, conflicts)

        

        return ConflictResolutionResult(

            action=ConflictResolution.SUGGEST_ALTERNATIVE,

            alternatives=alternatives

        )

    

    async def _find_alternative_times(self, proposed_event: Dict[str, Any], 

                                    conflicts: List[SchedulingConflict]) -> List[Dict[str, datetime]]:

        """Find alternative time slots for a conflicted event."""

        alternatives = []

        original_start = proposed_event['start_time']

        duration = timedelta(minutes=proposed_event.get('duration_minutes', 60))

        

        # Try different time slots around the original time

        for offset_hours in [1, -1, 2, -2, 24, -24]:

            alternative_start = original_start + timedelta(hours=offset_hours)

            alternative_end = alternative_start + duration

            

            # Check if this alternative conflicts

            has_conflict = False

            for conflict in conflicts:

                existing_event = conflict.conflicting_event

                if (alternative_start < existing_event.calculated_end_time and 

                    alternative_end > existing_event.start_time):

                    has_conflict = True

                    break

            

            if not has_conflict:

                alternatives.append({

                    'start': alternative_start,

                    'end': alternative_end

                })

                

                if len(alternatives) >= 3:  # Limit suggestions

                    break

        

        return alternatives


@dataclass

class ConflictResolutionResult:

    action: ConflictResolution

    alternatives: List[Dict[str, datetime]] = None

    message: str = ""


class RecurrenceManager:

    def create_recurring_series(self, event_data: Dict[str, Any], 

                              recurrence_config: Dict[str, Any]) -> EventSeries:

        """Create a recurring event series."""

        # Implementation for creating recurring event series

        pass


class NotificationScheduler:

    async def schedule_reminders(self, event: Event):

        """Schedule reminders for an event."""

        # Implementation for scheduling notifications

        pass



This meeting management system provides comprehensive functionality for handling calendar operations through natural language interfaces. The CalendarEngine coordinates all operations while specialized components handle conflict resolution, recurring events, and notifications. The system maintains data integrity while providing intelligent suggestions and handling the complexities of natural language calendar interactions.


Holiday Integration and Location Services


The integration of holiday information and location-based services adds significant value to our calendar application by automatically providing relevant contextual information without requiring explicit user input. This system must handle the complexities of different regional holiday calendars, varying cultural observances, and the dynamic nature of user locations. The challenge lies in maintaining accurate, up-to-date holiday information while respecting user privacy and providing seamless integration with calendar functionality.


Holiday data management requires careful consideration of data sources, update mechanisms, and regional variations. Different countries observe different holidays, and even within countries, regional variations exist. Some holidays follow fixed dates while others follow complex lunar or religious calendars that change annually. The system must handle federal holidays, regional observances, religious holidays, and cultural celebrations while allowing users to customize which types of holidays they want to see.


Location services must balance accuracy with privacy concerns. The system needs to determine user location to provide relevant holiday information and timezone handling, but this must be done with appropriate user consent and data protection measures. The location service should support both automatic detection and manual configuration, allowing users to specify their primary location while handling temporary travel situations.


Here is a comprehensive implementation of the holiday integration and location services system. This example demonstrates how holiday data is managed, how location information is used to determine relevant holidays, and how the system integrates with the calendar to provide contextual information. The implementation includes support for multiple holiday sources and intelligent caching mechanisms.



import requests

import asyncio

from datetime import datetime, date, timedelta

from typing import List, Dict, Any, Optional, Tuple

import json

from dataclasses import dataclass

from enum import Enum

import geocoder

import pytz

from geopy.geocoders import Nominatim

import holidays as python_holidays


class HolidayType(Enum):

    NATIONAL = "national"

    REGIONAL = "regional"

    RELIGIOUS = "religious"

    CULTURAL = "cultural"

    OBSERVANCE = "observance"


@dataclass

class LocationInfo:

    country: str

    country_code: str

    region: str

    city: str

    timezone: str

    latitude: float

    longitude: float

    confidence: float


@dataclass

class HolidayInfo:

    name: str

    date: date

    country: str

    region: Optional[str]

    holiday_type: HolidayType

    description: Optional[str]

    is_public_holiday: bool

    observed_date: Optional[date] = None


class LocationService:

    def __init__(self):

        self.geolocator = Nominatim(user_agent="calendar_app")

        self.location_cache = {}

        

    async def get_user_location(self, ip_address: Optional[str] = None, 

                               user_provided_location: Optional[str] = None) -> LocationInfo:

        """Determine user location from various sources."""

        

        if user_provided_location:

            return await self._geocode_location(user_provided_location)

        

        if ip_address:

            return await self._get_location_from_ip(ip_address)

        

        # Fallback to default location (could be user's saved preference)

        return LocationInfo(

            country="United States",

            country_code="US",

            region="",

            city="",

            timezone="America/New_York",

            latitude=40.7128,

            longitude=-74.0060,

            confidence=0.5

        )

    

    async def _get_location_from_ip(self, ip_address: str) -> LocationInfo:

        """Get location information from IP address."""

        try:

            # Use a geolocation service (in production, use a reliable paid service)

            g = geocoder.ip(ip_address)

            

            if g.ok:

                # Get timezone for the coordinates

                timezone = await self._get_timezone_for_coordinates(g.latlng[0], g.latlng[1])

                

                return LocationInfo(

                    country=g.country,

                    country_code=g.country_code,

                    region=g.state,

                    city=g.city,

                    timezone=timezone,

                    latitude=g.latlng[0],

                    longitude=g.latlng[1],

                    confidence=g.confidence

                )

        except Exception as e:

            print(f"Error getting location from IP: {e}")

        

        return await self.get_user_location()  # Fallback to default

    

    async def _geocode_location(self, location_string: str) -> LocationInfo:

        """Geocode a location string to get detailed location information."""

        try:

            location = self.geolocator.geocode(location_string, exactly_one=True)

            

            if location:

                # Extract country and region information

                address_components = location.raw.get('display_name', '').split(', ')

                

                # Get timezone for the coordinates

                timezone = await self._get_timezone_for_coordinates(

                    location.latitude, location.longitude

                )

                

                return LocationInfo(

                    country=self._extract_country(location.raw),

                    country_code=self._extract_country_code(location.raw),

                    region=self._extract_region(location.raw),

                    city=self._extract_city(location.raw),

                    timezone=timezone,

                    latitude=location.latitude,

                    longitude=location.longitude,

                    confidence=1.0

                )

        except Exception as e:

            print(f"Error geocoding location: {e}")

        

        return await self.get_user_location()  # Fallback to default

    

    async def _get_timezone_for_coordinates(self, lat: float, lon: float) -> str:

        """Get timezone for given coordinates."""

        try:

            # In production, use a timezone API service

            # For now, we'll use a simple mapping based on common timezones

            

            # This is a simplified approach - in production, use a proper timezone API

            if -125 <= lon <= -66:  # Rough US longitude range

                if lat >= 49:

                    return "America/Anchorage"  # Alaska

                elif lat >= 45:

                    return "America/New_York"   # Northern US

                elif lat >= 32:

                    return "America/Chicago"    # Central US

                else:

                    return "America/Los_Angeles"  # Southern/Western US

            elif -10 <= lon <= 40:  # Rough Europe longitude range

                return "Europe/London"

            elif 100 <= lon <= 140:  # Rough Asia-Pacific longitude range

                return "Asia/Tokyo"

            else:

                return "UTC"

        except:

            return "UTC"

    

    def _extract_country(self, geocode_result: Dict) -> str:

        """Extract country name from geocoding result."""

        address = geocode_result.get('address', {})

        return address.get('country', 'Unknown')

    

    def _extract_country_code(self, geocode_result: Dict) -> str:

        """Extract country code from geocoding result."""

        address = geocode_result.get('address', {})

        return address.get('country_code', 'XX').upper()

    

    def _extract_region(self, geocode_result: Dict) -> str:

        """Extract region/state from geocoding result."""

        address = geocode_result.get('address', {})

        return address.get('state', address.get('region', ''))

    

    def _extract_city(self, geocode_result: Dict) -> str:

        """Extract city from geocoding result."""

        address = geocode_result.get('address', {})

        return address.get('city', address.get('town', address.get('village', '')))


class HolidayService:

    def __init__(self, database: CalendarDatabase):

        self.db = database

        self.holiday_cache = {}

        self.last_update = {}

        

    async def get_holidays_for_location(self, location: LocationInfo, 

                                      year: int = None) -> List[HolidayInfo]:

        """Get holidays for a specific location and year."""

        if year is None:

            year = datetime.now().year

        

        cache_key = f"{location.country_code}_{location.region}_{year}"

        

        # Check cache first

        if cache_key in self.holiday_cache:

            cache_time = self.last_update.get(cache_key, datetime.min)

            if datetime.now() - cache_time < timedelta(days=30):  # Cache for 30 days

                return self.holiday_cache[cache_key]

        

        # Fetch holidays from multiple sources

        holidays_list = []

        

        # Get national holidays

        national_holidays = await self._get_national_holidays(location.country_code, year)

        holidays_list.extend(national_holidays)

        

        # Get regional holidays if region is specified

        if location.region:

            regional_holidays = await self._get_regional_holidays(

                location.country_code, location.region, year

            )

            holidays_list.extend(regional_holidays)

        

        # Get religious holidays

        religious_holidays = await self._get_religious_holidays(location.country_code, year)

        holidays_list.extend(religious_holidays)

        

        # Cache the results

        self.holiday_cache[cache_key] = holidays_list

        self.last_update[cache_key] = datetime.now()

        

        # Store in database for persistence

        await self._store_holidays_in_database(holidays_list)

        

        return holidays_list

    

    async def _get_national_holidays(self, country_code: str, year: int) -> List[HolidayInfo]:

        """Get national holidays using the python-holidays library."""

        holidays_list = []

        

        try:

            # Use python-holidays library for reliable holiday data

            country_holidays = python_holidays.country_holidays(country_code, years=year)

            

            for holiday_date, holiday_name in country_holidays.items():

                holiday_info = HolidayInfo(

                    name=holiday_name,

                    date=holiday_date,

                    country=country_code,

                    region=None,

                    holiday_type=HolidayType.NATIONAL,

                    description=f"National holiday in {country_code}",

                    is_public_holiday=True

                )

                holidays_list.append(holiday_info)

                

        except Exception as e:

            print(f"Error fetching national holidays for {country_code}: {e}")

        

        return holidays_list

    

    async def _get_regional_holidays(self, country_code: str, region: str, 

                                   year: int) -> List[HolidayInfo]:

        """Get regional/state holidays."""

        holidays_list = []

        

        try:

            # Some countries support regional holidays in python-holidays

            if country_code == 'US':

                # US state holidays

                state_holidays = python_holidays.US(state=region, years=year)

                for holiday_date, holiday_name in state_holidays.items():

                    # Only include holidays not already in national list

                    if holiday_date not in python_holidays.US(years=year):

                        holiday_info = HolidayInfo(

                            name=holiday_name,

                            date=holiday_date,

                            country=country_code,

                            region=region,

                            holiday_type=HolidayType.REGIONAL,

                            description=f"Regional holiday in {region}, {country_code}",

                            is_public_holiday=True

                        )

                        holidays_list.append(holiday_info)

            

            elif country_code == 'CA':

                # Canadian provincial holidays

                prov_holidays = python_holidays.Canada(prov=region, years=year)

                for holiday_date, holiday_name in prov_holidays.items():

                    if holiday_date not in python_holidays.Canada(years=year):

                        holiday_info = HolidayInfo(

                            name=holiday_name,

                            date=holiday_date,

                            country=country_code,

                            region=region,

                            holiday_type=HolidayType.REGIONAL,

                            description=f"Provincial holiday in {region}, {country_code}",

                            is_public_holiday=True

                        )

                        holidays_list.append(holiday_info)

                        

        except Exception as e:

            print(f"Error fetching regional holidays for {region}, {country_code}: {e}")

        

        return holidays_list

    

    async def _get_religious_holidays(self, country_code: str, year: int) -> List[HolidayInfo]:

        """Get religious holidays and observances."""

        holidays_list = []

        

        # This would typically integrate with religious calendar APIs

        # For demonstration, we'll include some common religious observances

        

        try:

            # Example: Add common Christian holidays that might not be public holidays

            religious_observances = [

                ("Ash Wednesday", self._calculate_ash_wednesday(year)),

                ("Palm Sunday", self._calculate_palm_sunday(year)),

                ("Maundy Thursday", self._calculate_maundy_thursday(year)),

                ("Good Friday", self._calculate_good_friday(year)),

                ("Easter Sunday", self._calculate_easter_sunday(year)),

            ]

            

            for name, holiday_date in religious_observances:

                if holiday_date:

                    holiday_info = HolidayInfo(

                        name=name,

                        date=holiday_date,

                        country=country_code,

                        region=None,

                        holiday_type=HolidayType.RELIGIOUS,

                        description=f"Christian observance",

                        is_public_holiday=False

                    )

                    holidays_list.append(holiday_info)

                    

        except Exception as e:

            print(f"Error calculating religious holidays: {e}")

        

        return holidays_list

    

    def _calculate_easter_sunday(self, year: int) -> date:

        """Calculate Easter Sunday using the Western Christian calendar."""

        # Using the algorithm for calculating Easter

        a = year % 19

        b = year // 100

        c = year % 100

        d = b // 4

        e = b % 4

        f = (b + 8) // 25

        g = (b - f + 1) // 3

        h = (19 * a + b - d - g + 15) % 30

        i = c // 4

        k = c % 4

        l = (32 + 2 * e + 2 * i - h - k) % 7

        m = (a + 11 * h + 22 * l) // 451

        month = (h + l - 7 * m + 114) // 31

        day = ((h + l - 7 * m + 114) % 31) + 1

        

        return date(year, month, day)

    

    def _calculate_ash_wednesday(self, year: int) -> date:

        """Calculate Ash Wednesday (46 days before Easter)."""

        easter = self._calculate_easter_sunday(year)

        return easter - timedelta(days=46)

    

    def _calculate_palm_sunday(self, year: int) -> date:

        """Calculate Palm Sunday (7 days before Easter)."""

        easter = self._calculate_easter_sunday(year)

        return easter - timedelta(days=7)

    

    def _calculate_maundy_thursday(self, year: int) -> date:

        """Calculate Maundy Thursday (3 days before Easter)."""

        easter = self._calculate_easter_sunday(year)

        return easter - timedelta(days=3)

    

    def _calculate_good_friday(self, year: int) -> date:

        """Calculate Good Friday (2 days before Easter)."""

        easter = self._calculate_easter_sunday(year)

        return easter - timedelta(days=2)

    

    async def _store_holidays_in_database(self, holidays_list: List[HolidayInfo]):

        """Store holidays in the database for persistence and caching."""

        session = self.db.get_session()

        try:

            for holiday_info in holidays_list:

                # Check if holiday already exists

                existing_holiday = session.query(Holiday).filter(

                    Holiday.name == holiday_info.name,

                    Holiday.date == datetime.combine(holiday_info.date, datetime.min.time()),

                    Holiday.country == holiday_info.country

                ).first()

                

                if not existing_holiday:

                    holiday = Holiday(

                        name=holiday_info.name,

                        date=datetime.combine(holiday_info.date, datetime.min.time()),

                        country=holiday_info.country,

                        region=holiday_info.region,

                        holiday_type=holiday_info.holiday_type.value,

                        description=holiday_info.description

                    )

                    session.add(holiday)

            

            session.commit()

        except Exception as e:

            print(f"Error storing holidays in database: {e}")

            session.rollback()

        finally:

            session.close()


class HolidayIntegrationService:

    def __init__(self, location_service: LocationService, holiday_service: HolidayService):

        self.location_service = location_service

        self.holiday_service = holiday_service

        

    async def integrate_holidays_for_user(self, user_id: str, 

                                        user_location: Optional[str] = None) -> List[HolidayInfo]:

        """Integrate holidays for a specific user based on their location."""

        

        # Get user location

        location = await self.location_service.get_user_location(

            user_provided_location=user_location

        )

        

        # Get current year holidays

        current_year = datetime.now().year

        holidays_this_year = await self.holiday_service.get_holidays_for_location(

            location, current_year

        )

        

        # Get next year holidays if we're near the end of the year

        if datetime.now().month >= 10:

            holidays_next_year = await self.holiday_service.get_holidays_for_location(

                location, current_year + 1

            )

            holidays_this_year.extend(holidays_next_year)

        

        return holidays_this_year

    

    async def get_holidays_for_date_range(self, user_id: str, start_date: date, 

                                        end_date: date) -> List[HolidayInfo]:

        """Get holidays within a specific date range for a user."""

        

        # Get user location (this could be cached per user)

        location = await self.location_service.get_user_location()

        

        holidays_in_range = []

        

        # Get holidays for each year in the date range

        for year in range(start_date.year, end_date.year + 1):

            year_holidays = await self.holiday_service.get_holidays_for_location(location, year)

            

            # Filter holidays within the date range

            for holiday in year_holidays:

                if start_date <= holiday.date <= end_date:

                    holidays_in_range.append(holiday)

        

        return holidays_in_range

    

    async def check_holiday_conflicts(self, event_date: date, location: LocationInfo) -> Optional[HolidayInfo]:

        """Check if an event date conflicts with a holiday."""

        holidays = await self.holiday_service.get_holidays_for_location(location, event_date.year)

        

        for holiday in holidays:

            if holiday.date == event_date and holiday.is_public_holiday:

                return holiday

        

        return None



This holiday integration and location services system provides comprehensive support for automatically incorporating relevant holiday information into the calendar application. The LocationService handles the complex task of determining user location while respecting privacy, and the HolidayService manages holiday data from multiple sources with intelligent caching. The integration service coordinates these components to provide seamless holiday awareness throughout the calendar application, helping users avoid scheduling conflicts and stay aware of relevant observances in their region.


Calendar Display and Visualization


The calendar display and visualization system represents the primary interface through which users interact with their calendar data. This component must present complex temporal information in an intuitive and visually appealing manner while supporting multiple viewing modes and interaction patterns. The challenge lies in creating a flexible display system that can render everything from detailed daily schedules to high-level yearly overviews while maintaining consistency and usability across different view types.


The visualization system must handle various data types including single events, recurring series, all-day events, multi-day events, and holiday information. Each type of calendar entry may have different visual requirements, such as color coding for event types, special indicators for recurring events, and appropriate spacing for events of different durations. The system must also support user customization options such as color themes, font sizes, and layout preferences.


Responsive design considerations are crucial since users may access the calendar from different devices with varying screen sizes and interaction capabilities. The display system must adapt gracefully from desktop monitors to mobile phones while maintaining functionality and readability. This includes supporting both mouse-based interactions and touch gestures, as well as keyboard navigation for accessibility.


Here is a comprehensive implementation of the calendar display and visualization system. This example demonstrates how calendar data is transformed into visual representations across different view modes, including daily, weekly, monthly, and yearly views. The implementation includes support for responsive design, user customization, and interactive features.



from datetime import datetime, date, timedelta, time

from typing import List, Dict, Any, Optional, Tuple

from dataclasses import dataclass

from enum import Enum

import json

from calendar import monthrange, weekday

import colorsys


class ViewMode(Enum):

    DAY = "day"

    WEEK = "week"

    MONTH = "month"

    YEAR = "year"

    AGENDA = "agenda"


class DisplayTheme(Enum):

    LIGHT = "light"

    DARK = "dark"

    AUTO = "auto"


@dataclass

class ViewConfig:

    mode: ViewMode

    start_date: date

    end_date: date

    theme: DisplayTheme

    show_weekends: bool = True

    show_holidays: bool = True

    show_all_day_events: bool = True

    time_format_24h: bool = False

    first_day_of_week: int = 0  # 0 = Monday, 6 = Sunday


@dataclass

class EventDisplayInfo:

    event_id: str

    title: str

    start_time: datetime

    end_time: datetime

    color: str

    event_type: str

    location: Optional[str]

    is_all_day: bool

    is_recurring: bool

    conflict_level: int = 0

    display_priority: int = 0


@dataclass

class CalendarCell:

    date: date

    events: List[EventDisplayInfo]

    holidays: List[str]

    is_today: bool

    is_weekend: bool

    is_other_month: bool

    available_slots: List[Tuple[time, time]]


class CalendarRenderer:

    def __init__(self):

        self.color_palette = self._initialize_color_palette()

        self.layout_calculator = LayoutCalculator()

        self.conflict_detector = DisplayConflictDetector()

        

    def render_calendar_view(self, events: List[Event], holidays: List[HolidayInfo], 

                           config: ViewConfig) -> Dict[str, Any]:

        """Render a calendar view based on the specified configuration."""

        

        # Prepare event display information

        display_events = self._prepare_event_display_info(events, config)

        

        # Detect and resolve display conflicts

        display_events = self.conflict_detector.resolve_display_conflicts(display_events)

        

        # Generate the appropriate view

        if config.mode == ViewMode.DAY:

            return self._render_day_view(display_events, holidays, config)

        elif config.mode == ViewMode.WEEK:

            return self._render_week_view(display_events, holidays, config)

        elif config.mode == ViewMode.MONTH:

            return self._render_month_view(display_events, holidays, config)

        elif config.mode == ViewMode.YEAR:

            return self._render_year_view(display_events, holidays, config)

        elif config.mode == ViewMode.AGENDA:

            return self._render_agenda_view(display_events, holidays, config)

        else:

            raise ValueError(f"Unsupported view mode: {config.mode}")

    

    def _prepare_event_display_info(self, events: List[Event], 

                                  config: ViewConfig) -> List[EventDisplayInfo]:

        """Convert Event objects to EventDisplayInfo for rendering."""

        display_events = []

        

        for event in events:

            # Skip events outside the view range

            if (event.start_time.date() > config.end_date or 

                event.calculated_end_time.date() < config.start_date):

                continue

            

            display_info = EventDisplayInfo(

                event_id=str(event.id),

                title=event.title,

                start_time=event.start_time,

                end_time=event.calculated_end_time,

                color=event.color or self._get_default_color(event.event_type),

                event_type=event.event_type.value,

                location=event.location,

                is_all_day=event.all_day,

                is_recurring=bool(event.series_id),

                display_priority=self._calculate_display_priority(event)

            )

            display_events.append(display_info)

        

        return display_events

    

    def _render_day_view(self, events: List[EventDisplayInfo], holidays: List[HolidayInfo], 

                        config: ViewConfig) -> Dict[str, Any]:

        """Render a detailed day view."""

        target_date = config.start_date

        

        # Filter events for the target date

        day_events = [e for e in events if e.start_time.date() <= target_date <= e.end_time.date()]

        

        # Separate all-day and timed events

        all_day_events = [e for e in day_events if e.is_all_day]

        timed_events = [e for e in day_events if not e.is_all_day]

        

        # Generate time slots (15-minute intervals)

        time_slots = self._generate_time_slots(datetime.combine(target_date, time(0, 0)),

                                             datetime.combine(target_date, time(23, 59)),

                                             15)

        

        # Layout timed events

        event_layout = self.layout_calculator.calculate_day_layout(timed_events, time_slots)

        

        # Get holidays for the date

        day_holidays = [h.name for h in holidays if h.date == target_date]

        

        return {

            'view_type': 'day',

            'date': target_date.isoformat(),

            'all_day_events': [self._event_to_dict(e) for e in all_day_events],

            'timed_events': event_layout,

            'holidays': day_holidays,

            'time_slots': time_slots,

            'is_today': target_date == date.today(),

            'theme': config.theme.value

        }

    

    def _render_week_view(self, events: List[EventDisplayInfo], holidays: List[HolidayInfo], 

                         config: ViewConfig) -> Dict[str, Any]:

        """Render a week view with all seven days."""

        

        # Calculate week boundaries

        week_start = config.start_date

        week_days = [week_start + timedelta(days=i) for i in range(7)]

        

        # Create calendar cells for each day

        week_cells = []

        for day_date in week_days:

            day_events = [e for e in events if e.start_time.date() <= day_date <= e.end_time.date()]

            day_holidays = [h.name for h in holidays if h.date == day_date]

            

            cell = CalendarCell(

                date=day_date,

                events=day_events,

                holidays=day_holidays,

                is_today=day_date == date.today(),

                is_weekend=day_date.weekday() >= 5,

                is_other_month=False,

                available_slots=self._calculate_available_slots(day_events, day_date)

            )

            week_cells.append(cell)

        

        # Calculate multi-day event spans

        multi_day_events = self._calculate_multi_day_spans(events, week_start, week_start + timedelta(days=6))

        

        return {

            'view_type': 'week',

            'week_start': week_start.isoformat(),

            'week_end': (week_start + timedelta(days=6)).isoformat(),

            'days': [self._cell_to_dict(cell) for cell in week_cells],

            'multi_day_events': multi_day_events,

            'theme': config.theme.value

        }

    

    def _render_month_view(self, events: List[EventDisplayInfo], holidays: List[HolidayInfo], 

                          config: ViewConfig) -> Dict[str, Any]:

        """Render a month view with calendar grid."""

        

        target_month = config.start_date.month

        target_year = config.start_date.year

        

        # Calculate month boundaries

        first_day = date(target_year, target_month, 1)

        last_day = date(target_year, target_month, monthrange(target_year, target_month)[1])

        

        # Calculate calendar grid (including previous/next month days)

        grid_start = first_day - timedelta(days=first_day.weekday())

        grid_end = last_day + timedelta(days=6 - last_day.weekday())

        

        # Generate calendar grid

        calendar_grid = []

        current_date = grid_start

        

        while current_date <= grid_end:

            week_row = []

            for _ in range(7):

                day_events = [e for e in events if e.start_time.date() <= current_date <= e.end_time.date()]

                day_holidays = [h.name for h in holidays if h.date == current_date]

                

                # Limit events shown in month view

                display_events = sorted(day_events, key=lambda x: x.display_priority, reverse=True)[:3]

                

                cell = CalendarCell(

                    date=current_date,

                    events=display_events,

                    holidays=day_holidays,

                    is_today=current_date == date.today(),

                    is_weekend=current_date.weekday() >= 5,

                    is_other_month=current_date.month != target_month,

                    available_slots=[]  # Not calculated for month view

                )

                week_row.append(cell)

                current_date += timedelta(days=1)

            

            calendar_grid.append(week_row)

        

        return {

            'view_type': 'month',

            'month': target_month,

            'year': target_year,

            'month_name': first_day.strftime('%B'),

            'calendar_grid': [[self._cell_to_dict(cell) for cell in week] for week in calendar_grid],

            'total_events': len(events),

            'theme': config.theme.value

        }

    

    def _render_year_view(self, events: List[EventDisplayInfo], holidays: List[HolidayInfo], 

                         config: ViewConfig) -> Dict[str, Any]:

        """Render a year view with mini month calendars."""

        

        target_year = config.start_date.year

        months_data = []

        

        for month in range(1, 13):

            month_start = date(target_year, month, 1)

            month_end = date(target_year, month, monthrange(target_year, month)[1])

            

            # Get events and holidays for this month

            month_events = [e for e in events if 

                          e.start_time.date() <= month_end and e.end_time.date() >= month_start]

            month_holidays = [h for h in holidays if 

                            h.date.month == month and h.date.year == target_year]

            

            # Create mini month configuration

            month_config = ViewConfig(

                mode=ViewMode.MONTH,

                start_date=month_start,

                end_date=month_end,

                theme=config.theme

            )

            

            # Render mini month

            mini_month = self._render_mini_month(month_events, month_holidays, month_config)

            months_data.append(mini_month)

        

        return {

            'view_type': 'year',

            'year': target_year,

            'months': months_data,

            'total_events': len(events),

            'total_holidays': len(holidays),

            'theme': config.theme.value

        }

    

    def _render_agenda_view(self, events: List[EventDisplayInfo], holidays: List[HolidayInfo], 

                           config: ViewConfig) -> Dict[str, Any]:

        """Render an agenda/list view of events."""

        

        # Group events by date

        events_by_date = {}

        current_date = config.start_date

        

        while current_date <= config.end_date:

            day_events = [e for e in events if e.start_time.date() <= current_date <= e.end_time.date()]

            day_holidays = [h.name for h in holidays if h.date == current_date]

            

            if day_events or day_holidays:

                events_by_date[current_date.isoformat()] = {

                    'date': current_date.isoformat(),

                    'day_name': current_date.strftime('%A'),

                    'events': sorted([self._event_to_dict(e) for e in day_events], 

                                   key=lambda x: x['start_time']),

                    'holidays': day_holidays,

                    'is_today': current_date == date.today()

                }

            

            current_date += timedelta(days=1)

        

        return {

            'view_type': 'agenda',

            'start_date': config.start_date.isoformat(),

            'end_date': config.end_date.isoformat(),

            'events_by_date': events_by_date,

            'theme': config.theme.value

        }

    

    def _render_mini_month(self, events: List[EventDisplayInfo], holidays: List[HolidayInfo], 

                          config: ViewConfig) -> Dict[str, Any]:

        """Render a mini month calendar for year view."""

        

        target_month = config.start_date.month

        target_year = config.start_date.year

        

        # Simplified month rendering for year view

        days_with_events = set()

        days_with_holidays = set()

        

        for event in events:

            event_start = event.start_time.date()

            event_end = event.end_time.date()

            current = max(event_start, config.start_date)

            end = min(event_end, config.end_date)

            

            while current <= end:

                if current.month == target_month:

                    days_with_events.add(current.day)

                current += timedelta(days=1)

        

        for holiday in holidays:

            if holiday.date.month == target_month and holiday.date.year == target_year:

                days_with_holidays.add(holiday.date.day)

        

        return {

            'month': target_month,

            'month_name': config.start_date.strftime('%b'),

            'year': target_year,

            'days_with_events': list(days_with_events),

            'days_with_holidays': list(days_with_holidays),

            'total_events': len(events)

        }

    

    def _generate_time_slots(self, start_time: datetime, end_time: datetime, 

                           interval_minutes: int) -> List[str]:

        """Generate time slot labels for day view."""

        slots = []

        current = start_time

        

        while current <= end_time:

            slots.append(current.strftime('%H:%M'))

            current += timedelta(minutes=interval_minutes)

        

        return slots

    

    def _calculate_available_slots(self, events: List[EventDisplayInfo], 

                                 target_date: date) -> List[Tuple[time, time]]:

        """Calculate available time slots for a given date."""

        # Define business hours

        business_start = time(9, 0)

        business_end = time(17, 0)

        

        # Get timed events for the date

        timed_events = [e for e in events if not e.is_all_day and 

                       e.start_time.date() <= target_date <= e.end_time.date()]

        

        # Sort events by start time

        timed_events.sort(key=lambda x: x.start_time.time())

        

        available_slots = []

        current_time = business_start

        

        for event in timed_events:

            event_start = event.start_time.time()

            event_end = event.end_time.time()

            

            # Add slot before event if there's a gap

            if current_time < event_start:

                available_slots.append((current_time, event_start))

            

            # Move current time to after the event

            current_time = max(current_time, event_end)

        

        # Add final slot if there's time left

        if current_time < business_end:

            available_slots.append((current_time, business_end))

        

        return available_slots

    

    def _calculate_multi_day_spans(self, events: List[EventDisplayInfo], 

                                 start_date: date, end_date: date) -> List[Dict[str, Any]]:

        """Calculate how multi-day events span across the view period."""

        multi_day_events = []

        

        for event in events:

            event_start = max(event.start_time.date(), start_date)

            event_end = min(event.end_time.date(), end_date)

            

            if event_start <= event_end and (event.end_time.date() - event.start_time.date()).days > 0:

                span_days = (event_end - event_start).days + 1

                start_day_offset = (event_start - start_date).days

                

                multi_day_events.append({

                    'event_id': event.event_id,

                    'title': event.title,

                    'color': event.color,

                    'start_day_offset': start_day_offset,

                    'span_days': span_days,

                    'is_all_day': event.is_all_day

                })

        

        return multi_day_events

    

    def _get_default_color(self, event_type: EventType) -> str:

        """Get default color for an event type."""

        color_map = {

            EventType.MEETING: '#3498db',

            EventType.APPOINTMENT: '#e74c3c',

            EventType.PERSONAL: '#2ecc71',

            EventType.WORK: '#9b59b6',

            EventType.TASK: '#f39c12',

            EventType.REMINDER: '#95a5a6'

        }

        return color_map.get(event_type, '#34495e')

    

    def _calculate_display_priority(self, event: Event) -> int:

        """Calculate display priority for event ordering."""

        priority = 0

        

        # Higher priority for all-day events

        if event.all_day:

            priority += 100

        

        # Higher priority for longer events

        duration_hours = (event.calculated_end_time - event.start_time).total_seconds() / 3600

        priority += min(duration_hours * 10, 50)

        

        # Higher priority for certain event types

        type_priorities = {

            EventType.MEETING: 20,

            EventType.APPOINTMENT: 15,

            EventType.WORK: 10,

            EventType.PERSONAL: 5,

            EventType.TASK: 3,

            EventType.REMINDER: 1

        }

        priority += type_priorities.get(event.event_type, 0)

        

        return int(priority)

    

    def _event_to_dict(self, event: EventDisplayInfo) -> Dict[str, Any]:

        """Convert EventDisplayInfo to dictionary for JSON serialization."""

        return {

            'id': event.event_id,

            'title': event.title,

            'start_time': event.start_time.isoformat(),

            'end_time': event.end_time.isoformat(),

            'color': event.color,

            'event_type': event.event_type,

            'location': event.location,

            'is_all_day': event.is_all_day,

            'is_recurring': event.is_recurring

        }

    

    def _cell_to_dict(self, cell: CalendarCell) -> Dict[str, Any]:

        """Convert CalendarCell to dictionary for JSON serialization."""

        return {

            'date': cell.date.isoformat(),

            'events': [self._event_to_dict(e) for e in cell.events],

            'holidays': cell.holidays,

            'is_today': cell.is_today,

            'is_weekend': cell.is_weekend,

            'is_other_month': cell.is_other_month,

            'event_count': len(cell.events)

        }

    

    def _initialize_color_palette(self) -> Dict[str, List[str]]:

        """Initialize color palettes for different themes."""

        return {

            'light': [

                '#3498db', '#e74c3c', '#2ecc71', '#f39c12', '#9b59b6',

                '#1abc9c', '#e67e22', '#34495e', '#f1c40f', '#e91e63'

            ],

            'dark': [

                '#5dade2', '#ec7063', '#58d68d', '#f7dc6f', '#bb8fce',

                '#76d7c4', '#f8c471', '#85929e', '#f4d03f', '#f1948a'

            ]

        }


class LayoutCalculator:

    def calculate_day_layout(self, events: List[EventDisplayInfo], 

                           time_slots: List[str]) -> List[Dict[str, Any]]:

        """Calculate layout for events in day view to avoid overlaps."""

        

        # Sort events by start time

        sorted_events = sorted(events, key=lambda x: x.start_time)

        

        # Calculate event positions and widths

        event_layout = []

        columns = []  # Track occupied columns

        

        for event in sorted_events:

            # Find available column

            column = 0

            while column < len(columns):

                if columns[column] <= event.start_time:

                    break

                column += 1

            

            # Add new column if needed

            if column >= len(columns):

                columns.append(event.end_time)

            else:

                columns[column] = event.end_time

            

            # Calculate position and size

            start_minutes = event.start_time.hour * 60 + event.start_time.minute

            duration_minutes = (event.end_time - event.start_time).total_seconds() / 60

            

            event_layout.append({

                'event': self._event_to_dict(event),

                'column': column,

                'total_columns': len(columns),

                'top_position': start_minutes,

                'height': duration_minutes,

                'width_percent': 100 / len(columns)

            })

        

        return event_layout

    

    def _event_to_dict(self, event: EventDisplayInfo) -> Dict[str, Any]:

        """Convert EventDisplayInfo to dictionary."""

        return {

            'id': event.event_id,

            'title': event.title,

            'start_time': event.start_time.isoformat(),

            'end_time': event.end_time.isoformat(),

            'color': event.color,

            'location': event.location

        }


class DisplayConflictDetector:

    def resolve_display_conflicts(self, events: List[EventDisplayInfo]) -> List[EventDisplayInfo]:

        """Resolve display conflicts between overlapping events."""

        

        # Sort events by priority

        events.sort(key=lambda x: x.display_priority, reverse=True)

        

        # Mark conflicts

        for i, event1 in enumerate(events):

            for j, event2 in enumerate(events[i+1:], i+1):

                if self._events_overlap(event1, event2):

                    event2.conflict_level += 1

        

        return events

    

    def _events_overlap(self, event1: EventDisplayInfo, event2: EventDisplayInfo) -> bool:

        """Check if two events overlap in time."""

        return (event1.start_time < event2.end_time and 

                event1.end_time > event2.start_time)



This calendar display and visualization system provides comprehensive support for rendering calendar data across multiple view modes while maintaining visual consistency and usability. The CalendarRenderer handles the complex task of transforming calendar data into appropriate visual representations, while the LayoutCalculator ensures that overlapping events are displayed clearly without conflicts. The system supports responsive design principles and user customization options, making it suitable for deployment across different devices and user preferences.


User Interface and Experience Design


The user interface and experience design for an LLM-based calendar application requires careful balance between the power of natural language interaction and the familiarity of traditional calendar interfaces. Users need to feel confident that they can accomplish their tasks efficiently while discovering the enhanced capabilities that natural language processing provides. The interface must support both conversational interactions and direct manipulation, allowing users to choose their preferred interaction method based on context and personal preference.


The conversational interface presents unique design challenges because users must understand what types of commands the system can process and receive clear feedback about the results of their requests. Unlike traditional graphical interfaces where available actions are visible through buttons and menus, natural language interfaces require users to discover capabilities through experimentation or guidance. The system must provide helpful prompts, examples, and error recovery mechanisms to support user learning and confidence.


Visual feedback and confirmation mechanisms become particularly important in a natural language calendar system. When a user says "Schedule a meeting with Sarah next Tuesday," the system must clearly show what was understood and created, allowing for easy correction if the interpretation was incorrect. The interface must also handle ambiguous requests gracefully, presenting clarification options rather than making assumptions that might lead to incorrect calendar entries.


Here is a comprehensive implementation of the user interface and experience design system. This example demonstrates how natural language interactions are integrated with traditional calendar interfaces, including conversation management, visual feedback systems, and responsive design patterns. The implementation shows how users can seamlessly switch between conversational and direct manipulation modes.



from datetime import datetime, date, timedelta

from typing import List, Dict, Any, Optional, Callable

from dataclasses import dataclass, field

from enum import Enum

import json

import asyncio


class InteractionMode(Enum):

    CONVERSATION = "conversation"

    DIRECT_MANIPULATION = "direct_manipulation"

    HYBRID = "hybrid"


class FeedbackType(Enum):

    SUCCESS = "success"

    ERROR = "error"

    WARNING = "warning"

    INFO = "info"

    CONFIRMATION = "confirmation"


@dataclass

class UserInterfaceState:

    current_view: ViewMode

    selected_date: date

    interaction_mode: InteractionMode

    conversation_active: bool

    pending_confirmations: List[str] = field(default_factory=list)

    user_preferences: Dict[str, Any] = field(default_factory=dict)

    accessibility_settings: Dict[str, Any] = field(default_factory=dict)


@dataclass

class ConversationTurn:

    user_input: str

    system_response: str

    timestamp: datetime

    intent: Optional[Dict[str, Any]] = None

    result: Optional[Dict[str, Any]] = None

    requires_confirmation: bool = False


@dataclass

class UIFeedback:

    message: str

    feedback_type: FeedbackType

    duration_ms: int = 5000

    actions: List[Dict[str, Any]] = field(default_factory=list)

    data: Optional[Dict[str, Any]] = None


class ConversationManager:

    def __init__(self, llm_service, calendar_engine):

        self.llm_service = llm_service

        self.calendar_engine = calendar_engine

        self.conversation_history = []

        self.context_window = 10  # Keep last 10 turns for context

        

    async def process_user_input(self, user_input: str, ui_state: UserInterfaceState) -> Dict[str, Any]:

        """Process user input and return response with UI updates."""

        

        # Create conversation turn

        turn = ConversationTurn(

            user_input=user_input,

            system_response="",

            timestamp=datetime.now()

        )

        

        try:

            # Parse intent with conversation context

            intent = await self.llm_service.parse_intent(

                user_input, 

                self._build_conversation_context()

            )

            turn.intent = intent

            

            # Handle special conversation commands

            if self._is_conversation_command(user_input):

                response = await self._handle_conversation_command(user_input, ui_state)

                turn.system_response = response['message']

                return response

            

            # Execute calendar command

            result = await self.calendar_engine.execute_command(intent)

            turn.result = result.__dict__

            

            # Generate natural language response

            response = await self._generate_response(result, intent, ui_state)

            turn.system_response = response['message']

            

            # Add to conversation history

            self.conversation_history.append(turn)

            self._trim_conversation_history()

            

            return response

            

        except Exception as e:

            error_response = {

                'message': f"I'm sorry, I encountered an error: {str(e)}",

                'feedback': UIFeedback(

                    message="Something went wrong. Please try again.",

                    feedback_type=FeedbackType.ERROR

                ),

                'suggestions': [

                    "Try rephrasing your request",

                    "Use simpler language",

                    "Check if all required information is provided"

                ]

            }

            turn.system_response = error_response['message']

            self.conversation_history.append(turn)

            return error_response

    

    async def _generate_response(self, result, intent: Dict[str, Any], 

                               ui_state: UserInterfaceState) -> Dict[str, Any]:

        """Generate comprehensive response with UI updates."""

        

        response = {

            'message': '',

            'feedback': None,

            'ui_updates': {},

            'suggestions': [],

            'requires_confirmation': False

        }

        

        if result.success:

            response.update(await self._handle_successful_result(result, intent, ui_state))

        else:

            response.update(await self._handle_failed_result(result, intent, ui_state))

        

        # Add contextual suggestions

        response['suggestions'] = await self._generate_contextual_suggestions(intent, ui_state)

        

        return response

    

    async def _handle_successful_result(self, result, intent: Dict[str, Any], 

                                      ui_state: UserInterfaceState) -> Dict[str, Any]:

        """Handle successful calendar operations."""

        

        action = intent.get('action')

        

        if action == 'create':

            return await self._handle_create_success(result, intent, ui_state)

        elif action == 'update':

            return await self._handle_update_success(result, intent, ui_state)

        elif action == 'delete':

            return await self._handle_delete_success(result, intent, ui_state)

        elif action == 'query':

            return await self._handle_query_success(result, intent, ui_state)

        elif action == 'find_slots':

            return await self._handle_find_slots_success(result, intent, ui_state)

        else:

            return {

                'message': "Operation completed successfully.",

                'feedback': UIFeedback(

                    message="Done!",

                    feedback_type=FeedbackType.SUCCESS

                )

            }

    

    async def _handle_create_success(self, result, intent: Dict[str, Any], 

                                   ui_state: UserInterfaceState) -> Dict[str, Any]:

        """Handle successful event creation."""

        

        event = result.data

        event_time = event.start_time.strftime('%A, %B %d at %I:%M %p')

        

        message = f"Perfect! I've scheduled '{event.title}' for {event_time}"

        if event.location:

            message += f" at {event.location}"

        message += "."

        

        # Prepare UI updates

        ui_updates = {

            'highlight_event': str(event.id),

            'navigate_to_date': event.start_time.date().isoformat(),

            'refresh_view': True

        }

        

        # Create feedback with action options

        feedback = UIFeedback(

            message="Event created successfully!",

            feedback_type=FeedbackType.SUCCESS,

            actions=[

                {'label': 'View Event', 'action': 'view_event', 'event_id': str(event.id)},

                {'label': 'Add Reminder', 'action': 'add_reminder', 'event_id': str(event.id)},

                {'label': 'Invite Others', 'action': 'invite_participants', 'event_id': str(event.id)}

            ]

        )

        

        return {

            'message': message,

            'feedback': feedback,

            'ui_updates': ui_updates

        }

    

    async def _handle_query_success(self, result, intent: Dict[str, Any], 

                                  ui_state: UserInterfaceState) -> Dict[str, Any]:

        """Handle successful event queries."""

        

        events = result.data

        

        if not events:

            message = "You don't have any events scheduled for that time."

            feedback = UIFeedback(

                message="No events found",

                feedback_type=FeedbackType.INFO

            )

        else:

            event_count = len(events)

            message = f"You have {event_count} event{'s' if event_count != 1 else ''}: "

            

            event_summaries = []

            for event in events[:3]:  # Show first 3 events

                time_str = event.start_time.strftime('%I:%M %p')

                event_summaries.append(f"'{event.title}' at {time_str}")

            

            message += ", ".join(event_summaries)

            

            if event_count > 3:

                message += f" and {event_count - 3} more."

            

            feedback = UIFeedback(

                message=f"Found {event_count} events",

                feedback_type=FeedbackType.SUCCESS,

                actions=[

                    {'label': 'View All', 'action': 'show_agenda_view'},

                    {'label': 'Export List', 'action': 'export_events'}

                ]

            )

        

        ui_updates = {

            'show_event_list': [self._event_to_summary(e) for e in events],

            'refresh_view': True

        }

        

        return {

            'message': message,

            'feedback': feedback,

            'ui_updates': ui_updates

        }

    

    async def _handle_find_slots_success(self, result, intent: Dict[str, Any], 

                                       ui_state: UserInterfaceState) -> Dict[str, Any]:

        """Handle successful free slot finding."""

        

        free_slots = result.data

        

        if not free_slots:

            message = "I couldn't find any available time slots in the specified period. Would you like me to check a different time range?"

            feedback = UIFeedback(

                message="No available slots found",

                feedback_type=FeedbackType.WARNING,

                actions=[

                    {'label': 'Try Different Dates', 'action': 'suggest_alternative_dates'},

                    {'label': 'Show Conflicts', 'action': 'show_conflicting_events'}

                ]

            )

        else:

            slot_count = len(free_slots)

            message = f"I found {slot_count} available time slot{'s' if slot_count != 1 else ''}. "

            

            # Show first few slots

            slot_summaries = []

            for slot in free_slots[:3]:

                start_time = slot['start'].strftime('%A %I:%M %p')

                slot_summaries.append(start_time)

            

            message += "Here are some options: " + ", ".join(slot_summaries)

            

            if slot_count > 3:

                message += f" and {slot_count - 3} more."

            

            feedback = UIFeedback(

                message=f"Found {slot_count} available slots",

                feedback_type=FeedbackType.SUCCESS,

                actions=[

                    {'label': 'Schedule Meeting', 'action': 'create_event_in_slot'},

                    {'label': 'View All Slots', 'action': 'show_all_slots'}

                ]

            )

        

        ui_updates = {

            'highlight_free_slots': free_slots,

            'show_availability_overlay': True

        }

        

        return {

            'message': message,

            'feedback': feedback,

            'ui_updates': ui_updates

        }

    

    def _is_conversation_command(self, user_input: str) -> bool:

        """Check if input is a conversation management command."""

        conversation_commands = [

            'help', 'what can you do', 'how do i', 'explain', 'tutorial',

            'clear', 'reset', 'start over', 'nevermind', 'cancel'

        ]

        return any(cmd in user_input.lower() for cmd in conversation_commands)

    

    async def _handle_conversation_command(self, user_input: str, 

                                         ui_state: UserInterfaceState) -> Dict[str, Any]:

        """Handle conversation management commands."""

        

        lower_input = user_input.lower()

        

        if any(word in lower_input for word in ['help', 'what can you do']):

            return await self._provide_help(ui_state)

        elif any(word in lower_input for word in ['clear', 'reset', 'start over']):

            return await self._reset_conversation()

        elif any(word in lower_input for word in ['nevermind', 'cancel']):

            return await self._cancel_operation(ui_state)

        else:

            return {

                'message': "I'm not sure how to help with that. Try asking 'What can you do?' for available commands.",

                'feedback': UIFeedback(

                    message="Unknown command",

                    feedback_type=FeedbackType.INFO

                )

            }

    

    async def _provide_help(self, ui_state: UserInterfaceState) -> Dict[str, Any]:

        """Provide help information and examples."""

        

        help_message = """I can help you manage your calendar using natural language. Here are some things you can ask me:


📅 **Creating Events:**

- "Schedule a meeting with John tomorrow at 2 PM"

- "Book a dentist appointment next Friday morning"

- "Set up a weekly team meeting every Tuesday at 10 AM"


🔍 **Finding Information:**

- "What do I have scheduled for today?"

- "Show me next week's meetings"

- "When am I free this afternoon?"


✏️ **Modifying Events:**

- "Move my 3 PM meeting to 4 PM"

- "Cancel tomorrow's lunch appointment"

- "Add Sarah to the project meeting"


Just speak naturally - I'll understand what you want to do!"""

        

        examples = [

            "Schedule a meeting with the marketing team next Monday at 10 AM",

            "What meetings do I have this week?",

            "Find me a free hour tomorrow afternoon",

            "Cancel my 2 PM appointment",

            "Move Friday's meeting to next week"

        ]

        

        return {

            'message': help_message,

            'feedback': UIFeedback(

                message="Here's what I can help you with",

                feedback_type=FeedbackType.INFO,

                duration_ms=10000

            ),

            'ui_updates': {

                'show_help_panel': True,

                'example_commands': examples

            }

        }

    

    def _build_conversation_context(self) -> Dict[str, Any]:

        """Build context from recent conversation history."""

        

        context = {

            'recent_turns': [],

            'mentioned_entities': set(),

            'pending_operations': []

        }

        

        # Get recent conversation turns

        recent_turns = self.conversation_history[-self.context_window:]

        for turn in recent_turns:

            context['recent_turns'].append({

                'user_input': turn.user_input,

                'system_response': turn.system_response,

                'intent': turn.intent

            })

            

            # Extract mentioned entities

            if turn.intent and 'parameters' in turn.intent:

                entities = turn.intent['parameters'].get('entities', {})

                for entity_type, entity_list in entities.items():

                    context['mentioned_entities'].update(entity_list)

        

        return context

    

    def _event_to_summary(self, event) -> Dict[str, Any]:

        """Convert event to summary format for UI display."""

        return {

            'id': str(event.id),

            'title': event.title,

            'start_time': event.start_time.isoformat(),

            'end_time': event.calculated_end_time.isoformat(),

            'location': event.location,

            'type': event.event_type.value

        }

    

    def _trim_conversation_history(self):

        """Keep conversation history within reasonable bounds."""

        max_history = 50

        if len(self.conversation_history) > max_history:

            self.conversation_history = self.conversation_history[-max_history:]

    

    async def _generate_contextual_suggestions(self, intent: Dict[str, Any], 

                                             ui_state: UserInterfaceState) -> List[str]:

        """Generate contextual suggestions based on current state."""

        

        suggestions = []

        action = intent.get('action')

        

        if action == 'create':

            suggestions.extend([

                "Add a reminder to this event",

                "Invite participants to the meeting",

                "Set up a recurring schedule"

            ])

        elif action == 'query':

            suggestions.extend([

                "Find available time slots",

                "Create a new event",

                "Export this schedule"

            ])

        elif action == 'find_slots':

            suggestions.extend([

                "Schedule a meeting in one of these slots",

                "Check availability for different dates",

                "See what's causing conflicts"

            ])

        

        # Add general suggestions based on current view

        if ui_state.current_view == ViewMode.DAY:

            suggestions.append("Show me the week view")

        elif ui_state.current_view == ViewMode.WEEK:

            suggestions.append("Focus on today's schedule")

        

        return suggestions[:3]  # Limit to 3 suggestions


class UIComponentManager:

    def __init__(self):

        self.active_components = {}

        self.component_state = {}

        

    def render_conversation_interface(self, ui_state: UserInterfaceState) -> Dict[str, Any]:

        """Render the conversation interface component."""

        

        return {

            'type': 'conversation_interface',

            'properties': {

                'placeholder': self._get_input_placeholder(ui_state),

                'suggestions': self._get_quick_suggestions(ui_state),

                'voice_enabled': ui_state.user_preferences.get('voice_input', False),

                'conversation_active': ui_state.conversation_active,

                'accessibility': {

                    'screen_reader_enabled': ui_state.accessibility_settings.get('screen_reader', False),

                    'high_contrast': ui_state.accessibility_settings.get('high_contrast', False),

                    'large_text': ui_state.accessibility_settings.get('large_text', False)

                }

            }

        }

    

    def render_calendar_view(self, calendar_data: Dict[str, Any], 

                           ui_state: UserInterfaceState) -> Dict[str, Any]:

        """Render the calendar view component."""

        

        return {

            'type': 'calendar_view',

            'properties': {

                'view_mode': ui_state.current_view.value,

                'selected_date': ui_state.selected_date.isoformat(),

                'calendar_data': calendar_data,

                'interaction_mode': ui_state.interaction_mode.value,

                'theme': ui_state.user_preferences.get('theme', 'light'),

                'animations_enabled': ui_state.user_preferences.get('animations', True)

            }

        }

    

    def render_feedback_system(self, feedback: UIFeedback) -> Dict[str, Any]:

        """Render feedback notifications and confirmations."""

        

        return {

            'type': 'feedback_notification',

            'properties': {

                'message': feedback.message,

                'type': feedback.feedback_type.value,

                'duration': feedback.duration_ms,

                'actions': feedback.actions,

                'dismissible': True,

                'position': 'top-right'

            }

        }

    

    def _get_input_placeholder(self, ui_state: UserInterfaceState) -> str:

        """Get contextual placeholder text for input field."""

        

        if ui_state.current_view == ViewMode.DAY:

            return "Try: 'Schedule a meeting at 2 PM' or 'What do I have today?'"

        elif ui_state.current_view == ViewMode.WEEK:

            return "Try: 'Show me free time this week' or 'Move Monday's meeting'"

        elif ui_state.current_view == ViewMode.MONTH:

            return "Try: 'What's happening next Friday?' or 'Block out vacation time'"

        else:

            return "Ask me about your calendar - I'm here to help!"

    

    def _get_quick_suggestions(self, ui_state: UserInterfaceState) -> List[str]:

        """Get quick suggestion buttons based on current context."""

        

        base_suggestions = [

            "What's on my schedule today?",

            "Find me a free hour",

            "Schedule a meeting"

        ]

        

        # Add contextual suggestions based on current view

        if ui_state.current_view == ViewMode.DAY:

            base_suggestions.append("Show me tomorrow")

        elif ui_state.current_view == ViewMode.WEEK:

            base_suggestions.append("Go to next week")

        

        return base_suggestions


class AccessibilityManager:

    def __init__(self):

        self.screen_reader_support = True

        self.keyboard_navigation = True

        

    def apply_accessibility_settings(self, ui_state: UserInterfaceState) -> Dict[str, Any]:

        """Apply accessibility settings to UI components."""

        

        settings = ui_state.accessibility_settings

        

        return {

            'aria_labels': self._generate_aria_labels(ui_state),

            'keyboard_shortcuts': self._get_keyboard_shortcuts(),

            'focus_management': self._get_focus_management_rules(),

            'screen_reader_announcements': self._get_screen_reader_text(ui_state),

            'high_contrast_mode': settings.get('high_contrast', False),

            'reduced_motion': settings.get('reduced_motion', False),

            'large_text_mode': settings.get('large_text', False)

        }

    

    def _generate_aria_labels(self, ui_state: UserInterfaceState) -> Dict[str, str]:

        """Generate ARIA labels for UI elements."""

        

        return {

            'conversation_input': 'Enter calendar command or question',

            'calendar_grid': f'Calendar view for {ui_state.selected_date.strftime("%B %Y")}',

            'event_list': 'List of scheduled events',

            'navigation_buttons': 'Calendar navigation controls',

            'view_mode_selector': 'Change calendar view mode'

        }

    

    def _get_keyboard_shortcuts(self) -> Dict[str, str]:

        """Define keyboard shortcuts for accessibility."""

        

        return {

            'ctrl+n': 'Create new event',

            'ctrl+f': 'Find events',

            'ctrl+t': 'Go to today',

            'ctrl+d': 'Day view',

            'ctrl+w': 'Week view',

            'ctrl+m': 'Month view',

            'ctrl+y': 'Year view',

            'escape': 'Cancel current operation',

            'enter': 'Confirm action',

            'tab': 'Navigate to next element',

            'shift+tab': 'Navigate to previous element'

        }

    

    def _get_focus_management_rules(self) -> Dict[str, Any]:

        """Define focus management rules for keyboard navigation."""

        

        return {

            'initial_focus': 'conversation_input',

            'focus_trap_modals': True,

            'focus_visible_indicators': True,

            'skip_links': [

                {'target': 'main_content', 'label': 'Skip to main content'},

                {'target': 'calendar_view', 'label': 'Skip to calendar'},

                {'target': 'conversation_input', 'label': 'Skip to conversation input'}

            ]

        }

    

    def _get_screen_reader_text(self, ui_state: UserInterfaceState) -> Dict[str, str]:

        """Generate screen reader announcements."""

        

        return {

            'view_changed': f"Calendar view changed to {ui_state.current_view.value}",

            'date_changed': f"Selected date changed to {ui_state.selected_date.strftime('%A, %B %d, %Y')}",

            'event_created': "New event created successfully",

            'event_updated': "Event updated successfully",

            'event_deleted': "Event deleted successfully",

            'search_results': "Search results updated"

        }


class ResponsiveDesignManager:

    def __init__(self):

        self.breakpoints = {

            'mobile': 768,

            'tablet': 1024,

            'desktop': 1440

        }

    

    def get_responsive_layout(self, screen_width: int, ui_state: UserInterfaceState) -> Dict[str, Any]:

        """Get responsive layout configuration based on screen size."""

        

        if screen_width < self.breakpoints['mobile']:

            return self._get_mobile_layout(ui_state)

        elif screen_width < self.breakpoints['tablet']:

            return self._get_tablet_layout(ui_state)

        else:

            return self._get_desktop_layout(ui_state)

    

    def _get_mobile_layout(self, ui_state: UserInterfaceState) -> Dict[str, Any]:

        """Mobile-optimized layout configuration."""

        

        return {

            'layout_type': 'mobile',

            'conversation_interface': {

                'position': 'bottom',

                'full_width': True,

                'voice_button_prominent': True

            },

            'calendar_view': {

                'default_view': ViewMode.DAY.value,

                'compact_events': True,

                'swipe_navigation': True

            },

            'navigation': {

                'type': 'bottom_tabs',

                'items': ['Day', 'Week', 'Month', 'Chat']

            }

        }

    

    def _get_tablet_layout(self, ui_state: UserInterfaceState) -> Dict[str, Any]:

        """Tablet-optimized layout configuration."""

        

        return {

            'layout_type': 'tablet',

            'conversation_interface': {

                'position': 'sidebar',

                'width': '30%',

                'collapsible': True

            },

            'calendar_view': {

                'default_view': ViewMode.WEEK.value,

                'show_mini_calendar': True,

                'touch_optimized': True

            },

            'navigation': {

                'type': 'top_bar',

                'show_view_switcher': True

            }

        }

    

    def _get_desktop_layout(self, ui_state: UserInterfaceState) -> Dict[str, Any]:

        """Desktop-optimized layout configuration."""

        

        return {

            'layout_type': 'desktop',

            'conversation_interface': {

                'position': 'floating',

                'width': '400px',

                'resizable': True,

                'keyboard_shortcuts': True

            },

            'calendar_view': {

                'default_view': ViewMode.MONTH.value,

                'show_sidebar': True,

                'multi_calendar_support': True

            },

            'navigation': {

                'type': 'full_menu',

                'show_all_options': True,

                'keyboard_navigation': True

            }

        }



This user interface and experience design system provides a comprehensive framework for creating an intuitive and accessible LLM-based calendar application. The ConversationManager handles the complex task of maintaining conversational context while providing clear feedback and suggestions. The UIComponentManager ensures consistent rendering across different interface elements, while the AccessibilityManager guarantees that the application is usable by people with diverse abilities. The ResponsiveDesignManager adapts the interface to different screen sizes and interaction patterns, ensuring a seamless experience across all devices.


Advanced Features Implementation


The advanced features of an LLM-based calendar application extend beyond basic scheduling to provide intelligent automation, predictive capabilities, and seamless integration with external systems. These features leverage the natural language processing capabilities to offer sophisticated functionality that would be difficult to implement in traditional calendar applications. The system can learn from user patterns, provide proactive suggestions, and automate routine scheduling tasks while maintaining user control and transparency.


Intelligent scheduling represents one of the most valuable advanced features, where the system can automatically suggest optimal meeting times based on participant availability, preferences, and historical patterns. The system analyzes factors such as time zones, working hours, meeting frequency, and travel time between locations to propose the best possible scheduling options. This feature requires sophisticated algorithms that balance multiple constraints while providing explanations for scheduling decisions.


Integration capabilities allow the calendar to connect with external systems such as email, video conferencing platforms, project management tools, and travel booking services. These integrations enable automatic event creation from email invitations, seamless video conference setup, and intelligent travel time calculations. The natural language interface makes these integrations more powerful by allowing users to request complex operations that span multiple systems through simple conversational commands.


Here is a comprehensive implementation of advanced features that demonstrates intelligent scheduling, external integrations, and predictive capabilities. This example shows how the system can automatically optimize schedules, integrate with external services, and provide proactive assistance to users. The implementation includes machine learning components for pattern recognition and decision-making algorithms for complex scheduling scenarios.



import asyncio
from datetime import datetime, timedelta, time
from typing import List, Dict, Any, Optional, Tuple, Set
from dataclasses import dataclass, field
from enum import Enum
import json
import numpy as np
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
import requests
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
class SchedulingStrategy(Enum):
    OPTIMIZE_FOR_FOCUS = "optimize_for_focus"
    MINIMIZE_CONFLICTS = "minimize_conflicts"
    BALANCE_WORKLOAD = "balance_workload"
    RESPECT_PREFERENCES = "respect_preferences"
    MINIMIZE_TRAVEL = "minimize_travel"
class IntegrationType(Enum):
    EMAIL = "email"
    VIDEO_CONFERENCE = "video_conference"
    PROJECT_MANAGEMENT = "project_management"
    TRAVEL_BOOKING = "travel_booking"
    WEATHER_SERVICE = "weather_service"
    CONTACT_MANAGEMENT = "contact_management"
@dataclass
class UserPreferences:
    preferred_meeting_times: List[Tuple[time, time]]
    maximum_daily_meetings: int
    minimum_break_between_meetings: int  # minutes
    preferred_meeting_duration: int  # minutes
    work_hours: Tuple[time, time]
    time_zone: str
    focus_time_blocks: List[Tuple[time, time]]
    travel_buffer_time: int  # minutes
    notification_preferences: Dict[str, Any]
@dataclass
class SchedulingConstraint:
    constraint_type: str
    priority: int  # 1-10, 10 being highest
    description: str
    parameters: Dict[str, Any]
@dataclass
class SchedulingSuggestion:
    suggested_time: datetime
    confidence_score: float
    reasoning: str
    alternative_times: List[datetime]
    potential_conflicts: List[str]
    optimization_factors: Dict[str, float]
class IntelligentScheduler:
    def __init__(self, calendar_engine, user_preferences: UserPreferences):
        self.calendar_engine = calendar_engine
        self.user_preferences = user_preferences
        self.pattern_analyzer = SchedulingPatternAnalyzer()
        self.constraint_solver = ConstraintSolver()
        self.optimization_engine = SchedulingOptimizer()
        
    async def suggest_optimal_meeting_time(self, meeting_requirements: Dict[str, Any], 
                                         participants: List[str] = None) -> SchedulingSuggestion:
        """Suggest optimal meeting time based on multiple factors."""
        
        # Analyze current schedule patterns
        patterns = await self.pattern_analyzer.analyze_user_patterns(
            meeting_requirements.get('user_id')
        )
        
        # Get participant availability if provided
        participant_availability = {}
        if participants:
            for participant in participants:
                availability = await self._get_participant_availability(participant)
                participant_availability[participant] = availability
        
        # Define scheduling constraints
        constraints = self._build_scheduling_constraints(
            meeting_requirements, participant_availability, patterns
        )
        
        # Find candidate time slots
        candidate_slots = await self._find_candidate_time_slots(
            meeting_requirements, constraints
        )
        
        # Optimize and rank suggestions
        optimized_suggestions = await self.optimization_engine.optimize_suggestions(
            candidate_slots, constraints, patterns
        )
        
        if optimized_suggestions:
            best_suggestion = optimized_suggestions[0]
            return SchedulingSuggestion(
                suggested_time=best_suggestion['time'],
                confidence_score=best_suggestion['score'],
                reasoning=best_suggestion['reasoning'],
                alternative_times=[s['time'] for s in optimized_suggestions[1:4]],
                potential_conflicts=best_suggestion.get('conflicts', []),
                optimization_factors=best_suggestion.get('factors', {})
            )
        else:
            # Fallback to basic scheduling
            return await self._fallback_scheduling(meeting_requirements)
    
    async def auto_schedule_recurring_meetings(self, series_requirements: Dict[str, Any]) -> List[SchedulingSuggestion]:
        """Automatically schedule a series of recurring meetings."""
        
        suggestions = []
        current_date = datetime.now().date()
        end_date = series_requirements.get('end_date', current_date + timedelta(days=90))
        
        recurrence_pattern = series_requirements.get('recurrence', {})
        interval_days = self._calculate_recurrence_interval(recurrence_pattern)
        
        while current_date <= end_date:
            meeting_req = series_requirements.copy()
            meeting_req['preferred_date'] = current_date
            
            suggestion = await self.suggest_optimal_meeting_time(meeting_req)
            if suggestion.confidence_score > 0.7:  # Only suggest high-confidence slots
                suggestions.append(suggestion)
            
            current_date += timedelta(days=interval_days)
        
        return suggestions
    
    async def detect_scheduling_conflicts(self, user_id: str, 
                                        time_range: Tuple[datetime, datetime]) -> List[Dict[str, Any]]:
        """Detect potential scheduling conflicts and suggest resolutions."""
        
        start_time, end_time = time_range
        events = await self.calendar_engine.get_events_in_range(user_id, start_time, end_time)
        
        conflicts = []
        
        # Check for overlapping events
        for i, event1 in enumerate(events):
            for event2 in events[i+1:]:
                if self._events_overlap(event1, event2):
                    conflict = {
                        'type': 'overlap',
                        'events': [event1.id, event2.id],
                        'severity': self._calculate_conflict_severity(event1, event2),
                        'suggestions': await self._generate_conflict_resolutions(event1, event2)
                    }
                    conflicts.append(conflict)
        
        # Check for preference violations
        preference_conflicts = await self._check_preference_violations(events)
        conflicts.extend(preference_conflicts)
        
        # Check for workload issues
        workload_conflicts = await self._check_workload_balance(events)
        conflicts.extend(workload_conflicts)
        
        return conflicts
    
    def _build_scheduling_constraints(self, requirements: Dict[str, Any], 
                                    participant_availability: Dict[str, Any],
                                    patterns: Dict[str, Any]) -> List[SchedulingConstraint]:
        """Build comprehensive scheduling constraints."""
        
        constraints = []
        
        # Time-based constraints
        if 'preferred_time' in requirements:
            constraints.append(SchedulingConstraint(
                constraint_type='preferred_time',
                priority=8,
                description='User preferred time',
                parameters={'time': requirements['preferred_time']}
            ))
        
        # Work hours constraint
        constraints.append(SchedulingConstraint(
            constraint_type='work_hours',
            priority=9,
            description='Within work hours',
            parameters={
                'start': self.user_preferences.work_hours[0],
                'end': self.user_preferences.work_hours[1]
            }
        ))
        
        # Meeting frequency constraint
        if patterns.get('meeting_frequency'):
            constraints.append(SchedulingConstraint(
                constraint_type='meeting_frequency',
                priority=6,
                description='Respect meeting frequency patterns',
                parameters={'max_daily': self.user_preferences.maximum_daily_meetings}
            ))
        
        # Participant availability constraints
        for participant, availability in participant_availability.items():
            constraints.append(SchedulingConstraint(
                constraint_type='participant_availability',
                priority=10,
                description=f'Participant {participant} availability',
                parameters={'participant': participant, 'availability': availability}
            ))
        
        # Focus time protection
        if self.user_preferences.focus_time_blocks:
            constraints.append(SchedulingConstraint(
                constraint_type='focus_time_protection',
                priority=7,
                description='Protect focus time blocks',
                parameters={'focus_blocks': self.user_preferences.focus_time_blocks}
            ))
        
        return constraints
    
    async def _find_candidate_time_slots(self, requirements: Dict[str, Any], 
                                       constraints: List[SchedulingConstraint]) -> List[datetime]:
        """Find candidate time slots that satisfy basic constraints."""
        
        candidates = []
        duration = requirements.get('duration', self.user_preferences.preferred_meeting_duration)
        
        # Start from preferred date or tomorrow
        start_date = requirements.get('preferred_date', datetime.now().date() + timedelta(days=1))
        search_period = timedelta(days=requirements.get('search_days', 14))
        
        current_datetime = datetime.combine(start_date, self.user_preferences.work_hours[0])
        end_search = datetime.combine(start_date + search_period, self.user_preferences.work_hours[1])
        
        while current_datetime < end_search:
            slot_end = current_datetime + timedelta(minutes=duration)
            
            # Check if slot satisfies all hard constraints
            if await self._slot_satisfies_constraints(current_datetime, slot_end, constraints):
                candidates.append(current_datetime)
            
            # Move to next 30-minute slot
            current_datetime += timedelta(minutes=30)
            
            # Skip to next day if past work hours
            if current_datetime.time() > self.user_preferences.work_hours[1]:
                next_day = current_datetime.date() + timedelta(days=1)
                current_datetime = datetime.combine(next_day, self.user_preferences.work_hours[0])
        
        return candidates
    
    async def _slot_satisfies_constraints(self, start_time: datetime, end_time: datetime, 
                                        constraints: List[SchedulingConstraint]) -> bool:
        """Check if a time slot satisfies all hard constraints."""
        
        for constraint in constraints:
            if constraint.priority >= 9:  # Hard constraints
                if not await self._check_constraint(start_time, end_time, constraint):
                    return False
        
        return True
    
    async def _check_constraint(self, start_time: datetime, end_time: datetime, 
                              constraint: SchedulingConstraint) -> bool:
        """Check if a specific constraint is satisfied."""
        
        if constraint.constraint_type == 'work_hours':
            work_start = constraint.parameters['start']
            work_end = constraint.parameters['end']
            return work_start <= start_time.time() <= work_end and work_start <= end_time.time() <= work_end
        
        elif constraint.constraint_type == 'participant_availability':
            # Check participant availability (simplified)
            availability = constraint.parameters['availability']
            return self._time_in_availability(start_time, end_time, availability)
        
        elif constraint.constraint_type == 'focus_time_protection':
            focus_blocks = constraint.parameters['focus_blocks']
            return not self._overlaps_focus_time(start_time, end_time, focus_blocks)
        
        return True
    
    def _time_in_availability(self, start_time: datetime, end_time: datetime, 
                            availability: List[Tuple[datetime, datetime]]) -> bool:
        """Check if time slot is within participant availability."""
        
        for avail_start, avail_end in availability:
            if avail_start <= start_time and end_time <= avail_end:
                return True
        return False
    
    def _overlaps_focus_time(self, start_time: datetime, end_time: datetime, 
                           focus_blocks: List[Tuple[time, time]]) -> bool:
        """Check if time slot overlaps with focus time blocks."""
        
        for focus_start, focus_end in focus_blocks:
            if (start_time.time() < focus_end and end_time.time() > focus_start):
                return True
        return False
class SchedulingPatternAnalyzer:
    def __init__(self):
        self.pattern_cache = {}
        
    async def analyze_user_patterns(self, user_id: str) -> Dict[str, Any]:
        """Analyze user scheduling patterns for intelligent suggestions."""
        
        if user_id in self.pattern_cache:
            return self.pattern_cache[user_id]
        
        # Get historical events (last 90 days)
        end_date = datetime.now()
        start_date = end_date - timedelta(days=90)
        
        # This would typically query the database for historical events
        historical_events = await self._get_historical_events(user_id, start_date, end_date)
        
        patterns = {
            'preferred_times': self._analyze_preferred_times(historical_events),
            'meeting_frequency': self._analyze_meeting_frequency(historical_events),
            'duration_preferences': self._analyze_duration_preferences(historical_events),
            'day_of_week_preferences': self._analyze_day_preferences(historical_events),
            'seasonal_patterns': self._analyze_seasonal_patterns(historical_events),
            'productivity_patterns': self._analyze_productivity_patterns(historical_events)
        }
        
        self.pattern_cache[user_id] = patterns
        return patterns
    
    def _analyze_preferred_times(self, events: List[Event]) -> Dict[str, Any]:
        """Analyze preferred meeting times from historical data."""
        
        meeting_times = []
        for event in events:
            if event.event_type == EventType.MEETING:
                meeting_times.append(event.start_time.hour + event.start_time.minute / 60.0)
        
        if not meeting_times:
            return {'peak_hours': [], 'confidence': 0.0}
        
        # Use clustering to find preferred time clusters
        times_array = np.array(meeting_times).reshape(-1, 1)
        
        # Determine optimal number of clusters (max 3 for morning, afternoon, late)
        n_clusters = min(3, len(set(meeting_times)))
        if n_clusters > 1:
            kmeans = KMeans(n_clusters=n_clusters, random_state=42)
            clusters = kmeans.fit_predict(times_array)
            
            # Find cluster centers and sizes
            cluster_info = []
            for i in range(n_clusters):
                cluster_times = [meeting_times[j] for j, c in enumerate(clusters) if c == i]
                cluster_info.append({
                    'center_hour': np.mean(cluster_times),
                    'size': len(cluster_times),
                    'std': np.std(cluster_times)
                })
            
            # Sort by cluster size (most frequent first)
            cluster_info.sort(key=lambda x: x['size'], reverse=True)
            
            return {
                'peak_hours': [info['center_hour'] for info in cluster_info],
                'confidence': max(cluster_info[0]['size'] / len(meeting_times), 0.3)
            }
        
        return {'peak_hours': [np.mean(meeting_times)], 'confidence': 0.5}
    
    def _analyze_meeting_frequency(self, events: List[Event]) -> Dict[str, Any]:
        """Analyze meeting frequency patterns."""
        
        meetings_by_day = {}
        for event in events:
            if event.event_type == EventType.MEETING:
                day_key = event.start_time.date()
                meetings_by_day[day_key] = meetings_by_day.get(day_key, 0) + 1
        
        if not meetings_by_day:
            return {'average_daily': 0, 'max_daily': 0, 'distribution': []}
        
        daily_counts = list(meetings_by_day.values())
        
        return {
            'average_daily': np.mean(daily_counts),
            'max_daily': max(daily_counts),
            'distribution': daily_counts,
            'busy_days_threshold': np.percentile(daily_counts, 75)
        }
    
    async def _get_historical_events(self, user_id: str, start_date: datetime, 
                                   end_date: datetime) -> List[Event]:
        """Get historical events for pattern analysis."""
        # This would typically query the database
        # For now, return empty list as placeholder
        return []
class ExternalIntegrationManager:
    def __init__(self):
        self.integrations = {}
        self.api_clients = {}
        
    async def setup_integration(self, integration_type: IntegrationType, 
                              config: Dict[str, Any]) -> bool:
        """Setup an external integration."""
        
        try:
            if integration_type == IntegrationType.EMAIL:
                return await self._setup_email_integration(config)
            elif integration_type == IntegrationType.VIDEO_CONFERENCE:
                return await self._setup_video_conference_integration(config)
            elif integration_type == IntegrationType.PROJECT_MANAGEMENT:
                return await self._setup_project_management_integration(config)
            elif integration_type == IntegrationType.TRAVEL_BOOKING:
                return await self._setup_travel_integration(config)
            else:
                return False
        except Exception as e:
            print(f"Error setting up {integration_type.value} integration: {e}")
            return False
    
    async def _setup_email_integration(self, config: Dict[str, Any]) -> bool:
        """Setup email integration for automatic event creation."""
        
        email_config = {
            'smtp_server': config.get('smtp_server'),
            'smtp_port': config.get('smtp_port', 587),
            'username': config.get('username'),
            'password': config.get('password'),
            'imap_server': config.get('imap_server'),
            'imap_port': config.get('imap_port', 993)
        }
        
        # Test connection
        try:
            import imaplib
            mail = imaplib.IMAP4_SSL(email_config['imap_server'], email_config['imap_port'])
            mail.login(email_config['username'], email_config['password'])
            mail.logout()
            
            self.integrations[IntegrationType.EMAIL] = email_config
            return True
        except Exception as e:
            print(f"Email integration test failed: {e}")
            return False
    
    async def _setup_video_conference_integration(self, config: Dict[str, Any]) -> bool:
        """Setup video conferencing integration."""
        
        platform = config.get('platform', 'zoom')
        
        if platform == 'zoom':
            zoom_config = {
                'api_key': config.get('api_key'),
                'api_secret': config.get('api_secret'),
                'account_id': config.get('account_id')
            }
            
            # Test Zoom API connection
            try:
                # This would test the actual Zoom API
                self.integrations[IntegrationType.VIDEO_CONFERENCE] = {
                    'platform': 'zoom',
                    'config': zoom_config
                }
                return True
            except Exception as e:
                print(f"Zoom integration test failed: {e}")
                return False
        
        return False
    
    async def create_video_conference_meeting(self, event_details: Dict[str, Any]) -> Dict[str, Any]:
        """Create a video conference meeting for an event."""
        
        if IntegrationType.VIDEO_CONFERENCE not in self.integrations:
            return {'success': False, 'error': 'Video conference integration not configured'}
        
        integration = self.integrations[IntegrationType.VIDEO_CONFERENCE]
        
        if integration['platform'] == 'zoom':
            return await self._create_zoom_meeting(event_details, integration['config'])
        
        return {'success': False, 'error': 'Unsupported video conference platform'}
    
    async def _create_zoom_meeting(self, event_details: Dict[str, Any], 
                                 zoom_config: Dict[str, Any]) -> Dict[str, Any]:
        """Create a Zoom meeting."""
        
        # This would use the actual Zoom API
        # For demonstration, return mock data
        
        meeting_data = {
            'topic': event_details.get('title', 'Calendar Meeting'),
            'type': 2,  # Scheduled meeting
            'start_time': event_details['start_time'].isoformat(),
            'duration': event_details.get('duration', 60),
            'timezone': event_details.get('timezone', 'UTC'),
            'settings': {
                'host_video': True,
                'participant_video': True,
                'join_before_host': False,
                'mute_upon_entry': True,
                'waiting_room': True
            }
        }
        
        # Mock Zoom API response
        return {
            'success': True,
            'meeting_url': f"https://zoom.us/j/123456789",
            'meeting_id': '123456789',
            'password': 'abc123',
            'start_url': f"https://zoom.us/s/123456789?role=1"
        }
    
    async def send_meeting_invitation(self, event: Event, participants: List[str]) -> bool:
        """Send meeting invitations via email."""
        
        if IntegrationType.EMAIL not in self.integrations:
            return False
        
        email_config = self.integrations[IntegrationType.EMAIL]
        
        try:
            # Create email content
            subject = f"Meeting Invitation: {event.title}"
            
            # Generate calendar invitation (ICS format)
            ics_content = self._generate_ics_invitation(event)
            
            body = f"""
            You're invited to a meeting:
            
            Title: {event.title}
            Date: {event.start_time.strftime('%A, %B %d, %Y')}
            Time: {event.start_time.strftime('%I:%M %p')} - {event.calculated_end_time.strftime('%I:%M %p')}
            
            {f'Location: {event.location}' if event.location else ''}
            {f'Description: {event.description}' if event.description else ''}
            
            Please confirm your attendance.
            """
            
            # Send email to each participant
            for participant_email in participants:
                await self._send_email(
                    email_config,
                    participant_email,
                    subject,
                    body,
                    ics_content
                )
            
            return True
            
        except Exception as e:
            print(f"Error sending meeting invitations: {e}")
            return False
    
    def _generate_ics_invitation(self, event: Event) -> str:
        """Generate ICS calendar invitation content."""
        
        ics_content = f"""BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//Calendar App//EN
BEGIN:VEVENT
UID:{event.id}@calendar-app.com
DTSTAMP:{datetime.now().strftime('%Y%m%dT%H%M%SZ')}
DTSTART:{event.start_time.strftime('%Y%m%dT%H%M%SZ')}
DTEND:{event.calculated_end_time.strftime('%Y%m%dT%H%M%SZ')}
SUMMARY:{event.title}
DESCRIPTION:{event.description or ''}
LOCATION:{event.location or ''}
STATUS:CONFIRMED
SEQUENCE:0
END:VEVENT
END:VCALENDAR"""
        
        return ics_content
    
    async def _send_email(self, email_config: Dict[str, Any], to_email: str, 
                         subject: str, body: str, attachment_content: str = None):
        """Send email with optional calendar attachment."""
        
        msg = MIMEMultipart()
        msg['From'] = email_config['username']
        msg['To'] = to_email
        msg['Subject'] = subject
        
        msg.attach(MIMEText(body, 'plain'))
        
        if attachment_content:
            attachment = MIMEText(attachment_content)
            attachment.add_header('Content-Disposition', 'attachment', filename='meeting.ics')
            msg.attach(attachment)
        
        # Send email
        server = smtplib.SMTP(email_config['smtp_server'], email_config['smtp_port'])
        server.starttls()
        server.login(email_config['username'], email_config['password'])
        server.send_message(msg)
        server.quit()
class PredictiveAssistant:
    def __init__(self, calendar_engine, pattern_analyzer):
        self.calendar_engine = calendar_engine
        self.pattern_analyzer = pattern_analyzer
        self.prediction_models = {}
        
    async def generate_proactive_suggestions(self, user_id: str) -> List[Dict[str, Any]]:
        """Generate proactive suggestions based on user patterns and context."""
        
        suggestions = []
        
        # Analyze current schedule
        today = datetime.now().date()
        week_start = today - timedelta(days=today.weekday())
        week_end = week_start + timedelta(days=6)
        
        current_events = await self.calendar_engine.get_events_in_range(
            user_id, 
            datetime.combine(week_start, datetime.min.time()),
            datetime.combine(week_end, datetime.max.time())
        )
        
        # Check for scheduling opportunities
        scheduling_suggestions = await self._suggest_scheduling_opportunities(user_id, current_events)
        suggestions.extend(scheduling_suggestions)
        
        # Check for optimization opportunities
        optimization_suggestions = await self._suggest_schedule_optimizations(user_id, current_events)
        suggestions.extend(optimization_suggestions)
        
        # Check for preparation reminders
        preparation_suggestions = await self._suggest_preparation_reminders(current_events)
        suggestions.extend(preparation_suggestions)
        
        return suggestions
    
    async def _suggest_scheduling_opportunities(self, user_id: str, 
                                             current_events: List[Event]) -> List[Dict[str, Any]]:
        """Suggest scheduling opportunities based on patterns."""
        
        suggestions = []
        patterns = await self.pattern_analyzer.analyze_user_patterns(user_id)
        
        # Check for missing regular meetings
        if patterns.get('meeting_frequency', {}).get('average_daily', 0) > len(current_events) / 7:
            suggestions.append({
                'type': 'scheduling_opportunity',
                'priority': 'medium',
                'title': 'Light schedule detected',
                'description': 'You have fewer meetings than usual this week. Consider scheduling important discussions.',
                'action': 'suggest_meeting_slots'
            })
        
        # Check for focus time opportunities
        busy_days = [day for day, events in self._group_events_by_day(current_events).items() 
                    if len(events) > 3]
        
        if len(busy_days) < 3:  # Less than 3 busy days
            suggestions.append({
                'type': 'focus_time_opportunity',
                'priority': 'high',
                'title': 'Focus time available',
                'description': 'You have several days with light schedules. Perfect for deep work!',
                'action': 'block_focus_time'
            })
        
        return suggestions
    
    async def _suggest_schedule_optimizations(self, user_id: str, 
                                           current_events: List[Event]) -> List[Dict[str, Any]]:
        """Suggest schedule optimizations."""
        
        suggestions = []
        
        # Check for fragmented schedule
        fragmented_days = self._detect_fragmented_schedule(current_events)
        if fragmented_days:
            suggestions.append({
                'type': 'schedule_optimization',
                'priority': 'medium',
                'title': 'Fragmented schedule detected',
                'description': f'You have fragmented schedules on {len(fragmented_days)} days. Consider consolidating meetings.',
                'action': 'consolidate_meetings',
                'data': {'fragmented_days': fragmented_days}
            })
        
        # Check for back-to-back meetings
        back_to_back_count = self._count_back_to_back_meetings(current_events)
        if back_to_back_count > 3:
            suggestions.append({
                'type': 'schedule_optimization',
                'priority': 'high',
                'title': 'Too many back-to-back meetings',
                'description': f'You have {back_to_back_count} back-to-back meetings. Consider adding buffers.',
                'action': 'add_meeting_buffers'
            })
        
        return suggestions
    
    def _group_events_by_day(self, events: List[Event]) -> Dict[date, List[Event]]:
        """Group events by day."""
        
        events_by_day = {}
        for event in events:
            day = event.start_time.date()
            if day not in events_by_day:
                events_by_day[day] = []
            events_by_day[day].append(event)
        
        return events_by_day
    
    def _detect_fragmented_schedule(self, events: List[Event]) -> List[date]:
        """Detect days with fragmented schedules."""
        
        fragmented_days = []
        events_by_day = self._group_events_by_day(events)
        
        for day, day_events in events_by_day.items():
            if len(day_events) < 2:
                continue
            
            # Sort events by start time
            day_events.sort(key=lambda x: x.start_time)
            
            # Check for large gaps between meetings
            gaps = []
            for i in range(len(day_events) - 1):
                gap = (day_events[i+1].start_time - day_events[i].calculated_end_time).total_seconds() / 3600
                gaps.append(gap)
            
            # If there are gaps > 2 hours between meetings, consider it fragmented
            if any(gap > 2 for gap in gaps):
                fragmented_days.append(day)
        
        return fragmented_days
    
    def _count_back_to_back_meetings(self, events: List[Event]) -> int:
        """Count back-to-back meetings (< 15 minutes between)."""
        
        count = 0
        events_by_day = self._group_events_by_day(events)
        
        for day_events in events_by_day.values():
            day_events.sort(key=lambda x: x.start_time)
            
            for i in range(len(day_events) - 1):
                gap_minutes = (day_events[i+1].start_time - day_events[i].calculated_end_time).total_seconds() / 60
                if gap_minutes < 15:
                    count += 1
        
        return count
class SchedulingOptimizer:
    def __init__(self):
        self.optimization_strategies = {
            SchedulingStrategy.OPTIMIZE_FOR_FOCUS: self._optimize_for_focus,
            SchedulingStrategy.MINIMIZE_CONFLICTS: self._minimize_conflicts,
            SchedulingStrategy.BALANCE_WORKLOAD: self._balance_workload,
            SchedulingStrategy.RESPECT_PREFERENCES: self._respect_preferences,
            SchedulingStrategy.MINIMIZE_TRAVEL: self._minimize_travel
        }
    
    async def optimize_suggestions(self, candidate_slots: List[datetime], 
                                 constraints: List[SchedulingConstraint],
                                 patterns: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Optimize and rank scheduling suggestions."""
        
        scored_suggestions = []
        
        for slot in candidate_slots:
            score = 0.0
            factors = {}
            reasoning_parts = []
            
            # Apply each optimization strategy
            for strategy in SchedulingStrategy:
                strategy_score, strategy_factors, strategy_reasoning = await self.optimization_strategies[strategy](
                    slot, constraints, patterns
                )
                
                score += strategy_score
                factors[strategy.value] = strategy_factors
                if strategy_reasoning:
                    reasoning_parts.append(strategy_reasoning)
            
            # Normalize score
            score = min(score / len(SchedulingStrategy), 1.0)
            
            scored_suggestions.append({
                'time': slot,
                'score': score,
                'factors': factors,
                'reasoning': '; '.join(reasoning_parts)
            })
        
        # Sort by score (highest first)
        scored_suggestions.sort(key=lambda x: x['score'], reverse=True)
        
        return scored_suggestions
    
    async def _optimize_for_focus(self, slot: datetime, constraints: List[SchedulingConstraint], 
                                patterns: Dict[str, Any]) -> Tuple[float, Dict[str, float], str]:
        """Optimize for focus time and deep work."""
        
        score = 0.0
        factors = {}
        reasoning = ""
        
        # Prefer morning slots for focus
        if 9 <= slot.hour <= 11:
            score += 0.3
            factors['morning_focus'] = 0.3
            reasoning = "Morning slot optimal for focus"
        
        # Avoid fragmented time slots
        # This would check surrounding events to ensure sufficient focus time
        
        return score, factors, reasoning
    
    async def _minimize_conflicts(self, slot: datetime, constraints: List[SchedulingConstraint], 
                                patterns: Dict[str, Any]) -> Tuple[float, Dict[str, float], str]:
        """Minimize scheduling conflicts."""
        
        score = 0.0
        factors = {}
        reasoning = ""
        
        # Check constraint satisfaction
        satisfied_constraints = 0
        total_constraints = len(constraints)
        
        for constraint in constraints:
            # This would check if the slot satisfies the constraint
            # For now, assume all constraints are satisfied
            satisfied_constraints += 1
        
        if total_constraints > 0:
            constraint_score = satisfied_constraints / total_constraints
            score += constraint_score * 0.4
            factors['constraint_satisfaction'] = constraint_score
            
            if constraint_score == 1.0:
                reasoning = "No conflicts detected"
            else:
                reasoning = f"Satisfies {satisfied_constraints}/{total_constraints} constraints"
        
        return score, factors, reasoning
    
    async def _balance_workload(self, slot: datetime, constraints: List[SchedulingConstraint], 
                              patterns: Dict[str, Any]) -> Tuple[float, Dict[str, float], str]:
        """Balance workload across time periods."""
        
        score = 0.0
        factors = {}
        reasoning = ""
        
        # This would analyze existing workload and prefer slots that balance it
        # For now, return a moderate score
        score += 0.2
        factors['workload_balance'] = 0.2
        reasoning = "Contributes to balanced workload"
        
        return score, factors, reasoning
    
    async def _respect_preferences(self, slot: datetime, constraints: List[SchedulingConstraint], 
                                 patterns: Dict[str, Any]) -> Tuple[float, Dict[str, float], str]:
        """Respect user preferences and patterns."""
        
        score = 0.0
        factors = {}
        reasoning = ""
        
        # Check against preferred times from patterns
        preferred_times = patterns.get('preferred_times', {}).get('peak_hours', [])
        
        if preferred_times:
            slot_hour = slot.hour + slot.minute / 60.0
            
            # Find closest preferred time
            closest_preferred = min(preferred_times, key=lambda x: abs(x - slot_hour))
            time_diff = abs(closest_preferred - slot_hour)
            
            if time_diff <= 1.0:  # Within 1 hour of preferred time
                preference_score = 1.0 - (time_diff / 1.0)
                score += preference_score * 0.3
                factors['time_preference'] = preference_score
                reasoning = f"Aligns with preferred meeting time ({closest_preferred:.1f})"
        
        return score, factors, reasoning
    
    async def _minimize_travel(self, slot: datetime, constraints: List[SchedulingConstraint], 
                             patterns: Dict[str, Any]) -> Tuple[float, Dict[str, float], str]:
        """Minimize travel time between meetings."""
        
        score = 0.0
        factors = {}
        reasoning = ""
        
        # This would analyze location-based constraints and optimize for minimal travel
        # For now, return a base score
        score += 0.1
        factors['travel_optimization'] = 0.1
        reasoning = "Optimized for minimal travel"
        
        return score, factors, reasoning
class ConstraintSolver:
    def __init__(self):
        pass
    
    async def solve_scheduling_constraints(self, requirements: Dict[str, Any], 
                                         constraints: List[SchedulingConstraint]) -> List[datetime]:
        """Solve complex scheduling constraints to find feasible solutions."""
        
        # This would implement a constraint satisfaction algorithm
        # For now, return a simple implementation
        
        feasible_slots = []
        
        # Start with a basic time range
        start_time = datetime.now() + timedelta(days=1)
        end_time = start_time + timedelta(days=14)
        
        current_time = start_time.replace(hour=9, minute=0, second=0, microsecond=0)
        
        while current_time < end_time:
            # Check if this slot satisfies all constraints
            if await self._satisfies_all_constraints(current_time, constraints):
                feasible_slots.append(current_time)
            
            current_time += timedelta(minutes=30)
            
            # Skip weekends and non-business hours
            if current_time.weekday() >= 5:  # Weekend
                current_time += timedelta(days=2 - current_time.weekday() + 5)
                current_time = current_time.replace(hour=9, minute=0)
            elif current_time.hour >= 17:  # After business hours
                current_time = current_time.replace(hour=9, minute=0) + timedelta(days=1)
        
        return feasible_slots[:20]  # Return top 20 feasible slots
    
    async def _satisfies_all_constraints(self, slot_time: datetime, 
                                       constraints: List[SchedulingConstraint]) -> bool:
        """Check if a time slot satisfies all constraints."""
        
        for constraint in constraints:
            if not await self._check_single_constraint(slot_time, constraint):
                return False
        
        return True
    
    async def _check_single_constraint(self, slot_time: datetime, 
                                     constraint: SchedulingConstraint) -> bool:
        """Check a single constraint against a time slot."""
        
        # Implementation would depend on constraint type
        # For now, return True for most constraints
        
        if constraint.constraint_type == 'work_hours':
            work_start = constraint.parameters.get('start', time(9, 0))
            work_end = constraint.parameters.get('end', time(17, 0))
            return work_start <= slot_time.time() <= work_end
        
        return True



This advanced features implementation demonstrates how an LLM-based calendar application can provide sophisticated scheduling intelligence, seamless external integrations, and proactive assistance. The IntelligentScheduler uses machine learning techniques to analyze user patterns and optimize scheduling decisions, while the ExternalIntegrationManager enables seamless connectivity with email, video conferencing, and other external services. The PredictiveAssistant provides proactive suggestions to help users optimize their schedules and maintain work-life balance. These advanced features transform the calendar from a passive scheduling tool into an intelligent assistant that actively helps users manage their time more effectively.


Testing and Deployment Considerations


Testing an LLM-based calendar application presents unique challenges that go beyond traditional software testing approaches. The natural language processing components introduce variability and ambiguity that require specialized testing strategies to ensure reliable behavior across diverse user inputs and scenarios. The system must be tested not only for functional correctness but also for natural language understanding accuracy, conversational flow quality, and user experience consistency.


The testing strategy must encompass multiple layers including unit tests for individual components, integration tests for system interactions, natural language processing accuracy tests, user experience validation, and performance testing under various load conditions. Each layer requires different testing methodologies and success criteria, with particular attention to edge cases in natural language interpretation and error recovery scenarios.


Deployment considerations for an LLM-based calendar application involve complex infrastructure requirements including natural language processing services, real-time conversation management, database scalability, and external service integrations. The system must be designed for high availability while managing the computational costs associated with LLM processing and maintaining responsive user interactions across different devices and network conditions.


Here is a comprehensive implementation of testing frameworks and deployment strategies specifically designed for LLM-based calendar applications. This example demonstrates how to test natural language processing accuracy, validate conversational flows, and implement robust deployment patterns that ensure system reliability and performance.



import unittest

import asyncio

import pytest

from unittest.mock import Mock, AsyncMock, patch

from datetime import datetime, date, timedelta

import json

from typing import List, Dict, Any, Tuple

import numpy as np

from dataclasses import dataclass

import logging

from concurrent.futures import ThreadPoolExecutor

import time

import requests


@dataclass

class TestCase:

    name: str

    input_text: str

    expected_intent: Dict[str, Any]

    expected_action: str

    context: Dict[str, Any] = None

    expected_confidence: float = 0.8


@dataclass

class ConversationTestCase:

    name: str

    conversation_turns: List[Tuple[str, str]]  # (user_input, expected_response_pattern)

    initial_context: Dict[str, Any] = None

    success_criteria: Dict[str, Any] = None


class NLPTestSuite:

    def __init__(self, llm_service, calendar_engine):

        self.llm_service = llm_service

        self.calendar_engine = calendar_engine

        self.test_cases = self._load_test_cases()

        self.conversation_test_cases = self._load_conversation_test_cases()

        

    def _load_test_cases(self) -> List[TestCase]:

        """Load comprehensive test cases for NLP testing."""

        

        return [

            # Event Creation Tests

            TestCase(

                name="simple_meeting_creation",

                input_text="Schedule a meeting with John tomorrow at 2 PM",

                expected_intent={

                    "action": "create",

                    "parameters": {

                        "entities": {"people": ["John"]},

                        "temporal": {"date": "tomorrow", "time": "2 PM"}

                    }

                },

                expected_action="create"

            ),

            

            TestCase(

                name="recurring_meeting_creation",

                input_text="Set up a weekly team meeting every Tuesday at 10 AM",

                expected_intent={

                    "action": "create",

                    "parameters": {

                        "temporal": {"recurrence": "weekly", "day": "Tuesday", "time": "10 AM"}

                    }

                },

                expected_action="create"

            ),

            

            TestCase(

                name="complex_meeting_with_location",

                input_text="Book a project review meeting with Sarah and Mike next Friday at 3 PM in the conference room",

                expected_intent={

                    "action": "create",

                    "parameters": {

                        "entities": {

                            "people": ["Sarah", "Mike"],

                            "locations": ["conference room"],

                            "topics": ["project review"]

                        },

                        "temporal": {"date": "next Friday", "time": "3 PM"}

                    }

                },

                expected_action="create"

            ),

            

            # Event Query Tests

            TestCase(

                name="simple_schedule_query",

                input_text="What do I have scheduled for today?",

                expected_intent={

                    "action": "query",

                    "parameters": {

                        "temporal": {"date": "today"}

                    }

                },

                expected_action="query"

            ),

            

            TestCase(

                name="time_range_query",

                input_text="Show me my meetings for next week",

                expected_intent={

                    "action": "query",

                    "parameters": {

                        "temporal": {"date_range": "next week"}

                    }

                },

                expected_action="query"

            ),

            

            # Event Modification Tests

            TestCase(

                name="simple_reschedule",

                input_text="Move my 3 PM meeting to 4 PM",

                expected_intent={

                    "action": "update",

                    "parameters": {

                        "temporal": {"original_time": "3 PM", "new_time": "4 PM"}

                    }

                },

                expected_action="update"

            ),

            

            TestCase(

                name="contextual_reschedule",

                input_text="Move it to tomorrow",

                expected_intent={

                    "action": "update",

                    "parameters": {

                        "temporal": {"new_date": "tomorrow"}

                    }

                },

                expected_action="update",

                context={"last_referenced_event": "meeting_id_123"}

            ),

            

            # Free Time Finding Tests

            TestCase(

                name="find_free_time",

                input_text="When am I free this afternoon?",

                expected_intent={

                    "action": "find_slots",

                    "parameters": {

                        "temporal": {"date": "today", "time_period": "afternoon"}

                    }

                },

                expected_action="find_slots"

            ),

            

            # Edge Cases

            TestCase(

                name="ambiguous_time_reference",

                input_text="Schedule a meeting next Tuesday",

                expected_intent={

                    "action": "create",

                    "parameters": {

                        "temporal": {"date": "next Tuesday"}

                    }

                },

                expected_action="create",

                expected_confidence=0.6  # Lower confidence due to missing time

            ),

            

            TestCase(

                name="multiple_actions",

                input_text="Cancel my 2 PM meeting and schedule a new one with Tom at 3 PM",

                expected_intent={

                    "action": "delete",  # Should identify primary action

                    "parameters": {

                        "temporal": {"time": "2 PM"},

                        "secondary_action": {

                            "action": "create",

                            "entities": {"people": ["Tom"]},

                            "temporal": {"time": "3 PM"}

                        }

                    }

                },

                expected_action="delete"

            )

        ]

    

    def _load_conversation_test_cases(self) -> List[ConversationTestCase]:

        """Load conversation flow test cases."""

        

        return [

            ConversationTestCase(

                name="meeting_creation_flow",

                conversation_turns=[

                    ("Schedule a meeting with John", ".*when.*time.*"),

                    ("Tomorrow at 2 PM", ".*scheduled.*John.*tomorrow.*2.*PM.*"),

                    ("Add a reminder 30 minutes before", ".*reminder.*added.*")

                ],

                success_criteria={"events_created": 1, "reminders_created": 1}

            ),

            

            ConversationTestCase(

                name="conflict_resolution_flow",

                conversation_turns=[

                    ("Schedule a meeting tomorrow at 2 PM", ".*conflict.*existing.*"),

                    ("Show me alternative times", ".*available.*slots.*"),

                    ("Book the 3 PM slot", ".*scheduled.*3.*PM.*")

                ],

                success_criteria={"events_created": 1, "conflicts_resolved": 1}

            ),

            

            ConversationTestCase(

                name="clarification_flow",

                conversation_turns=[

                    ("Schedule a meeting next week", ".*which.*day.*time.*"),

                    ("Tuesday at 10 AM", ".*duration.*"),

                    ("One hour", ".*scheduled.*Tuesday.*10.*AM.*hour.*")

                ],

                success_criteria={"events_created": 1, "clarifications_handled": 2}

            )

        ]

    

    async def run_nlp_accuracy_tests(self) -> Dict[str, Any]:

        """Run comprehensive NLP accuracy tests."""

        

        results = {

            "total_tests": len(self.test_cases),

            "passed": 0,

            "failed": 0,

            "accuracy_scores": [],

            "failed_cases": [],

            "performance_metrics": {}

        }

        

        start_time = time.time()

        

        for test_case in self.test_cases:

            try:

                # Set up context if provided

                if test_case.context:

                    self._setup_test_context(test_case.context)

                

                # Process the input

                parsed_intent = await self.llm_service.parse_intent(

                    test_case.input_text, 

                    test_case.context or {}

                )

                

                # Validate the result

                is_correct, accuracy_score = self._validate_intent(

                    parsed_intent, 

                    test_case.expected_intent,

                    test_case.expected_confidence

                )

                

                if is_correct:

                    results["passed"] += 1

                else:

                    results["failed"] += 1

                    results["failed_cases"].append({

                        "test_name": test_case.name,

                        "input": test_case.input_text,

                        "expected": test_case.expected_intent,

                        "actual": parsed_intent,

                        "accuracy_score": accuracy_score

                    })

                

                results["accuracy_scores"].append(accuracy_score)

                

            except Exception as e:

                results["failed"] += 1

                results["failed_cases"].append({

                    "test_name": test_case.name,

                    "error": str(e)

                })

        

        # Calculate performance metrics

        end_time = time.time()

        results["performance_metrics"] = {

            "total_time": end_time - start_time,

            "average_time_per_test": (end_time - start_time) / len(self.test_cases),

            "overall_accuracy": np.mean(results["accuracy_scores"]) if results["accuracy_scores"] else 0,

            "pass_rate": results["passed"] / results["total_tests"]

        }

        

        return results

    

    async def run_conversation_flow_tests(self) -> Dict[str, Any]:

        """Run conversation flow tests."""

        

        results = {

            "total_conversations": len(self.conversation_test_cases),

            "passed": 0,

            "failed": 0,

            "failed_conversations": []

        }

        

        for conv_test in self.conversation_test_cases:

            try:

                conversation_passed = await self._test_conversation_flow(conv_test)

                

                if conversation_passed:

                    results["passed"] += 1

                else:

                    results["failed"] += 1

                    results["failed_conversations"].append(conv_test.name)

                    

            except Exception as e:

                results["failed"] += 1

                results["failed_conversations"].append({

                    "name": conv_test.name,

                    "error": str(e)

                })

        

        return results

    

    def _validate_intent(self, actual_intent: Dict[str, Any], 

                        expected_intent: Dict[str, Any], 

                        min_confidence: float) -> Tuple[bool, float]:

        """Validate parsed intent against expected intent."""

        

        accuracy_score = 0.0

        

        # Check action

        if actual_intent.get("action") == expected_intent.get("action"):

            accuracy_score += 0.4

        

        # Check parameters

        actual_params = actual_intent.get("parameters", {})

        expected_params = expected_intent.get("parameters", {})

        

        # Check entities

        if "entities" in expected_params:

            entity_score = self._compare_entities(

                actual_params.get("entities", {}),

                expected_params["entities"]

            )

            accuracy_score += entity_score * 0.3

        

        # Check temporal information

        if "temporal" in expected_params:

            temporal_score = self._compare_temporal(

                actual_params.get("temporal", {}),

                expected_params["temporal"]

            )

            accuracy_score += temporal_score * 0.3

        

        # Check confidence

        confidence = actual_intent.get("confidence", 0.0)

        if confidence >= min_confidence:

            accuracy_score += 0.1

        

        is_correct = accuracy_score >= 0.8

        return is_correct, accuracy_score

    

    def _compare_entities(self, actual_entities: Dict[str, List[str]], 

                         expected_entities: Dict[str, List[str]]) -> float:

        """Compare entity extraction results."""

        

        if not expected_entities:

            return 1.0

        

        total_score = 0.0

        entity_types = set(expected_entities.keys()) | set(actual_entities.keys())

        

        for entity_type in entity_types:

            expected_list = set(expected_entities.get(entity_type, []))

            actual_list = set(actual_entities.get(entity_type, []))

            

            if expected_list:

                intersection = expected_list & actual_list

                union = expected_list | actual_list

                jaccard_score = len(intersection) / len(union) if union else 0

                total_score += jaccard_score

        

        return total_score / len(entity_types) if entity_types else 1.0

    

    def _compare_temporal(self, actual_temporal: Dict[str, Any], 

                         expected_temporal: Dict[str, Any]) -> float:

        """Compare temporal information extraction."""

        

        if not expected_temporal:

            return 1.0

        

        score = 0.0

        total_fields = len(expected_temporal)

        

        for field, expected_value in expected_temporal.items():

            if field in actual_temporal:

                # Simplified comparison - in practice, would need more sophisticated temporal matching

                if str(actual_temporal[field]).lower() in str(expected_value).lower():

                    score += 1.0

        

        return score / total_fields if total_fields > 0 else 1.0

    

    async def _test_conversation_flow(self, conv_test: ConversationTestCase) -> bool:

        """Test a complete conversation flow."""

        

        conversation_manager = ConversationManager(self.llm_service, self.calendar_engine)

        ui_state = UserInterfaceState(

            current_view=ViewMode.DAY,

            selected_date=date.today(),

            interaction_mode=InteractionMode.CONVERSATION,

            conversation_active=True

        )

        

        # Set up initial context

        if conv_test.initial_context:

            self._setup_test_context(conv_test.initial_context)

        

        # Process each conversation turn

        for user_input, expected_response_pattern in conv_test.conversation_turns:

            try:

                response = await conversation_manager.process_user_input(user_input, ui_state)

                

                # Check if response matches expected pattern

                import re

                if not re.search(expected_response_pattern, response.get("message", ""), re.IGNORECASE):

                    return False

                    

            except Exception as e:

                logging.error(f"Conversation test failed: {e}")

                return False

        

        # Check success criteria

        if conv_test.success_criteria:

            return self._check_success_criteria(conv_test.success_criteria)

        

        return True

    

    def _setup_test_context(self, context: Dict[str, Any]):

        """Set up test context for conversation tests."""

        # Implementation would set up mock data and context

        pass

    

    def _check_success_criteria(self, criteria: Dict[str, Any]) -> bool:

        """Check if success criteria are met."""

        # Implementation would verify that expected actions were performed

        return True


class PerformanceTestSuite:

    def __init__(self, calendar_app):

        self.calendar_app = calendar_app

        self.load_test_scenarios = self._define_load_test_scenarios()

        

    def _define_load_test_scenarios(self) -> List[Dict[str, Any]]:

        """Define load testing scenarios."""

        

        return [

            {

                "name": "concurrent_nlp_processing",

                "description": "Test concurrent natural language processing",

                "concurrent_users": 50,

                "requests_per_user": 10,

                "test_inputs": [

                    "Schedule a meeting tomorrow at 2 PM",

                    "What do I have today?",

                    "Find me a free hour this week",

                    "Cancel my 3 PM appointment"

                ]

            },

            {

                "name": "database_stress_test",

                "description": "Test database performance under load",

                "concurrent_users": 100,

                "requests_per_user": 20,

                "operations": ["create", "read", "update", "delete"]

            },

            {

                "name": "conversation_memory_test",

                "description": "Test conversation memory management",

                "concurrent_conversations": 25,

                "turns_per_conversation": 15,

                "memory_threshold_mb": 1000

            }

        ]

    

    async def run_performance_tests(self) -> Dict[str, Any]:

        """Run comprehensive performance tests."""

        

        results = {

            "scenarios": {},

            "overall_metrics": {},

            "recommendations": []

        }

        

        for scenario in self.load_test_scenarios:

            scenario_results = await self._run_load_scenario(scenario)

            results["scenarios"][scenario["name"]] = scenario_results

        

        # Calculate overall metrics

        results["overall_metrics"] = self._calculate_overall_metrics(results["scenarios"])

        

        # Generate recommendations

        results["recommendations"] = self._generate_performance_recommendations(results)

        

        return results

    

    async def _run_load_scenario(self, scenario: Dict[str, Any]) -> Dict[str, Any]:

        """Run a specific load testing scenario."""

        

        start_time = time.time()

        

        if scenario["name"] == "concurrent_nlp_processing":

            return await self._test_concurrent_nlp(scenario)

        elif scenario["name"] == "database_stress_test":

            return await self._test_database_stress(scenario)

        elif scenario["name"] == "conversation_memory_test":

            return await self._test_conversation_memory(scenario)

        

        return {"error": "Unknown scenario"}

    

    async def _test_concurrent_nlp(self, scenario: Dict[str, Any]) -> Dict[str, Any]:

        """Test concurrent NLP processing performance."""

        

        concurrent_users = scenario["concurrent_users"]

        requests_per_user = scenario["requests_per_user"]

        test_inputs = scenario["test_inputs"]

        

        async def user_simulation():

            user_results = {

                "requests_completed": 0,

                "total_time": 0,

                "errors": 0,

                "response_times": []

            }

            

            for _ in range(requests_per_user):

                input_text = np.random.choice(test_inputs)

                

                request_start = time.time()

                try:

                    # Simulate NLP processing

                    await self.calendar_app.process_user_command(input_text)

                    request_time = time.time() - request_start

                    

                    user_results["requests_completed"] += 1

                    user_results["response_times"].append(request_time)

                    user_results["total_time"] += request_time

                    

                except Exception as e:

                    user_results["errors"] += 1

            

            return user_results

        

        # Run concurrent user simulations

        tasks = [user_simulation() for _ in range(concurrent_users)]

        user_results = await asyncio.gather(*tasks)

        

        # Aggregate results

        total_requests = sum(r["requests_completed"] for r in user_results)

        total_errors = sum(r["errors"] for r in user_results)

        all_response_times = []

        for r in user_results:

            all_response_times.extend(r["response_times"])

        

        return {

            "total_requests": total_requests,

            "total_errors": total_errors,

            "error_rate": total_errors / (total_requests + total_errors) if (total_requests + total_errors) > 0 else 0,

            "average_response_time": np.mean(all_response_times) if all_response_times else 0,

            "p95_response_time": np.percentile(all_response_times, 95) if all_response_times else 0,

            "p99_response_time": np.percentile(all_response_times, 99) if all_response_times else 0,

            "throughput_rps": total_requests / max(max(r["total_time"] for r in user_results), 1)

        }

    

    def _calculate_overall_metrics(self, scenario_results: Dict[str, Any]) -> Dict[str, Any]:

        """Calculate overall performance metrics."""

        

        overall_metrics = {

            "average_response_time": 0,

            "total_error_rate": 0,

            "peak_throughput": 0,

            "memory_efficiency": "good"

        }

        

        # Aggregate metrics from all scenarios

        response_times = []

        error_rates = []

        throughputs = []

        

        for scenario_name, results in scenario_results.items():

            if "average_response_time" in results:

                response_times.append(results["average_response_time"])

            if "error_rate" in results:

                error_rates.append(results["error_rate"])

            if "throughput_rps" in results:

                throughputs.append(results["throughput_rps"])

        

        if response_times:

            overall_metrics["average_response_time"] = np.mean(response_times)

        if error_rates:

            overall_metrics["total_error_rate"] = np.mean(error_rates)

        if throughputs:

            overall_metrics["peak_throughput"] = max(throughputs)

        

        return overall_metrics

    

    def _generate_performance_recommendations(self, results: Dict[str, Any]) -> List[str]:

        """Generate performance optimization recommendations."""

        

        recommendations = []

        overall_metrics = results["overall_metrics"]

        

        # Response time recommendations

        if overall_metrics.get("average_response_time", 0) > 2.0:

            recommendations.append(

                "Consider optimizing NLP processing pipeline - average response time exceeds 2 seconds"

            )

        

        # Error rate recommendations

        if overall_metrics.get("total_error_rate", 0) > 0.05:

            recommendations.append(

                "Error rate is above 5% - implement better error handling and retry mechanisms"

            )

        

        # Throughput recommendations

        if overall_metrics.get("peak_throughput", 0) < 10:

            recommendations.append(

                "Low throughput detected - consider implementing request queuing and load balancing"

            )

        

        return recommendations


class DeploymentManager:

    def __init__(self):

        self.deployment_configs = self._load_deployment_configs()

        self.health_checks = HealthCheckManager()

        self.monitoring = MonitoringManager()

        

    def _load_deployment_configs(self) -> Dict[str, Any]:

        """Load deployment configurations for different environments."""

        

        return {

            "development": {

                "infrastructure": {

                    "app_servers": 1,

                    "database_replicas": 1,

                    "llm_service_instances": 1,

                    "cache_instances": 1

                },

                "scaling": {

                    "auto_scaling_enabled": False,

                    "min_instances": 1,

                    "max_instances": 2

                },

                "monitoring": {

                    "log_level": "DEBUG",

                    "metrics_enabled": True,

                    "alerting_enabled": False

                }

            },

            

            "staging": {

                "infrastructure": {

                    "app_servers": 2,

                    "database_replicas": 2,

                    "llm_service_instances": 2,

                    "cache_instances": 2,

                    "load_balancer": True

                },

                "scaling": {

                    "auto_scaling_enabled": True,

                    "min_instances": 2,

                    "max_instances": 5,

                    "cpu_threshold": 70,

                    "memory_threshold": 80

                },

                "monitoring": {

                    "log_level": "INFO",

                    "metrics_enabled": True,

                    "alerting_enabled": True,

                    "health_check_interval": 30

                }

            },

            

            "production": {

                "infrastructure": {

                    "app_servers": 5,

                    "database_replicas": 3,

                    "llm_service_instances": 10,

                    "cache_instances": 3,

                    "load_balancer": True,

                    "cdn_enabled": True,

                    "backup_instances": 2

                },

                "scaling": {

                    "auto_scaling_enabled": True,

                    "min_instances": 5,

                    "max_instances": 20,

                    "cpu_threshold": 60,

                    "memory_threshold": 70,

                    "predictive_scaling": True

                },

                "monitoring": {

                    "log_level": "WARN",

                    "metrics_enabled": True,

                    "alerting_enabled": True,

                    "health_check_interval": 15,

                    "performance_monitoring": True,

                    "user_analytics": True

                },

                "security": {

                    "encryption_at_rest": True,

                    "encryption_in_transit": True,

                    "api_rate_limiting": True,

                    "ddos_protection": True,

                    "security_scanning": True

                }

            }

        }

    

    async def deploy_application(self, environment: str, version: str) -> Dict[str, Any]:

        """Deploy the application to specified environment."""

        

        if environment not in self.deployment_configs:

            return {"success": False, "error": f"Unknown environment: {environment}"}

        

        config = self.deployment_configs[environment]

        

        deployment_result = {

            "environment": environment,

            "version": version,

            "deployment_time": datetime.now().isoformat(),

            "steps": [],

            "success": True,

            "rollback_available": True

        }

        

        try:

            # Pre-deployment checks

            pre_check_result = await self._run_pre_deployment_checks(environment, config)

            deployment_result["steps"].append(pre_check_result)

            

            if not pre_check_result["success"]:

                deployment_result["success"] = False

                return deployment_result

            

            # Deploy infrastructure

            infra_result = await self._deploy_infrastructure(config["infrastructure"])

            deployment_result["steps"].append(infra_result)

            

            # Deploy application

            app_result = await self._deploy_application_code(version, config)

            deployment_result["steps"].append(app_result)

            

            # Configure monitoring

            monitoring_result = await self._setup_monitoring(config.get("monitoring", {}))

            deployment_result["steps"].append(monitoring_result)

            

            # Run post-deployment tests

            post_test_result = await self._run_post_deployment_tests(environment)

            deployment_result["steps"].append(post_test_result)

            

            # Configure auto-scaling

            if config.get("scaling", {}).get("auto_scaling_enabled"):

                scaling_result = await self._configure_auto_scaling(config["scaling"])

                deployment_result["steps"].append(scaling_result)

            

        except Exception as e:

            deployment_result["success"] = False

            deployment_result["error"] = str(e)

            

            # Attempt rollback

            rollback_result = await self._rollback_deployment(environment)

            deployment_result["rollback_result"] = rollback_result

        

        return deployment_result

    

    async def _run_pre_deployment_checks(self, environment: str, config: Dict[str, Any]) -> Dict[str, Any]:

        """Run pre-deployment checks."""

        

        checks = {

            "database_connectivity": await self._check_database_connectivity(),

            "external_services": await self._check_external_services(),

            "resource_availability": await self._check_resource_availability(config),

            "security_compliance": await self._check_security_compliance(environment)

        }

        

        all_passed = all(checks.values())

        

        return {

            "step": "pre_deployment_checks",

            "success": all_passed,

            "checks": checks,

            "message": "All pre-deployment checks passed" if all_passed else "Some checks failed"

        }

    

    async def _deploy_infrastructure(self, infra_config: Dict[str, Any]) -> Dict[str, Any]:

        """Deploy infrastructure components."""

        

        # This would typically use infrastructure as code tools like Terraform

        # For demonstration, we'll simulate the deployment

        

        components = []

        

        # Deploy app servers

        for i in range(infra_config.get("app_servers", 1)):

            components.append(f"app_server_{i+1}")

        

        # Deploy database replicas

        for i in range(infra_config.get("database_replicas", 1)):

            components.append(f"database_replica_{i+1}")

        

        # Deploy LLM service instances

        for i in range(infra_config.get("llm_service_instances", 1)):

            components.append(f"llm_service_{i+1}")

        

        return {

            "step": "infrastructure_deployment",

            "success": True,

            "components_deployed": components,

            "message": f"Deployed {len(components)} infrastructure components"

        }

    

    async def _deploy_application_code(self, version: str, config: Dict[str, Any]) -> Dict[str, Any]:

        """Deploy application code."""

        

        # This would typically involve:

        # - Building Docker images

        # - Pushing to container registry

        # - Rolling deployment to instances

        # - Database migrations

        

        return {

            "step": "application_deployment",

            "success": True,

            "version": version,

            "deployment_strategy": "rolling_update",

            "message": f"Successfully deployed version {version}"

        }

    

    async def _setup_monitoring(self, monitoring_config: Dict[str, Any]) -> Dict[str, Any]:

        """Setup monitoring and alerting."""

        

        monitoring_components = []

        

        if monitoring_config.get("metrics_enabled"):

            monitoring_components.append("metrics_collection")

        

        if monitoring_config.get("alerting_enabled"):

            monitoring_components.append("alerting_system")

        

        if monitoring_config.get("performance_monitoring"):

            monitoring_components.append("performance_monitoring")

        

        return {

            "step": "monitoring_setup",

            "success": True,

            "components": monitoring_components,

            "log_level": monitoring_config.get("log_level", "INFO"),

            "message": "Monitoring configured successfully"

        }

    

    async def _run_post_deployment_tests(self, environment: str) -> Dict[str, Any]:

        """Run post-deployment validation tests."""

        

        test_results = {

            "health_check": await self.health_checks.run_health_check(),

            "api_endpoints": await self._test_api_endpoints(),

            "nlp_functionality": await self._test_nlp_functionality(),

            "database_operations": await self._test_database_operations()

        }

        

        all_passed = all(test_results.values())

        

        return {

            "step": "post_deployment_tests",

            "success": all_passed,

            "test_results": test_results,

            "message": "All tests passed" if all_passed else "Some tests failed"

        }

    

    async def _configure_auto_scaling(self, scaling_config: Dict[str, Any]) -> Dict[str, Any]:

        """Configure auto-scaling policies."""

        

        policies = {

            "cpu_scaling": {

                "threshold": scaling_config.get("cpu_threshold", 70),

                "scale_up_cooldown": 300,

                "scale_down_cooldown": 600

            },

            "memory_scaling": {

                "threshold": scaling_config.get("memory_threshold", 80),

                "scale_up_cooldown": 300,

                "scale_down_cooldown": 600

            }

        }

        

        if scaling_config.get("predictive_scaling"):

            policies["predictive_scaling"] = {

                "enabled": True,

                "forecast_period": 3600,

                "confidence_threshold": 0.8

            }

        

        return {

            "step": "auto_scaling_configuration",

            "success": True,

            "policies": policies,

            "min_instances": scaling_config.get("min_instances", 1),

            "max_instances": scaling_config.get("max_instances", 10),

            "message": "Auto-scaling configured successfully"

        }

    

    async def _check_database_connectivity(self) -> bool:

        """Check database connectivity."""

        # Implementation would test actual database connection

        return True

    

    async def _check_external_services(self) -> bool:

        """Check external service availability."""

        # Implementation would test external API endpoints

        return True

    

    async def _check_resource_availability(self, config: Dict[str, Any]) -> bool:

        """Check if required resources are available."""

        # Implementation would check CPU, memory, storage availability

        return True

    

    async def _check_security_compliance(self, environment: str) -> bool:

        """Check security compliance requirements."""

        # Implementation would run security scans and compliance checks

        return True

    

    async def _test_api_endpoints(self) -> bool:

        """Test API endpoint availability."""

        # Implementation would test critical API endpoints

        return True

    

    async def _test_nlp_functionality(self) -> bool:

        """Test NLP functionality."""

        # Implementation would run basic NLP tests

        return True

    

    async def _test_database_operations(self) -> bool:

        """Test database operations."""

        # Implementation would test CRUD operations

        return True

    

    async def _rollback_deployment(self, environment: str) -> Dict[str, Any]:

        """Rollback deployment in case of failure."""

        

        return {

            "rollback_initiated": True,

            "rollback_time": datetime.now().isoformat(),

            "success": True,

            "message": "Rollback completed successfully"

        }


class HealthCheckManager:

    def __init__(self):

        self.health_endpoints = [

            "/health/database",

            "/health/llm-service",

            "/health/cache",

            "/health/external-apis"

        ]

    

    async def run_health_check(self) -> bool:

        """Run comprehensive health check."""

        

        health_results = {}

        

        for endpoint in self.health_endpoints:

            try:

                # This would make actual HTTP requests to health endpoints

                # For demonstration, we'll simulate the checks

                health_results[endpoint] = True

            except Exception as e:

                health_results[endpoint] = False

        

        return all(health_results.values())


class MonitoringManager:

    def __init__(self):

        self.metrics_collectors = []

        self.alert_rules = []

    

    def setup_monitoring(self, config: Dict[str, Any]):

        """Setup monitoring based on configuration."""

        

        if config.get("metrics_enabled"):

            self._setup_metrics_collection()

        

        if config.get("alerting_enabled"):

            self._setup_alerting()

        

        if config.get("performance_monitoring"):

            self._setup_performance_monitoring()

    

    def _setup_metrics_collection(self):

        """Setup metrics collection."""

        # Implementation would configure metrics collection

        pass

    

    def _setup_alerting(self):

        """Setup alerting rules."""

        # Implementation would configure alerting system

        pass

    

    def _setup_performance_monitoring(self):

        """Setup performance monitoring."""

        # Implementation would configure performance monitoring

        pass



This comprehensive testing and deployment framework provides the foundation for reliably testing and deploying an LLM-based calendar application. The NLPTestSuite ensures that natural language processing components work correctly across diverse inputs and conversation flows, while the PerformanceTestSuite validates system performance under various load conditions. The DeploymentManager provides robust deployment capabilities with proper health checks, monitoring, and rollback mechanisms. This framework enables teams to confidently deploy and maintain complex LLM-based applications while ensuring high availability and performance standards.


Conclusion and Future Enhancements


The development of an LLM-based calendar application represents a significant advancement in how users interact with scheduling and time management tools. By integrating natural language processing capabilities with traditional calendar functionality, we have created a system that understands user intent expressed in conversational language and translates it into precise calendar operations. This approach removes the friction typically associated with calendar management while providing intelligent assistance that learns from user patterns and preferences.


The comprehensive system we have designed demonstrates the practical implementation of several advanced concepts including natural language understanding, intelligent scheduling optimization, external service integration, and responsive user interface design. Each component works together to create a cohesive experience that feels natural and intuitive while maintaining the reliability and precision required for effective calendar management. The modular architecture ensures that individual components can be enhanced or replaced as technology evolves.


The testing and deployment frameworks we have established provide the foundation for maintaining system reliability and performance as the application scales. The specialized testing approaches for natural language processing components, combined with traditional software testing methodologies, ensure that the system behaves predictably across diverse user inputs and usage patterns. The deployment infrastructure supports the complex requirements of LLM-based applications while providing the monitoring and scaling capabilities necessary for production environments.


Looking toward future enhancements, several areas present exciting opportunities for further development. Advanced machine learning techniques could enable more sophisticated pattern recognition and predictive scheduling capabilities. Integration with emerging technologies such as augmented reality interfaces, voice assistants, and IoT devices could expand the ways users interact with their calendars. Enhanced collaboration features could leverage natural language processing to facilitate group scheduling and meeting coordination across organizations.


The foundation we have built provides a robust platform for these future enhancements while demonstrating the practical viability of LLM-based calendar applications. As natural language processing technology continues to advance and user expectations for intelligent interfaces grow, applications like this will become increasingly important for helping people manage their time more effectively and maintain better work-life balance. The combination of conversational interfaces with intelligent automation represents a significant step forward in personal productivity tools, making sophisticated calendar management accessible to users regardless of their technical expertise.

No comments: