Introduction and Overview
Creating a calendar application that leverages Large Language Models represents a significant shift from traditional calendar interfaces. Instead of clicking through menus and forms, users can interact with their calendar using natural language commands such as "Schedule a meeting with John next Tuesday at 2 PM" or "Show me all my free slots this week." This approach transforms the calendar from a passive display tool into an intelligent assistant that understands context, intent, and user preferences.
The core challenge in building such an application lies in bridging the gap between human language and structured calendar data. When a user says "Move my dentist appointment to next week," the system must understand that this refers to a specific existing appointment, determine what "next week" means in the current context, and execute the appropriate database operations. This requires sophisticated natural language processing, robust data management, and intelligent reasoning capabilities.
The application we will design supports comprehensive calendar functionality including meeting creation, modification, and deletion through conversational interfaces. It automatically integrates holiday information based on user location, supports recurring meeting series, and provides intelligent scheduling suggestions. The system maintains rich metadata for each calendar entry, supporting notes, images, color coding, and categorization while offering multiple viewing modes from daily to yearly perspectives.
Architecture and Core Components
The foundation of our LLM-based calendar application rests on a modular architecture that separates concerns while enabling seamless integration between components. The system consists of several key layers: the natural language processing layer that interprets user commands, the business logic layer that manages calendar operations, the data persistence layer that handles storage and retrieval, and the presentation layer that renders calendar views and manages user interactions.
At the heart of the system lies the Language Model Integration Service, which processes user input and translates natural language commands into structured actions. This service must handle the inherent ambiguity of human language while maintaining context across conversation turns. For example, when a user says "Cancel it" after discussing a specific meeting, the system must remember which meeting was referenced and execute the cancellation accordingly.
The Calendar Engine serves as the central orchestrator, managing all calendar-related operations including event creation, modification, deletion, and querying. This component maintains the business rules for scheduling, handles conflict detection, and manages recurring event series. It interfaces with the Location Service to determine user timezone and regional holidays, and with the Notification Service to handle reminders and alerts.
Data persistence is handled through a flexible database layer that supports both relational and document storage patterns. Calendar events require structured fields for dates, times, and participants, while also supporting unstructured data such as notes and attached images. The storage layer must efficiently handle queries across different time ranges while supporting real-time updates and synchronization.
Here is a foundational code example that demonstrates the core architecture setup. This example shows how the main application components are initialized and how they interact with each other. The CalendarApplication class serves as the central coordinator, managing the lifecycle of all services and providing the main entry point for user interactions.
import asyncio
from datetime import datetime, timedelta
from typing import List, Optional, Dict, Any
import json


class CalendarApplication:
    def __init__(self, database: 'CalendarDatabase'):
        # CalendarEngine and CalendarDatabase are defined in later sections;
        # LocationService and NotificationService are not shown in this listing.
        self.llm_service = LLMService()
        self.calendar_engine = CalendarEngine(database)
        self.location_service = LocationService()
        self.notification_service = NotificationService()
        self.user_context = UserContext()

    async def process_user_command(self, user_input: str) -> str:
        # Parse the natural language input
        intent = await self.llm_service.parse_intent(user_input, self.user_context)
        # Execute the appropriate calendar operation
        result = await self.calendar_engine.execute_command(intent)
        # Generate a natural language response
        response = await self.llm_service.generate_response(result, self.user_context)
        return response


class LLMService:
    def __init__(self):
        self.conversation_history = []

    async def parse_intent(self, user_input: str, context: 'UserContext') -> Dict[str, Any]:
        # This would integrate with an actual LLM service.
        # For demonstration, we build the prompt but simulate the model's answer.
        prompt = f"""
        Parse the following calendar command and extract the intent and parameters:
        User input: "{user_input}"
        Current date: {datetime.now().isoformat()}
        User timezone: {context.timezone}
        Return a JSON object with:
        - action: create/update/delete/query/find_slots
        - parameters: relevant details extracted from the input
        """
        # Simulated LLM response parsing: a crude keyword check stands in
        # for sending `prompt` to a model and parsing its JSON reply.
        text = user_input.lower()
        if "schedule" in text or "meeting" in text:
            return {
                "action": "create",
                "parameters": self._extract_meeting_details(user_input)
            }
        elif "show" in text or "what" in text:
            return {
                "action": "query",
                "parameters": self._extract_query_details(user_input)
            }
        return {"action": "unknown", "parameters": {}}

    def _extract_meeting_details(self, text: str) -> Dict[str, Any]:
        # Simplified extraction logic: returns placeholder values
        return {
            "title": "Extracted meeting title",
            "datetime": datetime.now() + timedelta(days=1),
            "duration": 60,
            "participants": []
        }

    def _extract_query_details(self, text: str) -> Dict[str, Any]:
        # Placeholder: always queries the next seven days
        return {
            "date_range": {
                "start": datetime.now().date(),
                "end": datetime.now().date() + timedelta(days=7)
            }
        }


class UserContext:
    def __init__(self):
        self.timezone = "UTC"
        self.location = None
        self.preferences = {}
        self.last_referenced_event = None
This architectural foundation provides the structure needed for natural language calendar interactions. The LLMService turns user input into a structured intent (here simulated with keyword checks rather than a real model call), while the CalendarEngine, shown in the Meeting Management System section, executes the corresponding calendar operation. The UserContext holds the timezone, location, preferences, and most recently referenced event, which lets the system resolve references such as "it" and keep a conversation coherent across turns.
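The prompt above asks the model for a JSON object with an action and parameters. Model output is not guaranteed to be well formed, so a validation step before the intent reaches the calendar engine is a reasonable precaution. The following is a minimal sketch under that assumption; the function name parse_llm_reply and the ALLOWED_ACTIONS set are illustrative and not part of the application code above.

```python
import json
from typing import Any, Dict

# Actions named in the prompt; anything else is treated as unknown.
ALLOWED_ACTIONS = {"create", "update", "delete", "query", "find_slots"}


def parse_llm_reply(raw: str) -> Dict[str, Any]:
    """Parse a raw model reply into an intent dict, falling back to 'unknown'."""
    unknown: Dict[str, Any] = {"action": "unknown", "parameters": {}}
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return unknown
    if not isinstance(data, dict):
        return unknown
    action = data.get("action")
    if not isinstance(action, str) or action not in ALLOWED_ACTIONS:
        return unknown
    params = data.get("parameters")
    # Missing or malformed parameters degrade to an empty dict
    return {"action": action, "parameters": params if isinstance(params, dict) else {}}
```

Returning an "unknown" intent instead of raising keeps the failure on the same path the application already uses for commands it cannot interpret.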
Natural Language Processing Integration
The natural language processing component represents the most sophisticated aspect of our calendar application. Unlike traditional calendar interfaces that rely on forms and dropdown menus, our system must interpret the full spectrum of how users naturally express temporal concepts and scheduling intentions. This includes handling relative time references like "next Tuesday," "in two weeks," or "tomorrow morning," as well as understanding implicit context such as "move it to Friday" when referring to a previously mentioned meeting.
The LLM integration must handle several categories of calendar-related natural language understanding. Temporal expression parsing involves converting phrases like "next Monday at 3 PM" into specific datetime objects, accounting for the user's current timezone and calendar context. Entity extraction identifies key components such as meeting titles, participant names, locations, and duration specifications from free-form text. Intent classification determines whether the user wants to create, modify, delete, or query calendar information.
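As a small illustration of temporal expression parsing, the sketch below resolves a weekday name plus a clock time into a concrete datetime in the user's timezone. It assumes that "next Monday" means the first Monday strictly after today, and that the caller passes a timezone-aware `now` (for example, one built with zoneinfo.ZoneInfo). The function name resolve_weekday_time is hypothetical.

```python
from datetime import datetime, timedelta, timezone

WEEKDAYS = ["monday", "tuesday", "wednesday", "thursday",
            "friday", "saturday", "sunday"]


def resolve_weekday_time(weekday_name: str, hour: int, minute: int,
                         now: datetime) -> datetime:
    """Return the next occurrence of the weekday after today, at hour:minute.

    The result keeps the tzinfo of `now`, so the caller controls the timezone.
    """
    target = WEEKDAYS.index(weekday_name.lower())
    # 1..7 days ahead: the same weekday as today resolves to one week later
    days_ahead = (target - now.weekday()) % 7 or 7
    day = now + timedelta(days=days_ahead)
    return day.replace(hour=hour, minute=minute, second=0, microsecond=0)


now = datetime(2024, 5, 15, 9, 30, tzinfo=timezone.utc)  # a Wednesday
print(resolve_weekday_time("monday", 15, 0, now))  # 2024-05-20 15:00:00+00:00
```

Passing `now` in explicitly, rather than calling datetime.now() inside the function, also makes the resolution logic easy to test against fixed dates.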
Context management proves particularly crucial for maintaining conversation flow. When a user says "Schedule a meeting with Sarah about the project review next Tuesday at 2 PM" followed by "Actually, make it 3 PM instead," the system must understand that "it" refers to the meeting just discussed and that only the time should be modified. This requires maintaining conversation state and entity resolution across multiple interaction turns.
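The follow-up "Actually, make it 3 PM instead" can be sketched as a partial update against whichever event the conversation last touched. The example below is a simplified, in-memory illustration; DraftEvent and ConversationState are hypothetical names, and a real system would persist the change through the calendar engine.

```python
from dataclasses import dataclass, replace
from datetime import datetime, time
from typing import Dict, Optional


@dataclass(frozen=True)
class DraftEvent:
    event_id: str
    title: str
    start: datetime


class ConversationState:
    """Tracks events by id and remembers the one most recently discussed."""

    def __init__(self) -> None:
        self.events: Dict[str, DraftEvent] = {}
        self.last_event_id: Optional[str] = None

    def remember(self, event: DraftEvent) -> None:
        self.events[event.event_id] = event
        self.last_event_id = event.event_id

    def change_time_of_last(self, new_time: time) -> DraftEvent:
        """Apply "make it <time>": keep the date and title, replace only the time."""
        if self.last_event_id is None:
            raise LookupError("No event has been discussed yet")
        old = self.events[self.last_event_id]
        updated = replace(old, start=datetime.combine(old.start.date(), new_time))
        self.events[updated.event_id] = updated
        return updated


state = ConversationState()
state.remember(DraftEvent("e1", "Project review with Sarah", datetime(2024, 5, 21, 14, 0)))
print(state.change_time_of_last(time(15, 0)).start)  # 2024-05-21 15:00:00
```

Raising when no event has been discussed gives the caller a clear signal to ask the user which meeting is meant instead of guessing.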
Here is a detailed implementation of the natural language processing pipeline. This example demonstrates how user input is processed through multiple stages to extract structured calendar information. The NLPPipeline class coordinates the various processing steps, while specialized components handle specific aspects of language understanding.
import re
from datetime import date, datetime, time, timedelta
from typing import Any, Dict, List, Optional, Tuple
from zoneinfo import ZoneInfo

import dateutil.parser
from dateutil.relativedelta import relativedelta


class NLPPipeline:
    def __init__(self):
        self.temporal_parser = TemporalParser()
        self.entity_extractor = EntityExtractor()
        self.intent_classifier = IntentClassifier()
        self.context_manager = ContextManager()

    async def process_input(self, user_input: str, context: 'UserContext') -> Dict[str, Any]:
        # Clean and normalize the input
        normalized_input = self._normalize_input(user_input)
        # Extract temporal expressions
        temporal_info = self.temporal_parser.parse_temporal_expressions(
            normalized_input, context.timezone
        )
        # Extract entities (people, places, topics) from the original text:
        # the extractor's patterns rely on capitalization, which
        # normalization removes.
        entities = self.entity_extractor.extract_entities(user_input)
        # Classify the intent
        intent = self.intent_classifier.classify_intent(
            normalized_input, temporal_info, entities
        )
        # Resolve references using context
        resolved_intent = self.context_manager.resolve_references(
            intent, context
        )
        return resolved_intent

    def _normalize_input(self, text: str) -> str:
        # Convert to lowercase and handle common abbreviations
        text = text.lower()
        text = re.sub(r'\btmrw\b', 'tomorrow', text)
        text = re.sub(r'\btues\b', 'tuesday', text)
        text = re.sub(r'\bwed\b', 'wednesday', text)
        text = re.sub(r'\bthurs\b', 'thursday', text)
        text = re.sub(r'\bfri\b', 'friday', text)
        text = re.sub(r'\bsat\b', 'saturday', text)
        text = re.sub(r'\bsun\b', 'sunday', text)
        text = re.sub(r'\bmon\b', 'monday', text)
        return text


class TemporalParser:
    def __init__(self):
        # Every matching pattern contributes to the result, so
        # "next tuesday at 2:30 pm" yields both a date and a time.
        self.relative_patterns = {
            r'\bnext (\w+)': self._parse_next_weekday,
            r'\btomorrow\b': self._parse_tomorrow,
            r'\btoday\b': self._parse_today,
            r'\bin (\d+) (days?|weeks?|months?)\b': self._parse_relative_duration,
            r'\b(\d{1,2}):(\d{2})\s*(am|pm)?': self._parse_time,
        }

    @staticmethod
    def _today(timezone: str) -> date:
        """Current date in the user's timezone (an IANA name such as 'UTC')."""
        return datetime.now(ZoneInfo(timezone)).date()

    def parse_temporal_expressions(self, text: str, timezone: str) -> Dict[str, Any]:
        temporal_info = {
            'date': None,
            'time': None,
            'duration': None,
            'end_date': None,
            'recurrence': None
        }
        # Relative expressions first: fuzzy absolute parsing would otherwise
        # succeed on a bare time such as "3:00 pm" and silently drop "tomorrow".
        for pattern, parser_func in self.relative_patterns.items():
            match = re.search(pattern, text, re.IGNORECASE)
            if match:
                temporal_info.update(parser_func(match, timezone))
        # Fall back to absolute dates ("march 5 at 14:00") if no date was found
        if temporal_info['date'] is None:
            try:
                parsed_dt = dateutil.parser.parse(text, fuzzy=True)
            except (ValueError, OverflowError):
                parsed_dt = None
            if parsed_dt is not None:
                temporal_info['date'] = parsed_dt.date()
                if temporal_info['time'] is None:
                    temporal_info['time'] = parsed_dt.time()
        # Parse duration if specified (stored in minutes)
        duration_match = re.search(r'for (\d+) (hours?|minutes?)', text)
        if duration_match:
            amount = int(duration_match.group(1))
            unit = duration_match.group(2)
            if 'hour' in unit:
                temporal_info['duration'] = amount * 60
            else:
                temporal_info['duration'] = amount
        # Check for recurrence patterns
        if re.search(r'every (week|day|month)', text):
            temporal_info['recurrence'] = self._parse_recurrence(text)
        return temporal_info

    def _parse_next_weekday(self, match, timezone: str) -> Dict[str, Any]:
        weekday_name = match.group(1).lower()
        weekdays = {
            'monday': 0, 'tuesday': 1, 'wednesday': 2, 'thursday': 3,
            'friday': 4, 'saturday': 5, 'sunday': 6
        }
        if weekday_name in weekdays:
            target_weekday = weekdays[weekday_name]
            today = self._today(timezone)
            days_ahead = target_weekday - today.weekday()
            if days_ahead <= 0:  # Target day is today or already passed this week
                days_ahead += 7
            target_date = today + timedelta(days=days_ahead)
            return {'date': target_date}
        # "next week", "next month", etc. are not handled here
        return {}

    def _parse_tomorrow(self, match, timezone: str) -> Dict[str, Any]:
        tomorrow = self._today(timezone) + timedelta(days=1)
        return {'date': tomorrow}

    def _parse_today(self, match, timezone: str) -> Dict[str, Any]:
        return {'date': self._today(timezone)}

    def _parse_relative_duration(self, match, timezone: str) -> Dict[str, Any]:
        amount = int(match.group(1))
        unit = match.group(2).lower()
        today = self._today(timezone)
        if 'day' in unit:
            target_date = today + timedelta(days=amount)
        elif 'week' in unit:
            target_date = today + timedelta(weeks=amount)
        elif 'month' in unit:
            target_date = today + relativedelta(months=amount)
        else:
            return {}
        return {'date': target_date}

    def _parse_time(self, match, timezone: str) -> Dict[str, Any]:
        hour = int(match.group(1))
        minute = int(match.group(2))
        ampm = match.group(3)
        if ampm and ampm.lower() == 'pm' and hour != 12:
            hour += 12
        elif ampm and ampm.lower() == 'am' and hour == 12:
            hour = 0
        # Ignore impossible clock times such as "25:00" or "13:00 pm"
        if not (0 <= hour <= 23 and 0 <= minute <= 59):
            return {}
        return {'time': time(hour, minute)}

    def _parse_recurrence(self, text: str) -> Dict[str, Any]:
        if 'every week' in text:
            return {'type': 'weekly', 'interval': 1}
        elif 'every day' in text:
            return {'type': 'daily', 'interval': 1}
        elif 'every month' in text:
            return {'type': 'monthly', 'interval': 1}
        return {}
class EntityExtractor:
    def extract_entities(self, text: str) -> Dict[str, List[str]]:
        entities = {
            'people': [],
            'locations': [],
            'topics': [],
            'organizations': []
        }
        # Simple pattern-based extraction (in production, use NER models).
        # The people and location patterns depend on capitalization, so this
        # method expects text in its original case.
        # Extract people mentioned with "with": one or more capitalized words
        people_pattern = r'\bwith ([A-Z][a-z]+(?: [A-Z][a-z]+)*)'
        people_matches = re.findall(people_pattern, text)
        entities['people'].extend(people_matches)
        # Extract locations mentioned with "at" or "in"
        location_pattern = r'\b(?:at|in) ([A-Z][a-z]+(?: [A-Z][a-z]+)*)'
        location_matches = re.findall(location_pattern, text)
        entities['locations'].extend(location_matches)
        # Extract topics mentioned with "about" (a run of lowercase words)
        topic_pattern = r'\babout ([a-z]+(?: [a-z]+)*)'
        topic_matches = re.findall(topic_pattern, text)
        entities['topics'].extend(topic_matches)
        return entities
class IntentClassifier:
    def classify_intent(self, text: str, temporal_info: Dict, entities: Dict) -> Dict[str, Any]:
        intent = {
            'action': 'unknown',
            'confidence': 0.0,
            'parameters': {}
        }
        # Match whole words only, so "reschedule" does not count as "schedule"
        words = set(re.findall(r'[a-z]+', text.lower()))

        def mentions(*keywords: str) -> bool:
            return not words.isdisjoint(keywords)

        # Classify based on action verbs and context
        if mentions('schedule', 'book', 'plan', 'arrange'):
            intent['action'] = 'create'
            intent['confidence'] = 0.9
        elif mentions('cancel', 'delete', 'remove'):
            intent['action'] = 'delete'
            intent['confidence'] = 0.9
        elif mentions('move', 'reschedule', 'change'):
            intent['action'] = 'update'
            intent['confidence'] = 0.9
        elif mentions('free', 'available', 'open'):
            # Checked before 'query' so "show me my free slots" finds slots
            intent['action'] = 'find_slots'
            intent['confidence'] = 0.8
        elif mentions('show', 'list', 'what', 'when'):
            intent['action'] = 'query'
            intent['confidence'] = 0.8
        # Combine temporal and entity information
        intent['parameters'] = {
            'temporal': temporal_info,
            'entities': entities,
            'raw_text': text
        }
        return intent
class ContextManager:
    def __init__(self):
        self.conversation_history = []
        self.entity_references = {}

    def resolve_references(self, intent: Dict, context: 'UserContext') -> Dict[str, Any]:
        # Resolve pronouns and references like "it", "that meeting", etc.
        # Word boundaries keep "it" from matching inside words such as "with".
        text = intent['parameters']['raw_text']
        if re.search(r'\b(?:it|that|the meeting)\b', text, re.IGNORECASE):
            if context.last_referenced_event:
                intent['parameters']['target_event_id'] = context.last_referenced_event
        # Update conversation history
        self.conversation_history.append({
            'timestamp': datetime.now(),
            'intent': intent,
            'context': context
        })
        return intent
This natural language processing pipeline demonstrates how complex user input is systematically broken down into structured information that the calendar system can act upon. The TemporalParser handles the challenging task of converting human time expressions into precise datetime objects, while the EntityExtractor identifies relevant people, places, and topics. The IntentClassifier determines what action the user wants to perform, and the ContextManager maintains conversation state to handle references and follow-up commands.
Database Design and Data Models
The database design for an LLM-based calendar application must balance the structured nature of calendar data with the flexibility required for natural language interactions. Traditional calendar applications can rely on rigid schemas with predefined fields, but our system must accommodate the rich variety of information that users might express through natural language commands. This includes supporting arbitrary metadata, flexible categorization, and efficient querying across multiple dimensions.
The core data model centers around Events, which represent individual calendar entries, and EventSeries, which handle recurring meetings and appointment patterns. Each Event contains essential temporal information including start and end times, timezone data, and duration specifications. The model also supports rich metadata such as descriptions, notes, attached files, and color coding for visual organization.
User management requires careful consideration of privacy and access control. The system must support multiple users while maintaining strict data isolation. Each user has associated preferences, timezone settings, and location information that influences how calendar data is interpreted and displayed. The database must efficiently handle queries that span different users and time ranges while maintaining security boundaries.
Here is a comprehensive database model implementation that demonstrates the core data structures and relationships. This example uses SQLAlchemy to define the database schema, showing how calendar events, users, and related metadata are organized and connected. The model supports both simple events and complex recurring series while maintaining flexibility for future extensions.
from sqlalchemy import create_engine, Column, Integer, String, DateTime, Text, Boolean, ForeignKey, JSON, Enum
from sqlalchemy.orm import declarative_base, relationship, sessionmaker
from sqlalchemy.dialects.postgresql import UUID
from dateutil.relativedelta import relativedelta
import uuid
from datetime import datetime, timedelta
from typing import Optional, List, Dict, Any
import enum

Base = declarative_base()


class EventType(enum.Enum):
    MEETING = "meeting"
    APPOINTMENT = "appointment"
    REMINDER = "reminder"
    TASK = "task"
    PERSONAL = "personal"
    WORK = "work"


class RecurrenceType(enum.Enum):
    NONE = "none"
    DAILY = "daily"
    WEEKLY = "weekly"
    MONTHLY = "monthly"
    YEARLY = "yearly"
    CUSTOM = "custom"


class User(Base):
    __tablename__ = 'users'
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    username = Column(String(100), unique=True, nullable=False)
    email = Column(String(255), unique=True, nullable=False)
    timezone = Column(String(50), default='UTC')
    location_country = Column(String(100))
    location_region = Column(String(100))
    preferences = Column(JSON, default=dict)
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    # Relationships
    events = relationship("Event", back_populates="user", cascade="all, delete-orphan")
    event_series = relationship("EventSeries", back_populates="user", cascade="all, delete-orphan")

    def __repr__(self):
        return f"<User(username='{self.username}', email='{self.email}')>"


class EventSeries(Base):
    __tablename__ = 'event_series'
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    user_id = Column(UUID(as_uuid=True), ForeignKey('users.id'), nullable=False)
    title = Column(String(500), nullable=False)
    description = Column(Text)
    # Recurrence configuration
    recurrence_type = Column(Enum(RecurrenceType), default=RecurrenceType.NONE)
    recurrence_interval = Column(Integer, default=1)
    recurrence_end_date = Column(DateTime)
    recurrence_count = Column(Integer)
    recurrence_days_of_week = Column(JSON)  # For weekly recurrence: [0,1,2,3,4] for Mon-Fri
    recurrence_day_of_month = Column(Integer)  # For monthly recurrence
    # Series metadata
    event_type = Column(Enum(EventType), default=EventType.MEETING)
    color = Column(String(7), default='#3498db')  # Hex color code
    location = Column(String(500))
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    # Relationships
    user = relationship("User", back_populates="event_series")
    events = relationship("Event", back_populates="series", cascade="all, delete-orphan")

    def generate_events(self, start_date: datetime, end_date: datetime) -> List['Event']:
        """Generate individual events for this series within the given date range."""
        events = []
        # Never generate occurrences past the series' own end date
        if self.recurrence_end_date and self.recurrence_end_date < end_date:
            end_date = self.recurrence_end_date
        current_date = start_date
        while current_date <= end_date:
            if self._should_create_event_on_date(current_date):
                event = Event(
                    series_id=self.id,
                    user_id=self.user_id,
                    title=self.title,
                    description=self.description,
                    start_time=current_date,
                    event_type=self.event_type,
                    color=self.color,
                    location=self.location
                )
                events.append(event)
            current_date += self._get_recurrence_delta()
            if self.recurrence_count and len(events) >= self.recurrence_count:
                break
        return events

    def _should_create_event_on_date(self, date: datetime) -> bool:
        """Check if an event should be created on the given date based on recurrence rules."""
        if self.recurrence_type == RecurrenceType.WEEKLY and self.recurrence_days_of_week:
            return date.weekday() in self.recurrence_days_of_week
        return True

    def _get_recurrence_delta(self):
        """Get the step (timedelta or relativedelta) to the next candidate date."""
        interval = self.recurrence_interval or 1
        if self.recurrence_type == RecurrenceType.DAILY:
            return timedelta(days=interval)
        elif self.recurrence_type == RecurrenceType.WEEKLY:
            if self.recurrence_days_of_week:
                # Step day by day so every listed weekday is visited;
                # intervals above 1 are not handled on this simplified path.
                return timedelta(days=1)
            return timedelta(weeks=interval)
        elif self.recurrence_type == RecurrenceType.MONTHLY:
            return relativedelta(months=interval)
        elif self.recurrence_type == RecurrenceType.YEARLY:
            return relativedelta(years=interval)
        else:
            # NONE and CUSTOM are not modeled here; fall back to daily steps
            return timedelta(days=1)
class Event(Base):
    __tablename__ = 'events'
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    user_id = Column(UUID(as_uuid=True), ForeignKey('users.id'), nullable=False)
    series_id = Column(UUID(as_uuid=True), ForeignKey('event_series.id'), nullable=True)
    # Basic event information
    title = Column(String(500), nullable=False)
    description = Column(Text)
    notes = Column(Text)
    # Temporal information
    start_time = Column(DateTime, nullable=False)
    end_time = Column(DateTime)
    duration_minutes = Column(Integer)
    timezone = Column(String(50))
    all_day = Column(Boolean, default=False)
    # Event metadata
    event_type = Column(Enum(EventType), default=EventType.MEETING)
    color = Column(String(7), default='#3498db')
    location = Column(String(500))
    meeting_url = Column(String(1000))
    # Status and visibility
    status = Column(String(50), default='confirmed')  # confirmed, tentative, cancelled
    visibility = Column(String(50), default='private')  # private, public, confidential
    # Rich content
    attachments = Column(JSON, default=list)  # List of file references
    participants = Column(JSON, default=list)  # List of participant information
    tags = Column(JSON, default=list)  # User-defined tags
    # System metadata
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    created_from_nlp = Column(Boolean, default=False)
    original_nlp_input = Column(Text)  # Store the original user command
    # Relationships
    user = relationship("User", back_populates="events")
    series = relationship("EventSeries", back_populates="events")
    reminders = relationship("Reminder", back_populates="event", cascade="all, delete-orphan")

    def __repr__(self):
        return f"<Event(title='{self.title}', start_time='{self.start_time}')>"

    @property
    def calculated_end_time(self) -> datetime:
        """Calculate end time based on start time and duration if end_time is not set."""
        if self.end_time:
            return self.end_time
        elif self.duration_minutes:
            return self.start_time + timedelta(minutes=self.duration_minutes)
        else:
            return self.start_time + timedelta(hours=1)  # Default 1 hour duration

    def conflicts_with(self, other_event: 'Event') -> bool:
        """Check if this event conflicts with another event."""
        self_end = self.calculated_end_time
        other_end = other_event.calculated_end_time
        return (self.start_time < other_end and self_end > other_event.start_time)

    def to_dict(self) -> Dict[str, Any]:
        """Convert event to dictionary for API responses."""
        return {
            'id': str(self.id),
            'title': self.title,
            'description': self.description,
            'start_time': self.start_time.isoformat(),
            'end_time': self.calculated_end_time.isoformat(),
            'event_type': self.event_type.value,
            'color': self.color,
            'location': self.location,
            'participants': self.participants,
            'tags': self.tags
        }


class Reminder(Base):
    __tablename__ = 'reminders'
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    event_id = Column(UUID(as_uuid=True), ForeignKey('events.id'), nullable=False)
    # Reminder timing
    trigger_time = Column(DateTime, nullable=False)
    minutes_before = Column(Integer)  # Alternative to absolute trigger_time
    # Reminder content and delivery
    message = Column(Text)
    delivery_method = Column(String(50), default='notification')  # notification, email, sms
    sent = Column(Boolean, default=False)
    sent_at = Column(DateTime)
    created_at = Column(DateTime, default=datetime.utcnow)
    # Relationships
    event = relationship("Event", back_populates="reminders")


class Holiday(Base):
    __tablename__ = 'holidays'
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    name = Column(String(200), nullable=False)
    date = Column(DateTime, nullable=False)
    country = Column(String(100))
    region = Column(String(100))
    holiday_type = Column(String(50))  # national, regional, religious, etc.
    description = Column(Text)
    created_at = Column(DateTime, default=datetime.utcnow)
class CalendarDatabase:
    def __init__(self, database_url: str):
        self.engine = create_engine(database_url)
        self.SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=self.engine)
        Base.metadata.create_all(bind=self.engine)

    def get_session(self):
        """Get a database session."""
        return self.SessionLocal()

    def create_user(self, username: str, email: str, timezone: str = 'UTC') -> User:
        """Create a new user."""
        session = self.get_session()
        try:
            user = User(username=username, email=email, timezone=timezone)
            session.add(user)
            session.commit()
            session.refresh(user)
            return user
        finally:
            session.close()

    def create_event(self, user_id: str, event_data: Dict[str, Any]) -> Event:
        """Create a new event."""
        session = self.get_session()
        try:
            event = Event(
                user_id=user_id,
                title=event_data['title'],
                description=event_data.get('description'),
                start_time=event_data['start_time'],
                end_time=event_data.get('end_time'),
                duration_minutes=event_data.get('duration_minutes'),
                event_type=EventType(event_data.get('event_type', 'meeting')),
                location=event_data.get('location'),
                participants=event_data.get('participants', []),
                created_from_nlp=event_data.get('created_from_nlp', False),
                original_nlp_input=event_data.get('original_nlp_input')
            )
            session.add(event)
            session.commit()
            session.refresh(event)
            return event
        finally:
            session.close()

    def get_events_in_range(self, user_id: str, start_date: datetime, end_date: datetime) -> List[Event]:
        """Get all events for a user that start within a date range."""
        session = self.get_session()
        try:
            events = session.query(Event).filter(
                Event.user_id == user_id,
                Event.start_time >= start_date,
                Event.start_time <= end_date
            ).order_by(Event.start_time).all()
            return events
        finally:
            session.close()

    def find_free_slots(self, user_id: str, start_date: datetime, end_date: datetime,
                        duration_minutes: int) -> List[Dict[str, datetime]]:
        """Find available time slots for a user within a date range."""
        # get_events_in_range manages its own session, so none is opened here
        existing_events = self.get_events_in_range(user_id, start_date, end_date)
        free_slots = []
        current_time = start_date
        while current_time + timedelta(minutes=duration_minutes) <= end_date:
            slot_end = current_time + timedelta(minutes=duration_minutes)
            # Check if this slot conflicts with any existing event
            conflicts = False
            for event in existing_events:
                if (current_time < event.calculated_end_time and
                        slot_end > event.start_time):
                    conflicts = True
                    break
            if not conflicts:
                free_slots.append({
                    'start': current_time,
                    'end': slot_end
                })
            current_time += timedelta(minutes=30)  # Check every 30 minutes
        return free_slots
This database model provides a foundation for storing and managing calendar data while supporting the requirements of natural language interactions. The Event model captures both simple appointments and rich meeting information, while the EventSeries model describes recurring events as a single rule from which individual occurrences are generated. Metadata fields such as original_nlp_input preserve the user's original command, which makes it possible to audit misparsed requests and, later, to use them as data for improving the natural language understanding.
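The find_free_slots method above probes candidate slots in 30-minute steps. An alternative worth considering is to compute the gaps between busy intervals directly, which returns each free period once at its full length. The sketch below is an illustration under simplifying assumptions: it works on plain (start, end) tuples rather than Event rows, and the name free_gaps is hypothetical.

```python
from datetime import datetime, timedelta
from typing import List, Tuple

Interval = Tuple[datetime, datetime]


def free_gaps(busy: List[Interval], window_start: datetime,
              window_end: datetime, min_duration: timedelta) -> List[Interval]:
    """Return the gaps of at least `min_duration` inside the window.

    Busy intervals may overlap each other or extend beyond the window.
    """
    gaps: List[Interval] = []
    cursor = window_start  # earliest moment not yet known to be busy
    for start, end in sorted(busy):
        if start >= window_end:
            break
        if start - cursor >= min_duration:
            gaps.append((cursor, start))
        cursor = max(cursor, end)
    if window_end - cursor >= min_duration:
        gaps.append((cursor, window_end))
    return gaps
```

Because the intervals are sorted once and scanned once, the cost is dominated by the sort, independent of how finely slots would otherwise be probed.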
Meeting Management System
The meeting management system serves as the operational core of our calendar application, translating natural language intents into concrete calendar operations. This system must handle the full lifecycle of calendar events, from initial creation through modification and eventual deletion, while maintaining data consistency and providing intelligent conflict resolution. The complexity arises from the need to support both simple one-time events and sophisticated recurring meeting series, all while preserving the context and relationships that users express through natural language.
Event creation involves more than simply storing date and time information. When a user says "Schedule a weekly team meeting every Tuesday at 10 AM starting next week," the system must create a recurring series, generate individual event instances, handle timezone calculations, and set up appropriate reminders. The system must also validate the request against existing calendar entries, detect potential conflicts, and suggest alternatives when necessary.
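To make the recurring example concrete, the sketch below expands "every Tuesday at 10 AM starting next week" into individual start times. It assumes that weeks begin on Monday and that "next week" means the calendar week after the current one; the function name weekly_series is hypothetical, and timezone handling, conflict checks, and reminders are left out.

```python
from datetime import date, datetime, time, timedelta
from typing import List


def weekly_series(today: date, weekday: int, at: time, count: int) -> List[datetime]:
    """Return `count` weekly start times, beginning in the week after `today`.

    `weekday` follows date.weekday(): 0 is Monday, 6 is Sunday.
    """
    # Monday of next week, then offset to the requested weekday
    next_monday = today - timedelta(days=today.weekday()) + timedelta(weeks=1)
    first_day = next_monday + timedelta(days=weekday)
    return [datetime.combine(first_day + timedelta(weeks=i), at) for i in range(count)]


# Asked on Wednesday 2024-05-15: Tuesdays at 10:00, first three occurrences
for start in weekly_series(date(2024, 5, 15), weekday=1, at=time(10, 0), count=3):
    print(start)
```

Each generated start time would then pass through the same validation and conflict detection as a one-time event before being stored.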
Modification operations present particular challenges because users often reference events indirectly. A command like "Move my dentist appointment to next Friday" requires the system to identify which specific appointment the user means, especially if there are multiple dental appointments scheduled. The system must maintain enough context to resolve such ambiguous references while providing clear confirmation of the intended changes.
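One way to handle such indirect references is to search upcoming events by title and ask the user to choose when more than one matches. The sketch below illustrates that idea with plain substring matching; Candidate, match_events, and resolve_target are hypothetical names, and a production system would likely use fuzzier matching or let the LLM rank candidates.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional, Tuple


@dataclass
class Candidate:
    event_id: str
    title: str
    start: datetime


def match_events(query: str, events: List[Candidate], now: datetime) -> List[Candidate]:
    """Return upcoming events whose title contains every query word, soonest first."""
    words = query.lower().split()
    hits = [e for e in events
            if e.start >= now and all(w in e.title.lower() for w in words)]
    return sorted(hits, key=lambda e: e.start)


def resolve_target(query: str, events: List[Candidate],
                   now: datetime) -> Tuple[Optional[str], Optional[str]]:
    """Return (event_id, None) for a unique match, else (None, message for the user)."""
    hits = match_events(query, events, now)
    if len(hits) == 1:
        return hits[0].event_id, None
    if not hits:
        return None, f'No upcoming event matches "{query}".'
    options = ", ".join(f"{e.title} on {e.start:%Y-%m-%d}" for e in hits)
    return None, f"Which one do you mean? {options}"
```

Returning a clarifying question instead of picking the first match trades one extra conversation turn for not modifying the wrong appointment.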
Here is a comprehensive implementation of the meeting management system that demonstrates how natural language intents are translated into calendar operations. This example shows the CalendarEngine class that coordinates all calendar operations, along with specialized handlers for different types of calendar actions. The system includes conflict detection, recurring event management, and intelligent scheduling suggestions.
from datetime import datetime, timedelta, time
from typing import List, Dict, Any, Optional, Tuple
import asyncio
from dataclasses import dataclass
from enum import Enum


class ConflictResolution(Enum):
    REJECT = "reject"
    SUGGEST_ALTERNATIVE = "suggest_alternative"
    ALLOW_OVERLAP = "allow_overlap"
    AUTO_RESCHEDULE = "auto_reschedule"


@dataclass
class SchedulingConflict:
    conflicting_event: 'Event'
    proposed_event: Dict[str, Any]
    conflict_type: str
    suggested_alternatives: List[Dict[str, datetime]]


@dataclass
class OperationResult:
    success: bool
    message: str
    data: Optional[Any] = None
    # Optional because these default to None when there is nothing to report
    conflicts: Optional[List[SchedulingConflict]] = None
    suggestions: Optional[List[Dict[str, Any]]] = None

class CalendarEngine:
    def __init__(self, database: CalendarDatabase):
        self.db = database
        self.conflict_resolver = ConflictResolver()
        self.recurrence_manager = RecurrenceManager()
        self.notification_scheduler = NotificationScheduler()

    async def execute_command(self, intent: Dict[str, Any]) -> OperationResult:
        """Execute a calendar command based on parsed intent."""
        action = intent.get('action')
        parameters = intent.get('parameters', {})
        try:
            if action == 'create':
                return await self._handle_create_event(parameters)
            elif action == 'update':
                return await self._handle_update_event(parameters)
            elif action == 'delete':
                return await self._handle_delete_event(parameters)
            elif action == 'query':
                return await self._handle_query_events(parameters)
            elif action == 'find_slots':
                return await self._handle_find_slots(parameters)
            else:
                return OperationResult(
                    success=False,
                    message=f"Unknown action: {action}"
                )
        except Exception as e:
            return OperationResult(
                success=False,
                message=f"Error executing command: {e}"
            )

    async def _handle_create_event(self, parameters: Dict[str, Any]) -> OperationResult:
        """Handle event creation with conflict detection and resolution."""
        temporal_info = parameters.get('temporal', {})
        entities = parameters.get('entities', {})
        # Extract event details from parameters
        event_data = self._build_event_data(temporal_info, entities, parameters)
        # Validate the event data
        validation_result = self._validate_event_data(event_data)
        if not validation_result.success:
            return validation_result
        # Check for conflicts
        conflicts = await self._detect_conflicts(event_data)
        if conflicts:
            resolution = await self.conflict_resolver.resolve_conflicts(
                event_data, conflicts
            )
            if resolution.action == ConflictResolution.REJECT:
                return OperationResult(
                    success=False,
                    message="Event conflicts with existing appointments",
                    conflicts=conflicts
                )
            elif resolution.action == ConflictResolution.SUGGEST_ALTERNATIVE:
                return OperationResult(
                    success=False,
                    message="Time slot not available. Here are some alternatives:",
                    suggestions=resolution.alternatives
                )
        # Create the event or event series
        if temporal_info.get('recurrence'):
            result = await self._create_recurring_event(event_data, temporal_info['recurrence'])
        else:
            result = await self._create_single_event(event_data)
        # Schedule notifications
        if result.success and result.data:
            await self.notification_scheduler.schedule_reminders(result.data)
        return result

    async def _handle_update_event(self, parameters: Dict[str, Any]) -> OperationResult:
        """Handle event modification."""
        target_event_id = parameters.get('target_event_id')
        if not target_event_id:
            # Try to identify the event from context
            target_event_id = await self._identify_target_event(parameters)
        if not target_event_id:
            return OperationResult(
                success=False,
                message="Could not identify which event to modify"
            )
        # Get the existing event
        session = self.db.get_session()
        try:
            existing_event = session.query(Event).filter(Event.id == target_event_id).first()
            if not existing_event:
                return OperationResult(
                    success=False,
                    message="Event not found"
                )
            # Apply modifications
            modifications = self._extract_modifications(parameters)
            updated_event_data = self._apply_modifications(existing_event, modifications)
            # Validate the updated event
            validation_result = self._validate_event_data(updated_event_data)
            if not validation_result.success:
                return validation_result
            # Check for new conflicts, ignoring the event being modified
            conflicts = await self._detect_conflicts(updated_event_data, exclude_event_id=target_event_id)
            if conflicts:
                return OperationResult(
                    success=False,
                    message="Updated event would conflict with existing appointments",
                    conflicts=conflicts
                )
            # Update the event
            for key, value in updated_event_data.items():
                if hasattr(existing_event, key):
                    setattr(existing_event, key, value)
            existing_event.updated_at = datetime.utcnow()
            session.commit()
            return OperationResult(
                success=True,
                message=f"Event '{existing_event.title}' updated successfully",
                data=existing_event
            )
        except Exception:
            # Discard partial changes; execute_command reports the error
            session.rollback()
            raise
        finally:
            session.close()

    async def _handle_delete_event(self, parameters: Dict[str, Any]) -> OperationResult:
        """Handle event deletion."""
        target_event_id = parameters.get('target_event_id')
        if not target_event_id:
            target_event_id = await self._identify_target_event(parameters)
        if not target_event_id:
            return OperationResult(
                success=False,
                message="Could not identify which event to delete"
            )
        session = self.db.get_session()
        try:
            event = session.query(Event).filter(Event.id == target_event_id).first()
            if not event:
                return OperationResult(
                    success=False,
                    message="Event not found"
                )
            event_title = event.title
            # Handle series deletion if this is part of a recurring series
            if event.series_id:
                delete_series = parameters.get('delete_entire_series', False)
                if delete_series:
                    series = session.query(EventSeries).filter(EventSeries.id == event.series_id).first()
                    if series:
                        session.delete(series)  # Cascade will delete all events
                        message = f"Recurring series '{event_title}' deleted successfully"
                    else:
                        session.delete(event)
                        message = f"Event '{event_title}' deleted successfully"
                else:
                    session.delete(event)
                    message = f"Single occurrence of '{event_title}' deleted successfully"
            else:
                session.delete(event)
                message = f"Event '{event_title}' deleted successfully"
            session.commit()
            return OperationResult(
                success=True,
                message=message
            )
        except Exception:
            # Discard the pending delete; execute_command reports the error
            session.rollback()
            raise
        finally:
            session.close()

    async def _handle_query_events(self, parameters: Dict[str, Any]) -> OperationResult:
        """Handle event queries and listing."""
        temporal_info = parameters.get('temporal', {})
        user_id = parameters.get('user_id')
        # Determine query date range
        if temporal_info.get('date'):
            start_date = datetime.combine(temporal_info['date'], time.min)
            end_date = datetime.combine(temporal_info['date'], time.max)
        elif temporal_info.get('date_range'):
            start_date = datetime.combine(temporal_info['date_range']['start'], time.min)
            end_date = datetime.combine(temporal_info['date_range']['end'], time.max)
        else:
            # Default to today
            today = datetime.now().date()
            start_date = datetime.combine(today, time.min)
            end_date = datetime.combine(today, time.max)
        # Query events
        events = self.db.get_events_in_range(user_id, start_date, end_date)
        # Format response as plain dictionaries
        event_list = [
            {
                'title': event.title,
                'start_time': event.start_time,
                'end_time': event.calculated_end_time,
                'location': event.location,
                'type': event.event_type.value
            }
            for event in events
        ]
        if not event_list:
            message = "No events found for the specified time period"
        else:
            message = f"Found {len(event_list)} event(s)"
        return OperationResult(
            success=True,
            message=message,
            data=event_list
        )

    async def _handle_find_slots(self, parameters: Dict[str, Any]) -> OperationResult:
        """Handle finding available time slots."""
        temporal_info = parameters.get('temporal', {})
        user_id = parameters.get('user_id')
        duration = parameters.get('duration', 60)  # Default 60 minutes
        # Determine search date range
        if temporal_info.get('date_range'):
            start_date = datetime.combine(temporal_info['date_range']['start'], time(9, 0))
            end_date = datetime.combine(temporal_info['date_range']['end'], time(17, 0))
        else:
            # Default to next 7 days, business hours
            today = datetime.now()
            start_date = datetime.combine(today.date(), time(9, 0))
            end_date = datetime.combine(today.date() + timedelta(days=7), time(17, 0))
        # Find available slots
        free_slots = self.db.find_free_slots(user_id, start_date, end_date, duration)
        # Limit to reasonable number of suggestions
        free_slots = free_slots[:10]
        if not free_slots:
            message = "No available time slots found in the specified period"
        else:
            message = f"Found {len(free_slots)} available time slot(s)"
        return OperationResult(
            success=True,
            message=message,
            data=free_slots
        )

    def _build_event_data(self, temporal_info: Dict, entities: Dict, parameters: Dict) -> Dict[str, Any]:
        """Build event data from parsed parameters."""
        event_data = {
            # user_id is read later by conflict detection and event creation
            'user_id': parameters.get('user_id'),
            'title': self._extract_title(parameters, entities),
            'description': self._extract_description(parameters, entities),
            'start_time': self._extract_start_time(temporal_info),
            'duration_minutes': temporal_info.get('duration', 60),
            # "or [None]" also covers an empty list of locations
            'location': (entities.get('locations') or [None])[0],
            'participants': entities.get('people', []),
            'event_type': self._determine_event_type(parameters, entities),
            'created_from_nlp': True,
            'original_nlp_input': parameters.get('raw_text')
        }
        # Set end time if duration is specified
        if event_data['start_time'] and event_data['duration_minutes']:
            event_data['end_time'] = event_data['start_time'] + timedelta(
                minutes=event_data['duration_minutes']
            )
        return event_data

    def _extract_title(self, parameters: Dict, entities: Dict) -> str:
        """Extract event title from parameters and entities."""
        raw_text = parameters.get('raw_text', '').lower()
        # Try to extract a meaningful title
        if entities.get('topics'):
            topic = entities['topics'][0]
            if entities.get('people'):
                return f"Meeting with {entities['people'][0]} about {topic}"
            else:
                return f"Meeting about {topic}"
        elif entities.get('people'):
            return f"Meeting with {entities['people'][0]}"
        else:
            # Extract title from common patterns
            if 'meeting' in raw_text:
                return "Meeting"
            elif 'appointment' in raw_text:
                return "Appointment"
            elif 'call' in raw_text:
                return "Call"
            else:
                return "Event"

    def _extract_start_time(self, temporal_info: Dict) -> Optional[datetime]:
        """Extract start time from temporal information."""
        date = temporal_info.get('date')
        time_obj = temporal_info.get('time')
        if date and time_obj:
            return datetime.combine(date, time_obj)
        elif date:
            return datetime.combine(date, time(9, 0))  # Default to 9 AM
        else:
            return None

    def _determine_event_type(self, parameters: Dict, entities: Dict) -> EventType:
        """Determine event type based on context."""
        raw_text = parameters.get('raw_text', '').lower()
        if any(word in raw_text for word in ['doctor', 'dentist', 'appointment']):
            return EventType.APPOINTMENT
        elif any(word in raw_text for word in ['personal', 'family', 'friend']):
            return EventType.PERSONAL
        elif any(word in raw_text for word in ['work', 'office', 'business', 'meeting']):
            return EventType.WORK
        else:
            return EventType.MEETING

    async def _detect_conflicts(self, event_data: Dict[str, Any],
                                exclude_event_id: Optional[str] = None) -> List[SchedulingConflict]:
        """Detect scheduling conflicts for a proposed event."""
        conflicts = []
        if not event_data.get('start_time'):
            return conflicts
        start_time = event_data['start_time']
        end_time = event_data.get('end_time') or start_time + timedelta(minutes=event_data.get('duration_minutes', 60))
        # Query existing events in the time range
        existing_events = self.db.get_events_in_range(
            event_data.get('user_id'),
            start_time - timedelta(hours=1),
            end_time + timedelta(hours=1)
        )
        for existing_event in existing_events:
            # Compare as strings so that UUID or integer ids also match
            if exclude_event_id is not None and str(existing_event.id) == str(exclude_event_id):
                continue
            # Two intervals overlap when each starts before the other ends
            if (start_time < existing_event.calculated_end_time and
                    end_time > existing_event.start_time):
                conflict = SchedulingConflict(
                    conflicting_event=existing_event,
                    proposed_event=event_data,
                    conflict_type="time_overlap",
                    suggested_alternatives=[]
                )
                conflicts.append(conflict)
        return conflicts

    def _validate_event_data(self, event_data: Dict[str, Any]) -> OperationResult:
        """Validate event data before creation or modification."""
        if not event_data.get('title'):
            return OperationResult(
                success=False,
                message="Event title is required"
            )
        if not event_data.get('start_time'):
            return OperationResult(
                success=False,
                message="Event start time is required"
            )
        # Check if start time is in the past (with a 5-minute grace period)
        if event_data['start_time'] < datetime.now() - timedelta(minutes=5):
            return OperationResult(
                success=False,
                message="Cannot schedule events in the past"
            )
        return OperationResult(success=True, message="Event data is valid")

    async def _create_single_event(self, event_data: Dict[str, Any]) -> OperationResult:
        """Create a single event."""
        try:
            event = self.db.create_event(event_data['user_id'], event_data)
            return OperationResult(
                success=True,
                message=f"Event '{event.title}' created successfully",
                data=event
            )
        except Exception as e:
            return OperationResult(
                success=False,
                message=f"Failed to create event: {e}"
            )

class ConflictResolver:
    async def resolve_conflicts(self, proposed_event: Dict[str, Any],
                                conflicts: List[SchedulingConflict]) -> 'ConflictResolutionResult':
        """Resolve scheduling conflicts using various strategies."""
        # For now, suggest alternative times
        alternatives = await self._find_alternative_times(proposed_event, conflicts)
        return ConflictResolutionResult(
            action=ConflictResolution.SUGGEST_ALTERNATIVE,
            alternatives=alternatives
        )

    async def _find_alternative_times(self, proposed_event: Dict[str, Any],
                                      conflicts: List[SchedulingConflict]) -> List[Dict[str, datetime]]:
        """Find alternative time slots for a conflicted event."""
        alternatives = []
        original_start = proposed_event['start_time']
        duration = timedelta(minutes=proposed_event.get('duration_minutes', 60))
        # Try different time slots around the original time
        for offset_hours in [1, -1, 2, -2, 24, -24]:
            alternative_start = original_start + timedelta(hours=offset_hours)
            alternative_end = alternative_start + duration
            # Check this alternative against the known conflicts only;
            # other events in the calendar are not consulted here
            has_conflict = False
            for conflict in conflicts:
                existing_event = conflict.conflicting_event
                if (alternative_start < existing_event.calculated_end_time and
                        alternative_end > existing_event.start_time):
                    has_conflict = True
                    break
            if not has_conflict:
                alternatives.append({
                    'start': alternative_start,
                    'end': alternative_end
                })
            if len(alternatives) >= 3:  # Limit suggestions
                break
        return alternatives

@dataclass
class ConflictResolutionResult:
    action: ConflictResolution
    alternatives: Optional[List[Dict[str, datetime]]] = None
    message: str = ""

class RecurrenceManager:
    def create_recurring_series(self, event_data: Dict[str, Any],
                                recurrence_config: Dict[str, Any]) -> EventSeries:
        """Create a recurring event series."""
        # Implementation for creating recurring event series
        pass

class NotificationScheduler:
    async def schedule_reminders(self, event: Event):
        """Schedule reminders for an event."""
        # Implementation for scheduling notifications
        pass

This meeting management system handles creating, updating, deleting, and querying events, as well as finding free time slots, from parsed natural language intents. The CalendarEngine coordinates these operations, while separate components handle conflict resolution, recurring events, and notifications. Create and update operations validate their input and check for conflicts before anything is written, and failures are reported back to the user as an OperationResult instead of surfacing as unhandled exceptions.
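The engine relies on the database's find_free_slots method, which is not shown in this section. As a rough illustration of the idea behind it, the sketch below walks the busy intervals in order and collects the gaps that are long enough. The function names and the representation of intervals as (start, end) tuples are assumptions made for this example, not the actual database API.

```python
from datetime import datetime, timedelta
from typing import List, Tuple

Interval = Tuple[datetime, datetime]


def overlaps(a_start: datetime, a_end: datetime,
             b_start: datetime, b_end: datetime) -> bool:
    """Two intervals overlap when each starts before the other ends."""
    return a_start < b_end and b_start < a_end


def free_slots(busy: List[Interval], window_start: datetime,
               window_end: datetime, duration: timedelta) -> List[Interval]:
    """Return the gaps of at least `duration` inside the search window."""
    slots = []
    cursor = window_start  # earliest time that is not yet known to be busy
    for start, end in sorted(busy):
        if end <= cursor:
            continue  # interval lies entirely before the cursor
        if start >= window_end:
            break  # remaining intervals start after the window
        if start - cursor >= duration:
            slots.append((cursor, start))
        cursor = max(cursor, end)
    if window_end - cursor >= duration:
        slots.append((cursor, window_end))
    return slots
```

Sorting first means overlapping or nested busy intervals are merged implicitly, because the cursor only ever moves forward.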
Holiday Integration and Location Services
The integration of holiday information and location-based services adds significant value to our calendar application by automatically providing relevant contextual information without requiring explicit user input. This system must handle the complexities of different regional holiday calendars, varying cultural observances, and the dynamic nature of user locations. The challenge lies in maintaining accurate, up-to-date holiday information while respecting user privacy and providing seamless integration with calendar functionality.
Holiday data management requires careful consideration of data sources, update mechanisms, and regional variations. Different countries observe different holidays, and even within countries, regional variations exist. Some holidays follow fixed dates while others follow complex lunar or religious calendars that change annually. The system must handle federal holidays, regional observances, religious holidays, and cultural celebrations while allowing users to customize which types of holidays they want to see.
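To make the difference between fixed and rule-based dates concrete, the sketch below computes a holiday defined as the n-th weekday of a month and applies the weekend shift used for United States federal holidays (Saturday moves to Friday, Sunday moves to Monday). The helper names are hypothetical, and other countries use different observance rules, so this should be read as one example of a rule and not as a general solution.

```python
from datetime import date, timedelta


def nth_weekday(year: int, month: int, weekday: int, n: int) -> date:
    """Date of the n-th given weekday (Monday=0) in a month, n counted from 1."""
    first = date(year, month, 1)
    offset = (weekday - first.weekday()) % 7  # days until the first such weekday
    return first + timedelta(days=offset + 7 * (n - 1))


def observed_date(actual: date) -> date:
    """Shift a weekend holiday to the adjacent weekday (US federal convention)."""
    if actual.weekday() == 5:  # Saturday is observed on the preceding Friday
        return actual - timedelta(days=1)
    if actual.weekday() == 6:  # Sunday is observed on the following Monday
        return actual + timedelta(days=1)
    return actual


# US Thanksgiving is the fourth Thursday (weekday 3) of November
thanksgiving_2024 = nth_weekday(2024, 11, 3, 4)
# July 4, 2026 falls on a Saturday
independence_day_observed_2026 = observed_date(date(2026, 7, 4))
```

A value computed this way could populate the observed_date field of a holiday record when it differs from the nominal date.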
Location services must balance accuracy with privacy concerns. The system needs to determine user location to provide relevant holiday information and timezone handling, but this must be done with appropriate user consent and data protection measures. The location service should support both automatic detection and manual configuration, allowing users to specify their primary location while handling temporary travel situations.
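The order of precedence described above can be sketched as a small decision function: a temporary travel location wins while it is active, then the manually configured primary location, and an automatically detected location is used only if the user has opted in. The `LocationPreferences` class and its field names are assumptions made for this illustration.

```python
from dataclasses import dataclass
from datetime import date
from typing import Optional


@dataclass
class LocationPreferences:
    """Hypothetical per-user settings; field names are illustrative."""
    primary_location: Optional[str] = None   # set manually by the user
    allow_auto_detection: bool = False       # explicit opt-in
    travel_location: Optional[str] = None    # temporary override
    travel_until: Optional[date] = None      # last day the override applies


def effective_location(prefs: LocationPreferences, detected: Optional[str],
                       today: date) -> Optional[str]:
    """Pick the location to use for holidays and time zones."""
    if prefs.travel_location and prefs.travel_until and today <= prefs.travel_until:
        return prefs.travel_location
    if prefs.primary_location:
        return prefs.primary_location
    if prefs.allow_auto_detection and detected:
        return detected
    return None  # caller falls back to an application default
```

Keeping the consent check inside this one function means that a detected location can never be used by accident elsewhere in the application.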
The following implementation of the holiday integration and location services shows how holiday data is managed, how location information determines which holidays are relevant, and how the result is exposed to the rest of the calendar. The HolidayService takes national and regional holidays from the python-holidays library, computes a small set of Christian observances from the date of Easter, and caches the combined list in memory for 30 days per country, region, and year.

import requests
import asyncio
from datetime import datetime, date, timedelta
from typing import List, Dict, Any, Optional, Tuple
import json
from dataclasses import dataclass
from enum import Enum
import geocoder
import pytz
from geopy.geocoders import Nominatim
import holidays as python_holidays

class HolidayType(Enum):
    NATIONAL = "national"
    REGIONAL = "regional"
    RELIGIOUS = "religious"
    CULTURAL = "cultural"
    OBSERVANCE = "observance"

@dataclass
class LocationInfo:
    country: str
    country_code: str
    region: str
    city: str
    timezone: str
    latitude: float
    longitude: float
    confidence: float

@dataclass
class HolidayInfo:
    name: str
    date: date
    country: str
    region: Optional[str]
    holiday_type: HolidayType
    description: Optional[str]
    is_public_holiday: bool
    observed_date: Optional[date] = None

class LocationService:
    def __init__(self):
        self.geolocator = Nominatim(user_agent="calendar_app")
        self.location_cache = {}

    async def get_user_location(self, ip_address: Optional[str] = None,
                                user_provided_location: Optional[str] = None) -> LocationInfo:
        """Determine user location from various sources."""
        if user_provided_location:
            return await self._geocode_location(user_provided_location)
        if ip_address:
            return await self._get_location_from_ip(ip_address)
        # Fallback to default location (could be user's saved preference)
        return LocationInfo(
            country="United States",
            country_code="US",
            region="",
            city="",
            timezone="America/New_York",
            latitude=40.7128,
            longitude=-74.0060,
            confidence=0.5
        )

    async def _get_location_from_ip(self, ip_address: str) -> LocationInfo:
        """Get location information from IP address."""
        try:
            # Use a geolocation service (in production, use a reliable paid service)
            g = geocoder.ip(ip_address)
            if g.ok:
                # Get timezone for the coordinates
                timezone = await self._get_timezone_for_coordinates(g.latlng[0], g.latlng[1])
                return LocationInfo(
                    country=g.country,
                    country_code=g.country_code,
                    region=g.state,
                    city=g.city,
                    timezone=timezone,
                    latitude=g.latlng[0],
                    longitude=g.latlng[1],
                    confidence=g.confidence
                )
        except Exception as e:
            print(f"Error getting location from IP: {e}")
        return await self.get_user_location()  # Fallback to default

    async def _geocode_location(self, location_string: str) -> LocationInfo:
        """Geocode a location string to get detailed location information."""
        try:
            # addressdetails=True makes Nominatim include the structured
            # 'address' dictionary that the _extract_* helpers read
            location = self.geolocator.geocode(
                location_string, exactly_one=True, addressdetails=True
            )
            if location:
                # Get timezone for the coordinates
                timezone = await self._get_timezone_for_coordinates(
                    location.latitude, location.longitude
                )
                return LocationInfo(
                    country=self._extract_country(location.raw),
                    country_code=self._extract_country_code(location.raw),
                    region=self._extract_region(location.raw),
                    city=self._extract_city(location.raw),
                    timezone=timezone,
                    latitude=location.latitude,
                    longitude=location.longitude,
                    confidence=1.0
                )
        except Exception as e:
            print(f"Error geocoding location: {e}")
        return await self.get_user_location()  # Fallback to default

    async def _get_timezone_for_coordinates(self, lat: float, lon: float) -> str:
        """Approximate the IANA timezone for the given coordinates."""
        # Simplified placeholder: rough longitude bands that ignore real
        # timezone boundaries. In production, use a proper timezone lookup
        # service or library instead.
        if -125 <= lon <= -66:  # Rough North America longitude range
            if lon < -115:
                return "America/Los_Angeles"  # Pacific
            elif lon < -102:
                return "America/Denver"  # Mountain
            elif lon < -87:
                return "America/Chicago"  # Central
            else:
                return "America/New_York"  # Eastern
        elif -10 <= lon <= 40:  # Rough Europe longitude range
            return "Europe/London"
        elif 100 <= lon <= 140:  # Rough Asia-Pacific longitude range
            return "Asia/Tokyo"
        else:
            return "UTC"

    def _extract_country(self, geocode_result: Dict) -> str:
        """Extract country name from geocoding result."""
        address = geocode_result.get('address', {})
        return address.get('country', 'Unknown')

    def _extract_country_code(self, geocode_result: Dict) -> str:
        """Extract country code from geocoding result."""
        address = geocode_result.get('address', {})
        return address.get('country_code', 'XX').upper()

    def _extract_region(self, geocode_result: Dict) -> str:
        """Extract region/state from geocoding result."""
        address = geocode_result.get('address', {})
        return address.get('state', address.get('region', ''))

    def _extract_city(self, geocode_result: Dict) -> str:
        """Extract city from geocoding result."""
        address = geocode_result.get('address', {})
        return address.get('city', address.get('town', address.get('village', '')))

class HolidayService:
    def __init__(self, database: CalendarDatabase):
        self.db = database
        self.holiday_cache = {}
        self.last_update = {}

    async def get_holidays_for_location(self, location: LocationInfo,
                                        year: Optional[int] = None) -> List[HolidayInfo]:
        """Get holidays for a specific location and year."""
        if year is None:
            year = datetime.now().year
        cache_key = f"{location.country_code}_{location.region}_{year}"
        # Check cache first
        if cache_key in self.holiday_cache:
            cache_time = self.last_update.get(cache_key, datetime.min)
            if datetime.now() - cache_time < timedelta(days=30):  # Cache for 30 days
                # Return a copy so callers cannot modify the cached list
                return list(self.holiday_cache[cache_key])
        # Fetch holidays from multiple sources
        holidays_list = []
        # Get national holidays
        national_holidays = await self._get_national_holidays(location.country_code, year)
        holidays_list.extend(national_holidays)
        # Get regional holidays if region is specified
        if location.region:
            regional_holidays = await self._get_regional_holidays(
                location.country_code, location.region, year
            )
            holidays_list.extend(regional_holidays)
        # Get religious holidays
        religious_holidays = await self._get_religious_holidays(location.country_code, year)
        holidays_list.extend(religious_holidays)
        # Cache a copy of the results
        self.holiday_cache[cache_key] = list(holidays_list)
        self.last_update[cache_key] = datetime.now()
        # Store in database for persistence
        await self._store_holidays_in_database(holidays_list)
        return holidays_list

    async def _get_national_holidays(self, country_code: str, year: int) -> List[HolidayInfo]:
        """Get national holidays using the python-holidays library."""
        holidays_list = []
        try:
            # Use python-holidays library for reliable holiday data
            country_holidays = python_holidays.country_holidays(country_code, years=year)
            for holiday_date, holiday_name in country_holidays.items():
                holiday_info = HolidayInfo(
                    name=holiday_name,
                    date=holiday_date,
                    country=country_code,
                    region=None,
                    holiday_type=HolidayType.NATIONAL,
                    description=f"National holiday in {country_code}",
                    is_public_holiday=True
                )
                holidays_list.append(holiday_info)
        except Exception as e:
            print(f"Error fetching national holidays for {country_code}: {e}")
        return holidays_list

    async def _get_regional_holidays(self, country_code: str, region: str,
                                     year: int) -> List[HolidayInfo]:
        """Get regional/state holidays."""
        holidays_list = []
        # Countries handled here, with the label used in the description
        region_labels = {'US': 'Regional', 'CA': 'Provincial'}
        label = region_labels.get(country_code)
        if label is None:
            return holidays_list
        try:
            # subdiv expects the subdivision code used by python-holidays
            # (for example 'CA' for California or 'ON' for Ontario), so a
            # full region name must be mapped to its code before this call
            national = python_holidays.country_holidays(country_code, years=year)
            regional = python_holidays.country_holidays(
                country_code, subdiv=region, years=year
            )
            for holiday_date, holiday_name in regional.items():
                # Only include holidays not already in the national list
                if holiday_date not in national:
                    holiday_info = HolidayInfo(
                        name=holiday_name,
                        date=holiday_date,
                        country=country_code,
                        region=region,
                        holiday_type=HolidayType.REGIONAL,
                        description=f"{label} holiday in {region}, {country_code}",
                        is_public_holiday=True
                    )
                    holidays_list.append(holiday_info)
        except Exception as e:
            print(f"Error fetching regional holidays for {region}, {country_code}: {e}")
        return holidays_list

    async def _get_religious_holidays(self, country_code: str, year: int) -> List[HolidayInfo]:
        """Get religious holidays and observances."""
        holidays_list = []
        # This would typically integrate with religious calendar APIs
        # For demonstration, we'll include some common religious observances
        try:
            # Example: Add common Christian holidays that might not be public holidays
            religious_observances = [
                ("Ash Wednesday", self._calculate_ash_wednesday(year)),
                ("Palm Sunday", self._calculate_palm_sunday(year)),
                ("Maundy Thursday", self._calculate_maundy_thursday(year)),
                ("Good Friday", self._calculate_good_friday(year)),
                ("Easter Sunday", self._calculate_easter_sunday(year)),
            ]
            for name, holiday_date in religious_observances:
                if holiday_date:
                    holiday_info = HolidayInfo(
                        name=name,
                        date=holiday_date,
                        country=country_code,
                        region=None,
                        holiday_type=HolidayType.RELIGIOUS,
                        description="Christian observance",
                        is_public_holiday=False
                    )
                    holidays_list.append(holiday_info)
        except Exception as e:
            print(f"Error calculating religious holidays: {e}")
        return holidays_list

    def _calculate_easter_sunday(self, year: int) -> date:
        """Calculate Easter Sunday using the Western Christian calendar."""
        # Anonymous Gregorian algorithm for the date of Easter
        a = year % 19
        b = year // 100
        c = year % 100
        d = b // 4
        e = b % 4
        f = (b + 8) // 25
        g = (b - f + 1) // 3
        h = (19 * a + b - d - g + 15) % 30
        i = c // 4
        k = c % 4
        l = (32 + 2 * e + 2 * i - h - k) % 7
        m = (a + 11 * h + 22 * l) // 451
        month = (h + l - 7 * m + 114) // 31
        day = ((h + l - 7 * m + 114) % 31) + 1
        return date(year, month, day)

    def _calculate_ash_wednesday(self, year: int) -> date:
        """Calculate Ash Wednesday (46 days before Easter)."""
        easter = self._calculate_easter_sunday(year)
        return easter - timedelta(days=46)

    def _calculate_palm_sunday(self, year: int) -> date:
        """Calculate Palm Sunday (7 days before Easter)."""
        easter = self._calculate_easter_sunday(year)
        return easter - timedelta(days=7)

    def _calculate_maundy_thursday(self, year: int) -> date:
        """Calculate Maundy Thursday (3 days before Easter)."""
        easter = self._calculate_easter_sunday(year)
        return easter - timedelta(days=3)

    def _calculate_good_friday(self, year: int) -> date:
        """Calculate Good Friday (2 days before Easter)."""
        easter = self._calculate_easter_sunday(year)
        return easter - timedelta(days=2)

    async def _store_holidays_in_database(self, holidays_list: List[HolidayInfo]):
        """Store holidays in the database for persistence and caching."""
        session = self.db.get_session()
        try:
            for holiday_info in holidays_list:
                # Check if holiday already exists
                existing_holiday = session.query(Holiday).filter(
                    Holiday.name == holiday_info.name,
                    Holiday.date == datetime.combine(holiday_info.date, datetime.min.time()),
                    Holiday.country == holiday_info.country
                ).first()
                if not existing_holiday:
                    holiday = Holiday(
                        name=holiday_info.name,
                        date=datetime.combine(holiday_info.date, datetime.min.time()),
                        country=holiday_info.country,
                        region=holiday_info.region,
                        holiday_type=holiday_info.holiday_type.value,
                        description=holiday_info.description
                    )
                    session.add(holiday)
            session.commit()
        except Exception as e:
            print(f"Error storing holidays in database: {e}")
            session.rollback()
        finally:
            session.close()

class HolidayIntegrationService:
    def __init__(self, location_service: LocationService, holiday_service: HolidayService):
        self.location_service = location_service
        self.holiday_service = holiday_service

    async def integrate_holidays_for_user(self, user_id: str,
                                          user_location: Optional[str] = None) -> List[HolidayInfo]:
        """Integrate holidays for a specific user based on their location."""
        # Get user location
        location = await self.location_service.get_user_location(
            user_provided_location=user_location
        )
        # Get current year holidays
        current_year = datetime.now().year
        holidays_this_year = await self.holiday_service.get_holidays_for_location(
            location, current_year
        )
        # Work on a copy so the service's cached list is never extended
        holidays = list(holidays_this_year)
        # Get next year holidays if we're near the end of the year
        if datetime.now().month >= 10:
            holidays_next_year = await self.holiday_service.get_holidays_for_location(
                location, current_year + 1
            )
            holidays.extend(holidays_next_year)
        return holidays

    async def get_holidays_for_date_range(self, user_id: str, start_date: date,
                                          end_date: date) -> List[HolidayInfo]:
        """Get holidays within a specific date range for a user."""
        # Get user location (this could be cached per user); with no
        # arguments this resolves to the service's default location
        location = await self.location_service.get_user_location()
        holidays_in_range = []
        # Get holidays for each year in the date range
        for year in range(start_date.year, end_date.year + 1):
            year_holidays = await self.holiday_service.get_holidays_for_location(location, year)
            # Filter holidays within the date range
            for holiday in year_holidays:
                if start_date <= holiday.date <= end_date:
                    holidays_in_range.append(holiday)
        return holidays_in_range

    async def check_holiday_conflicts(self, event_date: date, location: LocationInfo) -> Optional[HolidayInfo]:
        """Check if an event date conflicts with a holiday."""
        holidays = await self.holiday_service.get_holidays_for_location(location, event_date.year)
        for holiday in holidays:
            if holiday.date == event_date and holiday.is_public_holiday:
                return holiday
        return None

This holiday integration and location services system automatically brings relevant holiday information into the calendar application. The LocationService determines the user's location from a manually provided place name or an IP address and falls back to a default location when neither is available, while the HolidayService combines holiday data from several sources and caches the result. The integration service coordinates these components so that the rest of the application can look up holidays for a user or a date range and warn when a proposed event falls on a public holiday.
Calendar Display and Visualization
The calendar display and visualization system represents the primary interface through which users interact with their calendar data. This component must present complex temporal information in an intuitive and visually appealing manner while supporting multiple viewing modes and interaction patterns. The challenge lies in creating a flexible display system that can render everything from detailed daily schedules to high-level yearly overviews while maintaining consistency and usability across different view types.
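Every view mode ultimately comes down to a range of dates that has to be fetched and rendered. The sketch below derives that range from an anchor date; the function name `view_range` and the use of plain strings for the mode are assumptions made to keep the example self-contained, and the first-day-of-week convention (0 for Monday, 6 for Sunday) follows Python's `date.weekday()`.

```python
import calendar
from datetime import date, timedelta
from typing import Tuple


def view_range(mode: str, anchor: date, first_day_of_week: int = 0) -> Tuple[date, date]:
    """Inclusive (start, end) dates shown for a view; 0 = Monday, 6 = Sunday."""
    if mode == "day":
        return anchor, anchor
    if mode == "week":
        # Number of days to step back to reach the configured first weekday
        back = (anchor.weekday() - first_day_of_week) % 7
        start = anchor - timedelta(days=back)
        return start, start + timedelta(days=6)
    if mode == "month":
        last_day = calendar.monthrange(anchor.year, anchor.month)[1]
        return anchor.replace(day=1), anchor.replace(day=last_day)
    if mode == "year":
        return date(anchor.year, 1, 1), date(anchor.year, 12, 31)
    raise ValueError(f"Unsupported view mode: {mode}")
```

A month grid typically pads this range out to whole weeks, which can be done by applying the week calculation to the first and last day of the month.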
The visualization system must handle various data types including single events, recurring series, all-day events, multi-day events, and holiday information. Each type of calendar entry may have different visual requirements, such as color coding for event types, special indicators for recurring events, and appropriate spacing for events of different durations. The system must also support user customization options such as color themes, font sizes, and layout preferences.
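Spacing for overlapping events is a good example of such a visual requirement. In a day or week view, events that overlap in time are usually drawn side by side, which means each one needs a column. A common way to do this is a greedy pass over the events in start order, as in the sketch below; the function name and the tuple representation of events are assumptions for this example and are independent of the renderer shown in this section.

```python
from datetime import datetime
from typing import Dict, List, Tuple


def assign_columns(events: List[Tuple[str, datetime, datetime]]) -> Dict[str, int]:
    """Greedy column assignment: each event takes the first column that is free.

    events holds (event_id, start, end) tuples; the result maps event_id to
    a zero-based column index.
    """
    column_ends: List[datetime] = []  # end time of the last event in each column
    placement: Dict[str, int] = {}
    for event_id, start, end in sorted(events, key=lambda e: (e[1], e[2])):
        for index, busy_until in enumerate(column_ends):
            if busy_until <= start:  # column is free again
                column_ends[index] = end
                placement[event_id] = index
                break
        else:
            # No existing column is free, so open a new one
            column_ends.append(end)
            placement[event_id] = len(column_ends) - 1
    return placement
```

The number of distinct columns in the result gives the width divisor for that group of events, so two overlapping events would each take half of the available width.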
Responsive design considerations are crucial since users may access the calendar from different devices with varying screen sizes and interaction capabilities. The display system must adapt gracefully from desktop monitors to mobile phones while maintaining functionality and readability. This includes supporting both mouse-based interactions and touch gestures, as well as keyboard navigation for accessibility.
Here is a comprehensive implementation of the calendar display and visualization system. This example demonstrates how calendar data is transformed into visual representations across different view modes, including daily, weekly, monthly, and yearly views. The implementation includes support for responsive design, user customization, and interactive features.

from datetime import datetime, date, timedelta, time
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass
from enum import Enum
import json
from calendar import monthrange, weekday
import colorsys

class ViewMode(Enum):
    DAY = "day"
    WEEK = "week"
    MONTH = "month"
    YEAR = "year"
    AGENDA = "agenda"

class DisplayTheme(Enum):
    LIGHT = "light"
    DARK = "dark"
    AUTO = "auto"

@dataclass
class ViewConfig:
    mode: ViewMode
    start_date: date
    end_date: date
    theme: DisplayTheme
    show_weekends: bool = True
    show_holidays: bool = True
    show_all_day_events: bool = True
    time_format_24h: bool = False
    first_day_of_week: int = 0  # 0 = Monday, 6 = Sunday

@dataclass
class EventDisplayInfo:
    event_id: str
    title: str
    start_time: datetime
    end_time: datetime
    color: str
    event_type: str
    location: Optional[str]
    is_all_day: bool
    is_recurring: bool
    conflict_level: int = 0
    display_priority: int = 0

@dataclass
class CalendarCell:
    date: date
    events: List[EventDisplayInfo]
    holidays: List[str]
    is_today: bool
    is_weekend: bool
    is_other_month: bool
    available_slots: List[Tuple[time, time]]


class CalendarRenderer:
    def __init__(self):
        self.color_palette = self._initialize_color_palette()
        self.layout_calculator = LayoutCalculator()
        self.conflict_detector = DisplayConflictDetector()

    def render_calendar_view(self, events: List[Event], holidays: List[HolidayInfo],
                             config: ViewConfig) -> Dict[str, Any]:
        """Render a calendar view based on the specified configuration."""
        # Prepare event display information
        display_events = self._prepare_event_display_info(events, config)

        # Detect and resolve display conflicts
        display_events = self.conflict_detector.resolve_display_conflicts(display_events)

        # Generate the appropriate view
        if config.mode == ViewMode.DAY:
            return self._render_day_view(display_events, holidays, config)
        elif config.mode == ViewMode.WEEK:
            return self._render_week_view(display_events, holidays, config)
        elif config.mode == ViewMode.MONTH:
            return self._render_month_view(display_events, holidays, config)
        elif config.mode == ViewMode.YEAR:
            return self._render_year_view(display_events, holidays, config)
        elif config.mode == ViewMode.AGENDA:
            return self._render_agenda_view(display_events, holidays, config)
        else:
            raise ValueError(f"Unsupported view mode: {config.mode}")

    def _prepare_event_display_info(self, events: List[Event],
                                    config: ViewConfig) -> List[EventDisplayInfo]:
        """Convert Event objects to EventDisplayInfo for rendering."""
        display_events = []
        for event in events:
            # Skip events outside the view range
            if (event.start_time.date() > config.end_date or
                    event.calculated_end_time.date() < config.start_date):
                continue

            display_info = EventDisplayInfo(
                event_id=str(event.id),
                title=event.title,
                start_time=event.start_time,
                end_time=event.calculated_end_time,
                color=event.color or self._get_default_color(event.event_type),
                event_type=event.event_type.value,
                location=event.location,
                is_all_day=event.all_day,
                is_recurring=bool(event.series_id),
                display_priority=self._calculate_display_priority(event)
            )
            display_events.append(display_info)
        return display_events

    def _render_day_view(self, events: List[EventDisplayInfo], holidays: List[HolidayInfo],
                         config: ViewConfig) -> Dict[str, Any]:
        """Render a detailed day view."""
        target_date = config.start_date

        # Filter events for the target date
        day_events = [e for e in events if e.start_time.date() <= target_date <= e.end_time.date()]

        # Separate all-day and timed events
        all_day_events = [e for e in day_events if e.is_all_day]
        timed_events = [e for e in day_events if not e.is_all_day]

        # Generate time slots (15-minute intervals)
        time_slots = self._generate_time_slots(datetime.combine(target_date, time(0, 0)),
                                               datetime.combine(target_date, time(23, 59)),
                                               15)

        # Layout timed events
        event_layout = self.layout_calculator.calculate_day_layout(timed_events, time_slots)

        # Get holidays for the date
        day_holidays = [h.name for h in holidays if h.date == target_date]

        return {
            'view_type': 'day',
            'date': target_date.isoformat(),
            'all_day_events': [self._event_to_dict(e) for e in all_day_events],
            'timed_events': event_layout,
            'holidays': day_holidays,
            'time_slots': time_slots,
            'is_today': target_date == date.today(),
            'theme': config.theme.value
        }

    def _render_week_view(self, events: List[EventDisplayInfo], holidays: List[HolidayInfo],
                          config: ViewConfig) -> Dict[str, Any]:
        """Render a week view with all seven days."""
        # The week starts on config.start_date and covers seven days
        week_start = config.start_date
        week_days = [week_start + timedelta(days=i) for i in range(7)]

        # Create calendar cells for each day
        week_cells = []
        for day_date in week_days:
            day_events = [e for e in events if e.start_time.date() <= day_date <= e.end_time.date()]
            day_holidays = [h.name for h in holidays if h.date == day_date]
            cell = CalendarCell(
                date=day_date,
                events=day_events,
                holidays=day_holidays,
                is_today=day_date == date.today(),
                is_weekend=day_date.weekday() >= 5,
                is_other_month=False,
                available_slots=self._calculate_available_slots(day_events, day_date)
            )
            week_cells.append(cell)

        # Calculate multi-day event spans
        multi_day_events = self._calculate_multi_day_spans(events, week_start, week_start + timedelta(days=6))

        return {
            'view_type': 'week',
            'week_start': week_start.isoformat(),
            'week_end': (week_start + timedelta(days=6)).isoformat(),
            'days': [self._cell_to_dict(cell) for cell in week_cells],
            'multi_day_events': multi_day_events,
            'theme': config.theme.value
        }

    def _render_month_view(self, events: List[EventDisplayInfo], holidays: List[HolidayInfo],
                           config: ViewConfig) -> Dict[str, Any]:
        """Render a month view with calendar grid."""
        target_month = config.start_date.month
        target_year = config.start_date.year

        # Calculate month boundaries
        first_day = date(target_year, target_month, 1)
        last_day = date(target_year, target_month, monthrange(target_year, target_month)[1])

        # Calculate calendar grid (including previous/next month days).
        # Rows start on config.first_day_of_week (0 = Monday, the default).
        lead_days = (first_day.weekday() - config.first_day_of_week) % 7
        trail_days = (config.first_day_of_week + 6 - last_day.weekday()) % 7
        grid_start = first_day - timedelta(days=lead_days)
        grid_end = last_day + timedelta(days=trail_days)

        # Generate calendar grid
        calendar_grid = []
        current_date = grid_start
        while current_date <= grid_end:
            week_row = []
            for _ in range(7):
                day_events = [e for e in events if e.start_time.date() <= current_date <= e.end_time.date()]
                day_holidays = [h.name for h in holidays if h.date == current_date]

                # Limit events shown in month view to the three highest-priority ones
                display_events = sorted(day_events, key=lambda x: x.display_priority, reverse=True)[:3]

                cell = CalendarCell(
                    date=current_date,
                    events=display_events,
                    holidays=day_holidays,
                    is_today=current_date == date.today(),
                    is_weekend=current_date.weekday() >= 5,
                    is_other_month=current_date.month != target_month,
                    available_slots=[]  # Not calculated for month view
                )
                week_row.append(cell)
                current_date += timedelta(days=1)
            calendar_grid.append(week_row)

        return {
            'view_type': 'month',
            'month': target_month,
            'year': target_year,
            'month_name': first_day.strftime('%B'),
            'calendar_grid': [[self._cell_to_dict(cell) for cell in week] for week in calendar_grid],
            'total_events': len(events),
            'theme': config.theme.value
        }

    def _render_year_view(self, events: List[EventDisplayInfo], holidays: List[HolidayInfo],
                          config: ViewConfig) -> Dict[str, Any]:
        """Render a year view with mini month calendars."""
        target_year = config.start_date.year
        months_data = []

        for month in range(1, 13):
            month_start = date(target_year, month, 1)
            month_end = date(target_year, month, monthrange(target_year, month)[1])

            # Get events and holidays for this month
            month_events = [e for e in events if
                            e.start_time.date() <= month_end and e.end_time.date() >= month_start]
            month_holidays = [h for h in holidays if
                              h.date.month == month and h.date.year == target_year]

            # Create mini month configuration
            month_config = ViewConfig(
                mode=ViewMode.MONTH,
                start_date=month_start,
                end_date=month_end,
                theme=config.theme
            )

            # Render mini month
            mini_month = self._render_mini_month(month_events, month_holidays, month_config)
            months_data.append(mini_month)

        return {
            'view_type': 'year',
            'year': target_year,
            'months': months_data,
            'total_events': len(events),
            'total_holidays': len(holidays),
            'theme': config.theme.value
        }

    def _render_agenda_view(self, events: List[EventDisplayInfo], holidays: List[HolidayInfo],
                            config: ViewConfig) -> Dict[str, Any]:
        """Render an agenda/list view of events."""
        # Group events by date
        events_by_date = {}
        current_date = config.start_date
        while current_date <= config.end_date:
            day_events = [e for e in events if e.start_time.date() <= current_date <= e.end_time.date()]
            day_holidays = [h.name for h in holidays if h.date == current_date]

            if day_events or day_holidays:
                events_by_date[current_date.isoformat()] = {
                    'date': current_date.isoformat(),
                    'day_name': current_date.strftime('%A'),
                    'events': sorted([self._event_to_dict(e) for e in day_events],
                                     key=lambda x: x['start_time']),
                    'holidays': day_holidays,
                    'is_today': current_date == date.today()
                }
            current_date += timedelta(days=1)

        return {
            'view_type': 'agenda',
            'start_date': config.start_date.isoformat(),
            'end_date': config.end_date.isoformat(),
            'events_by_date': events_by_date,
            'theme': config.theme.value
        }

    def _render_mini_month(self, events: List[EventDisplayInfo], holidays: List[HolidayInfo],
                           config: ViewConfig) -> Dict[str, Any]:
        """Render a mini month calendar for year view."""
        target_month = config.start_date.month
        target_year = config.start_date.year

        # Simplified month rendering for year view: only record which days are marked
        days_with_events = set()
        days_with_holidays = set()

        for event in events:
            event_start = event.start_time.date()
            event_end = event.end_time.date()
            # Clip the event to this month before walking its days
            current = max(event_start, config.start_date)
            end = min(event_end, config.end_date)
            while current <= end:
                if current.month == target_month:
                    days_with_events.add(current.day)
                current += timedelta(days=1)

        for holiday in holidays:
            if holiday.date.month == target_month and holiday.date.year == target_year:
                days_with_holidays.add(holiday.date.day)

        return {
            'month': target_month,
            'month_name': config.start_date.strftime('%b'),
            'year': target_year,
            # Sorted so the output order is deterministic
            'days_with_events': sorted(days_with_events),
            'days_with_holidays': sorted(days_with_holidays),
            'total_events': len(events)
        }

    def _generate_time_slots(self, start_time: datetime, end_time: datetime,
                             interval_minutes: int) -> List[str]:
        """Generate time slot labels for day view."""
        slots = []
        current = start_time
        while current <= end_time:
            slots.append(current.strftime('%H:%M'))
            current += timedelta(minutes=interval_minutes)
        return slots

    def _calculate_available_slots(self, events: List[EventDisplayInfo],
                                   target_date: date) -> List[Tuple[time, time]]:
        """Calculate available time slots for a given date."""
        # Define business hours
        business_start = time(9, 0)
        business_end = time(17, 0)

        # Get timed events for the date
        timed_events = [e for e in events if not e.is_all_day and
                        e.start_time.date() <= target_date <= e.end_time.date()]

        # Sort by full start datetime so events carried over from earlier days come first
        timed_events.sort(key=lambda x: x.start_time)

        available_slots = []
        current_time = business_start
        for event in timed_events:
            # Clip multi-day events to the target date
            event_start = (event.start_time.time()
                           if event.start_time.date() == target_date else time.min)
            event_end = (event.end_time.time()
                         if event.end_time.date() == target_date else time.max)
            # Clamp to business hours so slots never extend outside them
            event_start = min(max(event_start, business_start), business_end)
            event_end = min(max(event_end, business_start), business_end)

            # Add slot before event if there's a gap
            if current_time < event_start:
                available_slots.append((current_time, event_start))
            # Move current time to after the event
            current_time = max(current_time, event_end)

        # Add final slot if there's time left
        if current_time < business_end:
            available_slots.append((current_time, business_end))
        return available_slots

    def _calculate_multi_day_spans(self, events: List[EventDisplayInfo],
                                   start_date: date, end_date: date) -> List[Dict[str, Any]]:
        """Calculate how multi-day events span across the view period."""
        multi_day_events = []
        for event in events:
            # Clip the event to the visible period
            event_start = max(event.start_time.date(), start_date)
            event_end = min(event.end_time.date(), end_date)

            if event_start <= event_end and (event.end_time.date() - event.start_time.date()).days > 0:
                span_days = (event_end - event_start).days + 1
                start_day_offset = (event_start - start_date).days
                multi_day_events.append({
                    'event_id': event.event_id,
                    'title': event.title,
                    'color': event.color,
                    'start_day_offset': start_day_offset,
                    'span_days': span_days,
                    'is_all_day': event.is_all_day
                })
        return multi_day_events

    def _get_default_color(self, event_type: EventType) -> str:
        """Get default color for an event type."""
        color_map = {
            EventType.MEETING: '#3498db',
            EventType.APPOINTMENT: '#e74c3c',
            EventType.PERSONAL: '#2ecc71',
            EventType.WORK: '#9b59b6',
            EventType.TASK: '#f39c12',
            EventType.REMINDER: '#95a5a6'
        }
        return color_map.get(event_type, '#34495e')

    def _calculate_display_priority(self, event: Event) -> int:
        """Calculate display priority for event ordering."""
        priority = 0

        # Higher priority for all-day events
        if event.all_day:
            priority += 100

        # Higher priority for longer events (capped at 50 points)
        duration_hours = (event.calculated_end_time - event.start_time).total_seconds() / 3600
        priority += min(duration_hours * 10, 50)

        # Higher priority for certain event types
        type_priorities = {
            EventType.MEETING: 20,
            EventType.APPOINTMENT: 15,
            EventType.WORK: 10,
            EventType.PERSONAL: 5,
            EventType.TASK: 3,
            EventType.REMINDER: 1
        }
        priority += type_priorities.get(event.event_type, 0)
        return int(priority)

    def _event_to_dict(self, event: EventDisplayInfo) -> Dict[str, Any]:
        """Convert EventDisplayInfo to dictionary for JSON serialization."""
        return {
            'id': event.event_id,
            'title': event.title,
            'start_time': event.start_time.isoformat(),
            'end_time': event.end_time.isoformat(),
            'color': event.color,
            'event_type': event.event_type,
            'location': event.location,
            'is_all_day': event.is_all_day,
            'is_recurring': event.is_recurring
        }

    def _cell_to_dict(self, cell: CalendarCell) -> Dict[str, Any]:
        """Convert CalendarCell to dictionary for JSON serialization."""
        return {
            'date': cell.date.isoformat(),
            'events': [self._event_to_dict(e) for e in cell.events],
            'holidays': cell.holidays,
            'is_today': cell.is_today,
            'is_weekend': cell.is_weekend,
            'is_other_month': cell.is_other_month,
            'event_count': len(cell.events)
        }

    def _initialize_color_palette(self) -> Dict[str, List[str]]:
        """Initialize color palettes for different themes."""
        return {
            'light': [
                '#3498db', '#e74c3c', '#2ecc71', '#f39c12', '#9b59b6',
                '#1abc9c', '#e67e22', '#34495e', '#f1c40f', '#e91e63'
            ],
            'dark': [
                '#5dade2', '#ec7063', '#58d68d', '#f7dc6f', '#bb8fce',
                '#76d7c4', '#f8c471', '#85929e', '#f4d03f', '#f1948a'
            ]
        }
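

class LayoutCalculator:
    def calculate_day_layout(self, events: List[EventDisplayInfo],
                             time_slots: List[str]) -> List[Dict[str, Any]]:
        """Calculate layout for events in day view to avoid overlaps."""
        # Sort events by start time
        sorted_events = sorted(events, key=lambda x: x.start_time)

        event_layout = []
        columns = []      # End time of the last event placed in each column
        group = []        # Layout entries in the current run of overlapping events
        group_end = None  # Latest end time within the current group

        for event in sorted_events:
            # An event that starts after the whole group has ended opens a new group
            if group and event.start_time >= group_end:
                self._finalize_group(group, len(columns))
                columns, group = [], []

            # Find the first column whose last event has already ended
            column = 0
            while column < len(columns) and columns[column] > event.start_time:
                column += 1

            # Add new column if needed
            if column >= len(columns):
                columns.append(event.end_time)
            else:
                columns[column] = event.end_time
            group_end = event.end_time if not group else max(group_end, event.end_time)

            # Position and size are expressed in minutes from midnight
            start_minutes = event.start_time.hour * 60 + event.start_time.minute
            duration_minutes = (event.end_time - event.start_time).total_seconds() / 60
            entry = {
                'event': self._event_to_dict(event),
                'column': column,
                'top_position': start_minutes,
                'height': duration_minutes
            }
            group.append(entry)
            event_layout.append(entry)

        if group:
            self._finalize_group(group, len(columns))
        return event_layout

    def _finalize_group(self, group: List[Dict[str, Any]], total_columns: int) -> None:
        """Set widths once the final column count of an overlap group is known.

        Computing the width while events are still being placed would leave
        earlier events with a stale, too-wide value.
        """
        for entry in group:
            entry['total_columns'] = total_columns
            entry['width_percent'] = 100 / total_columns

    def _event_to_dict(self, event: EventDisplayInfo) -> Dict[str, Any]:
        """Convert EventDisplayInfo to dictionary."""
        return {
            'id': event.event_id,
            'title': event.title,
            'start_time': event.start_time.isoformat(),
            'end_time': event.end_time.isoformat(),
            'color': event.color,
            'location': event.location
        }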


class DisplayConflictDetector:
    def resolve_display_conflicts(self, events: List[EventDisplayInfo]) -> List[EventDisplayInfo]:
        """Resolve display conflicts between overlapping events."""
        # Sort events by priority (highest first); note that this sorts the list in place
        events.sort(key=lambda x: x.display_priority, reverse=True)

        # Mark conflicts: each lower-priority event counts the higher-priority events it overlaps
        for i, event1 in enumerate(events):
            for event2 in events[i + 1:]:
                if self._events_overlap(event1, event2):
                    event2.conflict_level += 1
        return events

    def _events_overlap(self, event1: EventDisplayInfo, event2: EventDisplayInfo) -> bool:
        """Check if two events overlap in time."""
        return (event1.start_time < event2.end_time and
                event1.end_time > event2.start_time)
This calendar display and visualization system renders calendar data across the day, week, month, year, and agenda modes with a consistent structure. The CalendarRenderer transforms calendar data into view-specific structures, the LayoutCalculator assigns overlapping events to side-by-side columns, and the DisplayConflictDetector marks lower-priority events that overlap higher-priority ones. Because the output is plain, serializable data rather than markup, the same result can drive layouts on different devices and honor user preferences such as theme.
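The column assignment behind this overlap handling can be sketched in isolation as greedy interval packing. The version below is a simplified stand-alone illustration that works on plain (start, end) tuples rather than EventDisplayInfo objects; the assign_columns name is an assumption made for this example.

```python
from datetime import datetime
from typing import List, Tuple

Interval = Tuple[datetime, datetime]


def assign_columns(intervals: List[Interval]) -> List[Tuple[int, int]]:
    """Return (column, total_columns) for each interval, in input order."""
    order = sorted(range(len(intervals)), key=lambda i: intervals[i][0])
    result = [None] * len(intervals)
    column_ends = []   # end time of the last interval placed in each column
    group = []         # indices in the current run of overlapping intervals
    group_end = None   # latest end time seen in the current group

    def close_group():
        # The column count is only final once the whole group has been placed
        for idx in group:
            result[idx] = (result[idx], len(column_ends))

    for i in order:
        start, end = intervals[i]
        if group and start >= group_end:
            close_group()
            column_ends.clear()
            group.clear()
        # Reuse the first column that is already free, else open a new one
        for col, col_end in enumerate(column_ends):
            if col_end <= start:
                column_ends[col] = end
                break
        else:
            col = len(column_ends)
            column_ends.append(end)
        result[i] = col
        group_end = end if not group else max(group_end, end)
        group.append(i)

    if group:
        close_group()
    return result
```

Two events at 9:00 to 10:00 and 9:30 to 10:30 share a group and each receive half the width, while a separate 11:00 event forms its own group and keeps the full width.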
User Interface and Experience Design
The user interface and experience design for an LLM-based calendar application requires a careful balance between the power of natural language interaction and the familiarity of traditional calendar interfaces. Users need to feel confident that they can accomplish their tasks efficiently while discovering the enhanced capabilities that natural language processing provides. The interface must support both conversational interactions and direct manipulation, allowing users to choose their preferred interaction method based on context and personal preference.
The conversational interface presents unique design challenges because users must understand what types of commands the system can process and receive clear feedback about the results of their requests. Unlike traditional graphical interfaces where available actions are visible through buttons and menus, natural language interfaces require users to discover capabilities through experimentation or guidance. The system must provide helpful prompts, examples, and error recovery mechanisms to support user learning and confidence.
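One lightweight way to support this kind of discovery is to answer an unrecognized request with the closest known example commands. The sketch below uses fuzzy string matching from the standard library; the EXAMPLE_COMMANDS list, the suggest_commands name, and the 0.4 similarity cutoff are assumptions chosen for illustration.

```python
import difflib
from typing import List

# Hypothetical examples; a real system would draw these from its help content
EXAMPLE_COMMANDS = [
    "Schedule a meeting with John tomorrow at 2 PM",
    "What do I have scheduled for today?",
    "When am I free this afternoon?",
    "Move my 3 PM meeting to 4 PM",
    "Cancel tomorrow's lunch appointment",
]


def suggest_commands(user_input: str, examples: List[str] = EXAMPLE_COMMANDS,
                     limit: int = 3) -> List[str]:
    """Return up to `limit` example commands that resemble the user's input."""
    by_lowercase = {example.lower(): example for example in examples}
    matches = difflib.get_close_matches(
        user_input.lower(), list(by_lowercase), n=limit, cutoff=0.4
    )
    # Fall back to the first few examples when nothing is similar enough
    return [by_lowercase[m] for m in matches] or examples[:limit]
```

A misspelled request such as "schedul a meeting tomorow" is matched to the scheduling example, which gives the user a concrete phrasing to retry instead of a bare error message.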
Visual feedback and confirmation mechanisms become particularly important in a natural language calendar system. When a user says "Schedule a meeting with Sarah next Tuesday," the system must clearly show what was understood and created, allowing for easy correction if the interpretation was incorrect. The interface must also handle ambiguous requests gracefully, presenting clarification options rather than making assumptions that might lead to incorrect calendar entries.
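The clarification behavior described above can be sketched as a small resolver that confirms a single match and offers options when a reference is ambiguous. The Candidate type, the resolve_reference function, and the status values are hypothetical names for this example, and the title substring match stands in for whatever matching the real system performs.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List


@dataclass
class Candidate:
    event_id: str
    title: str
    start_time: datetime


def resolve_reference(query: str, candidates: List[Candidate]) -> Dict[str, Any]:
    """Decide whether to confirm one event or ask the user to choose."""
    matches = [c for c in candidates if query.lower() in c.title.lower()]

    if not matches:
        return {'status': 'not_found',
                'message': f"I couldn't find an event matching '{query}'."}

    if len(matches) == 1:
        match = matches[0]
        when = match.start_time.strftime('%A, %B %d at %I:%M %p')
        return {'status': 'confirm', 'event_id': match.event_id,
                'message': f"Did you mean '{match.title}' on {when}?"}

    # Several events match: present options instead of guessing
    options = [{'event_id': c.event_id,
                'label': f"{c.title} ({c.start_time.strftime('%a %b %d, %I:%M %p')})"}
               for c in sorted(matches, key=lambda c: c.start_time)]
    return {'status': 'clarify',
            'message': f"I found {len(matches)} events matching '{query}'. Which one did you mean?",
            'options': options}
```

With two dentist appointments on the calendar, "Move my dentist appointment" yields a clarify response listing both in chronological order, so no entry is changed until the user picks one.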
Here is an implementation of the user interface and experience layer. It shows how natural language interactions are integrated with a traditional calendar interface, covering conversation management, feedback and confirmation messages, UI component descriptions, and accessibility settings. The ViewMode enum is the one from the display code above, and the llm_service and calendar_engine collaborators are assumed to be the services described in earlier sections.
from datetime import datetime, date, timedelta
from typing import List, Dict, Any, Optional, Callable
from dataclasses import dataclass, field
from enum import Enum
import json
import asyncio

# ViewMode is the enum defined in the display code above.


class InteractionMode(Enum):
    CONVERSATION = "conversation"
    DIRECT_MANIPULATION = "direct_manipulation"
    HYBRID = "hybrid"


class FeedbackType(Enum):
    SUCCESS = "success"
    ERROR = "error"
    WARNING = "warning"
    INFO = "info"
    CONFIRMATION = "confirmation"


@dataclass
class UserInterfaceState:
    current_view: ViewMode
    selected_date: date
    interaction_mode: InteractionMode
    conversation_active: bool
    pending_confirmations: List[str] = field(default_factory=list)
    user_preferences: Dict[str, Any] = field(default_factory=dict)
    accessibility_settings: Dict[str, Any] = field(default_factory=dict)


@dataclass
class ConversationTurn:
    user_input: str
    system_response: str
    timestamp: datetime
    intent: Optional[Dict[str, Any]] = None
    result: Optional[Dict[str, Any]] = None
    requires_confirmation: bool = False


@dataclass
class UIFeedback:
    message: str
    feedback_type: FeedbackType
    duration_ms: int = 5000
    actions: List[Dict[str, Any]] = field(default_factory=list)
    data: Optional[Dict[str, Any]] = None


class ConversationManager:
    def __init__(self, llm_service, calendar_engine):
        self.llm_service = llm_service
        self.calendar_engine = calendar_engine
        self.conversation_history = []
        self.context_window = 10  # Keep last 10 turns for context

    async def process_user_input(self, user_input: str, ui_state: UserInterfaceState) -> Dict[str, Any]:
        """Process user input and return response with UI updates."""
        # Create conversation turn
        turn = ConversationTurn(
            user_input=user_input,
            system_response="",
            timestamp=datetime.now()
        )

        try:
            # Handle special conversation commands first so they skip the LLM call
            if self._is_conversation_command(user_input):
                response = await self._handle_conversation_command(user_input, ui_state)
                turn.system_response = response['message']
                return response

            # Parse intent with conversation context
            intent = await self.llm_service.parse_intent(
                user_input,
                self._build_conversation_context()
            )
            turn.intent = intent

            # Execute calendar command
            result = await self.calendar_engine.execute_command(intent)
            turn.result = result.__dict__

            # Generate natural language response
            response = await self._generate_response(result, intent, ui_state)
            turn.system_response = response['message']

            # Add to conversation history
            self.conversation_history.append(turn)
            self._trim_conversation_history()
            return response

        except Exception as e:
            error_response = {
                'message': f"I'm sorry, I encountered an error: {str(e)}",
                'feedback': UIFeedback(
                    message="Something went wrong. Please try again.",
                    feedback_type=FeedbackType.ERROR
                ),
                'suggestions': [
                    "Try rephrasing your request",
                    "Use simpler language",
                    "Check if all required information is provided"
                ]
            }
            turn.system_response = error_response['message']
            self.conversation_history.append(turn)
            self._trim_conversation_history()
            return error_response

    async def _generate_response(self, result, intent: Dict[str, Any],
                                 ui_state: UserInterfaceState) -> Dict[str, Any]:
        """Generate comprehensive response with UI updates."""
        response = {
            'message': '',
            'feedback': None,
            'ui_updates': {},
            'suggestions': [],
            'requires_confirmation': False
        }

        if result.success:
            response.update(await self._handle_successful_result(result, intent, ui_state))
        else:
            response.update(await self._handle_failed_result(result, intent, ui_state))

        # Add contextual suggestions
        response['suggestions'] = await self._generate_contextual_suggestions(intent, ui_state)
        return response

    async def _handle_successful_result(self, result, intent: Dict[str, Any],
                                        ui_state: UserInterfaceState) -> Dict[str, Any]:
        """Handle successful calendar operations."""
        action = intent.get('action')
        if action == 'create':
            return await self._handle_create_success(result, intent, ui_state)
        elif action == 'update':
            return await self._handle_update_success(result, intent, ui_state)
        elif action == 'delete':
            return await self._handle_delete_success(result, intent, ui_state)
        elif action == 'query':
            return await self._handle_query_success(result, intent, ui_state)
        elif action == 'find_slots':
            return await self._handle_find_slots_success(result, intent, ui_state)
        else:
            return {
                'message': "Operation completed successfully.",
                'feedback': UIFeedback(
                    message="Done!",
                    feedback_type=FeedbackType.SUCCESS
                )
            }

    async def _handle_create_success(self, result, intent: Dict[str, Any],
                                     ui_state: UserInterfaceState) -> Dict[str, Any]:
        """Handle successful event creation."""
        event = result.data
        event_time = event.start_time.strftime('%A, %B %d at %I:%M %p')
        message = f"Perfect! I've scheduled '{event.title}' for {event_time}"
        if event.location:
            message += f" at {event.location}"
        message += "."

        # Prepare UI updates
        ui_updates = {
            'highlight_event': str(event.id),
            'navigate_to_date': event.start_time.date().isoformat(),
            'refresh_view': True
        }

        # Create feedback with action options
        feedback = UIFeedback(
            message="Event created successfully!",
            feedback_type=FeedbackType.SUCCESS,
            actions=[
                {'label': 'View Event', 'action': 'view_event', 'event_id': str(event.id)},
                {'label': 'Add Reminder', 'action': 'add_reminder', 'event_id': str(event.id)},
                {'label': 'Invite Others', 'action': 'invite_participants', 'event_id': str(event.id)}
            ]
        )

        return {
            'message': message,
            'feedback': feedback,
            'ui_updates': ui_updates
        }

    async def _handle_query_success(self, result, intent: Dict[str, Any],
                                    ui_state: UserInterfaceState) -> Dict[str, Any]:
        """Handle successful event queries."""
        events = result.data
        if not events:
            message = "You don't have any events scheduled for that time."
            feedback = UIFeedback(
                message="No events found",
                feedback_type=FeedbackType.INFO
            )
        else:
            event_count = len(events)
            plural = 's' if event_count != 1 else ''
            message = f"You have {event_count} event{plural}: "

            event_summaries = []
            for event in events[:3]:  # Show first 3 events
                time_str = event.start_time.strftime('%I:%M %p')
                event_summaries.append(f"'{event.title}' at {time_str}")
            message += ", ".join(event_summaries)
            if event_count > 3:
                message += f" and {event_count - 3} more"
            message += "."

            feedback = UIFeedback(
                message=f"Found {event_count} event{plural}",
                feedback_type=FeedbackType.SUCCESS,
                actions=[
                    {'label': 'View All', 'action': 'show_agenda_view'},
                    {'label': 'Export List', 'action': 'export_events'}
                ]
            )

        ui_updates = {
            'show_event_list': [self._event_to_summary(e) for e in events],
            'refresh_view': True
        }

        return {
            'message': message,
            'feedback': feedback,
            'ui_updates': ui_updates
        }

    async def _handle_find_slots_success(self, result, intent: Dict[str, Any],
                                         ui_state: UserInterfaceState) -> Dict[str, Any]:
        """Handle successful free slot finding."""
        free_slots = result.data
        if not free_slots:
            message = "I couldn't find any available time slots in the specified period. Would you like me to check a different time range?"
            feedback = UIFeedback(
                message="No available slots found",
                feedback_type=FeedbackType.WARNING,
                actions=[
                    {'label': 'Try Different Dates', 'action': 'suggest_alternative_dates'},
                    {'label': 'Show Conflicts', 'action': 'show_conflicting_events'}
                ]
            )
        else:
            slot_count = len(free_slots)
            plural = 's' if slot_count != 1 else ''
            message = f"I found {slot_count} available time slot{plural}. "

            # Show first few slots; each slot is expected to carry a datetime under 'start'
            slot_summaries = []
            for slot in free_slots[:3]:
                start_time = slot['start'].strftime('%A %I:%M %p')
                slot_summaries.append(start_time)
            message += "Here are some options: " + ", ".join(slot_summaries)
            if slot_count > 3:
                message += f" and {slot_count - 3} more"
            message += "."

            feedback = UIFeedback(
                message=f"Found {slot_count} available slot{plural}",
                feedback_type=FeedbackType.SUCCESS,
                actions=[
                    {'label': 'Schedule Meeting', 'action': 'create_event_in_slot'},
                    {'label': 'View All Slots', 'action': 'show_all_slots'}
                ]
            )

        ui_updates = {
            'highlight_free_slots': free_slots,
            'show_availability_overlay': True
        }

        return {
            'message': message,
            'feedback': feedback,
            'ui_updates': ui_updates
        }

    def _is_conversation_command(self, user_input: str) -> bool:
        """Check if input is a conversation management command.

        Commands must match the whole input (or its beginning, for open-ended
        questions). Substring matching would hijack calendar requests such as
        "Cancel my 2 PM appointment".
        """
        exact_commands = {
            'help', 'what can you do', 'tutorial',
            'clear', 'reset', 'start over', 'nevermind', 'cancel'
        }
        prefix_commands = ('how do i', 'explain')
        normalized = user_input.strip().lower().rstrip('?!.')
        return normalized in exact_commands or normalized.startswith(prefix_commands)

    async def _handle_conversation_command(self, user_input: str,
                                           ui_state: UserInterfaceState) -> Dict[str, Any]:
        """Handle conversation management commands."""
        normalized = user_input.strip().lower().rstrip('?!.')
        if (normalized in ('help', 'what can you do', 'tutorial') or
                normalized.startswith(('how do i', 'explain'))):
            return await self._provide_help(ui_state)
        elif normalized in ('clear', 'reset', 'start over'):
            return await self._reset_conversation()
        elif normalized in ('nevermind', 'cancel'):
            return await self._cancel_operation(ui_state)
        else:
            return {
                'message': "I'm not sure how to help with that. Try asking 'What can you do?' for available commands.",
                'feedback': UIFeedback(
                    message="Unknown command",
                    feedback_type=FeedbackType.INFO
                )
            }

    async def _provide_help(self, ui_state: UserInterfaceState) -> Dict[str, Any]:
        """Provide help information and examples."""
        help_message = """I can help you manage your calendar using natural language. Here are some things you can ask me:

📅 **Creating Events:**
- "Schedule a meeting with John tomorrow at 2 PM"
- "Book a dentist appointment next Friday morning"
- "Set up a weekly team meeting every Tuesday at 10 AM"

🔍 **Finding Information:**
- "What do I have scheduled for today?"
- "Show me next week's meetings"
- "When am I free this afternoon?"

✏️ **Modifying Events:**
- "Move my 3 PM meeting to 4 PM"
- "Cancel tomorrow's lunch appointment"
- "Add Sarah to the project meeting"

Just speak naturally - I'll understand what you want to do!"""

        examples = [
            "Schedule a meeting with the marketing team next Monday at 10 AM",
            "What meetings do I have this week?",
            "Find me a free hour tomorrow afternoon",
            "Cancel my 2 PM appointment",
            "Move Friday's meeting to next week"
        ]

        return {
            'message': help_message,
            'feedback': UIFeedback(
                message="Here's what I can help you with",
                feedback_type=FeedbackType.INFO,
                duration_ms=10000
            ),
            'ui_updates': {
                'show_help_panel': True,
                'example_commands': examples
            }
        }

    def _build_conversation_context(self) -> Dict[str, Any]:
        """Build context from recent conversation history."""
        context = {
            'recent_turns': [],
            'mentioned_entities': set(),
            'pending_operations': []
        }

        # Get recent conversation turns
        recent_turns = self.conversation_history[-self.context_window:]
        for turn in recent_turns:
            context['recent_turns'].append({
                'user_input': turn.user_input,
                'system_response': turn.system_response,
                'intent': turn.intent
            })

            # Extract mentioned entities
            if turn.intent and 'parameters' in turn.intent:
                entities = turn.intent['parameters'].get('entities', {})
                for entity_type, entity_list in entities.items():
                    context['mentioned_entities'].update(entity_list)
        return context

    def _event_to_summary(self, event) -> Dict[str, Any]:
        """Convert event to summary format for UI display."""
        return {
            'id': str(event.id),
            'title': event.title,
            'start_time': event.start_time.isoformat(),
            'end_time': event.calculated_end_time.isoformat(),
            'location': event.location,
            'type': event.event_type.value
        }

    def _trim_conversation_history(self):
        """Keep conversation history within reasonable bounds."""
        max_history = 50
        if len(self.conversation_history) > max_history:
            self.conversation_history = self.conversation_history[-max_history:]

    async def _generate_contextual_suggestions(self, intent: Dict[str, Any],
                                               ui_state: UserInterfaceState) -> List[str]:
        """Generate contextual suggestions based on current state."""
        suggestions = []
        action = intent.get('action')
        if action == 'create':
            suggestions.extend([
                "Add a reminder to this event",
                "Invite participants to the meeting",
                "Set up a recurring schedule"
            ])
        elif action == 'query':
            suggestions.extend([
                "Find available time slots",
                "Create a new event",
                "Export this schedule"
            ])
        elif action == 'find_slots':
            suggestions.extend([
                "Schedule a meeting in one of these slots",
                "Check availability for different dates",
                "See what's causing conflicts"
            ])

        # Pick a general suggestion based on current view
        view_suggestion = None
        if ui_state.current_view == ViewMode.DAY:
            view_suggestion = "Show me the week view"
        elif ui_state.current_view == ViewMode.WEEK:
            view_suggestion = "Focus on today's schedule"

        # Limit to 3 suggestions, reserving one place for the view suggestion
        # so that it is not always cut off by the action suggestions
        if view_suggestion:
            return suggestions[:2] + [view_suggestion]
        return suggestions[:3]


class UIComponentManager:
    def __init__(self):
        self.active_components = {}
        self.component_state = {}

    def render_conversation_interface(self, ui_state: UserInterfaceState) -> Dict[str, Any]:
        """Render the conversation interface component."""
        return {
            'type': 'conversation_interface',
            'properties': {
                'placeholder': self._get_input_placeholder(ui_state),
                'suggestions': self._get_quick_suggestions(ui_state),
                'voice_enabled': ui_state.user_preferences.get('voice_input', False),
                'conversation_active': ui_state.conversation_active,
                'accessibility': {
                    'screen_reader_enabled': ui_state.accessibility_settings.get('screen_reader', False),
                    'high_contrast': ui_state.accessibility_settings.get('high_contrast', False),
                    'large_text': ui_state.accessibility_settings.get('large_text', False)
                }
            }
        }

    def render_calendar_view(self, calendar_data: Dict[str, Any],
                             ui_state: UserInterfaceState) -> Dict[str, Any]:
        """Render the calendar view component."""
        return {
            'type': 'calendar_view',
            'properties': {
                'view_mode': ui_state.current_view.value,
                'selected_date': ui_state.selected_date.isoformat(),
                'calendar_data': calendar_data,
                'interaction_mode': ui_state.interaction_mode.value,
                'theme': ui_state.user_preferences.get('theme', 'light'),
                'animations_enabled': ui_state.user_preferences.get('animations', True)
            }
        }

    def render_feedback_system(self, feedback: UIFeedback) -> Dict[str, Any]:
        """Render feedback notifications and confirmations."""
        return {
            'type': 'feedback_notification',
            'properties': {
                'message': feedback.message,
                'type': feedback.feedback_type.value,
                'duration': feedback.duration_ms,
                'actions': feedback.actions,
                'dismissible': True,
                'position': 'top-right'
            }
        }

    def _get_input_placeholder(self, ui_state: UserInterfaceState) -> str:
        """Get contextual placeholder text for input field."""
        if ui_state.current_view == ViewMode.DAY:
            return "Try: 'Schedule a meeting at 2 PM' or 'What do I have today?'"
        elif ui_state.current_view == ViewMode.WEEK:
            return "Try: 'Show me free time this week' or 'Move Monday's meeting'"
        elif ui_state.current_view == ViewMode.MONTH:
            return "Try: 'What's happening next Friday?' or 'Block out vacation time'"
        else:
            return "Ask me about your calendar - I'm here to help!"

    def _get_quick_suggestions(self, ui_state: UserInterfaceState) -> List[str]:
        """Get quick suggestion buttons based on current context."""
        base_suggestions = [
            "What's on my schedule today?",
            "Find me a free hour",
            "Schedule a meeting"
        ]

        # Add contextual suggestions based on current view
        if ui_state.current_view == ViewMode.DAY:
            base_suggestions.append("Show me tomorrow")
        elif ui_state.current_view == ViewMode.WEEK:
            base_suggestions.append("Go to next week")
        return base_suggestions

class AccessibilityManager:
    def __init__(self):
        self.screen_reader_support = True
        self.keyboard_navigation = True

    def apply_accessibility_settings(self, ui_state: UserInterfaceState) -> Dict[str, Any]:
        """Apply accessibility settings to UI components."""
        settings = ui_state.accessibility_settings
        return {
            'aria_labels': self._generate_aria_labels(ui_state),
            'keyboard_shortcuts': self._get_keyboard_shortcuts(),
            'focus_management': self._get_focus_management_rules(),
            'screen_reader_announcements': self._get_screen_reader_text(ui_state),
            'high_contrast_mode': settings.get('high_contrast', False),
            'reduced_motion': settings.get('reduced_motion', False),
            'large_text_mode': settings.get('large_text', False)
        }

    def _generate_aria_labels(self, ui_state: UserInterfaceState) -> Dict[str, str]:
        """Generate ARIA labels for UI elements."""
        return {
            'conversation_input': 'Enter calendar command or question',
            'calendar_grid': f'Calendar view for {ui_state.selected_date.strftime("%B %Y")}',
            'event_list': 'List of scheduled events',
            'navigation_buttons': 'Calendar navigation controls',
            'view_mode_selector': 'Change calendar view mode'
        }

    def _get_keyboard_shortcuts(self) -> Dict[str, str]:
        """Define keyboard shortcuts for accessibility."""
        # Note: browsers typically reserve ctrl+n, ctrl+t, and ctrl+w for their
        # own use, so a web front end may need alternative bindings for these.
        return {
            'ctrl+n': 'Create new event',
            'ctrl+f': 'Find events',
            'ctrl+t': 'Go to today',
            'ctrl+d': 'Day view',
            'ctrl+w': 'Week view',
            'ctrl+m': 'Month view',
            'ctrl+y': 'Year view',
            'escape': 'Cancel current operation',
            'enter': 'Confirm action',
            'tab': 'Navigate to next element',
            'shift+tab': 'Navigate to previous element'
        }

    def _get_focus_management_rules(self) -> Dict[str, Any]:
        """Define focus management rules for keyboard navigation."""
        return {
            'initial_focus': 'conversation_input',
            'focus_trap_modals': True,
            'focus_visible_indicators': True,
            'skip_links': [
                {'target': 'main_content', 'label': 'Skip to main content'},
                {'target': 'calendar_view', 'label': 'Skip to calendar'},
                {'target': 'conversation_input', 'label': 'Skip to conversation input'}
            ]
        }

    def _get_screen_reader_text(self, ui_state: UserInterfaceState) -> Dict[str, str]:
        """Generate screen reader announcements."""
        return {
            'view_changed': f"Calendar view changed to {ui_state.current_view.value}",
            'date_changed': f"Selected date changed to {ui_state.selected_date.strftime('%A, %B %d, %Y')}",
            'event_created': "New event created successfully",
            'event_updated': "Event updated successfully",
            'event_deleted': "Event deleted successfully",
            'search_results': "Search results updated"
        }

class ResponsiveDesignManager:
    def __init__(self):
        # Each value is an exclusive upper bound in pixels. Only 'mobile' and
        # 'tablet' are consulted below; wider screens get the desktop layout.
        self.breakpoints = {
            'mobile': 768,
            'tablet': 1024,
            'desktop': 1440
        }

    def get_responsive_layout(self, screen_width: int, ui_state: UserInterfaceState) -> Dict[str, Any]:
        """Get responsive layout configuration based on screen size."""
        if screen_width < self.breakpoints['mobile']:
            return self._get_mobile_layout(ui_state)
        elif screen_width < self.breakpoints['tablet']:
            return self._get_tablet_layout(ui_state)
        else:
            return self._get_desktop_layout(ui_state)

    def _get_mobile_layout(self, ui_state: UserInterfaceState) -> Dict[str, Any]:
        """Mobile-optimized layout configuration."""
        return {
            'layout_type': 'mobile',
            'conversation_interface': {
                'position': 'bottom',
                'full_width': True,
                'voice_button_prominent': True
            },
            'calendar_view': {
                'default_view': ViewMode.DAY.value,
                'compact_events': True,
                'swipe_navigation': True
            },
            'navigation': {
                'type': 'bottom_tabs',
                'items': ['Day', 'Week', 'Month', 'Chat']
            }
        }

    def _get_tablet_layout(self, ui_state: UserInterfaceState) -> Dict[str, Any]:
        """Tablet-optimized layout configuration."""
        return {
            'layout_type': 'tablet',
            'conversation_interface': {
                'position': 'sidebar',
                'width': '30%',
                'collapsible': True
            },
            'calendar_view': {
                'default_view': ViewMode.WEEK.value,
                'show_mini_calendar': True,
                'touch_optimized': True
            },
            'navigation': {
                'type': 'top_bar',
                'show_view_switcher': True
            }
        }

    def _get_desktop_layout(self, ui_state: UserInterfaceState) -> Dict[str, Any]:
        """Desktop-optimized layout configuration."""
        return {
            'layout_type': 'desktop',
            'conversation_interface': {
                'position': 'floating',
                'width': '400px',
                'resizable': True,
                'keyboard_shortcuts': True
            },
            'calendar_view': {
                'default_view': ViewMode.MONTH.value,
                'show_sidebar': True,
                'multi_calendar_support': True
            },
            'navigation': {
                'type': 'full_menu',
                'show_all_options': True,
                'keyboard_navigation': True
            }
        }
This user interface and experience design system provides a framework for an intuitive and accessible LLM-based calendar application. The ConversationManager maintains conversational context and supplies feedback and follow-up suggestions. The UIComponentManager describes each interface element as a plain dictionary, which keeps rendering consistent across components. The AccessibilityManager supplies ARIA labels, keyboard shortcuts, focus rules, and screen reader announcements so that the application remains usable by people with diverse abilities. The ResponsiveDesignManager selects a mobile, tablet, or desktop layout from the screen width, so the interface adapts to different devices and interaction patterns.
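The breakpoint selection performed by the ResponsiveDesignManager can be checked in isolation. The following is a minimal standalone sketch that mirrors the 768 and 1024 pixel thresholds from the listing; the function name layout_for_width is hypothetical and returns only the layout name rather than the full configuration dictionary.

```python
# Exclusive upper bounds in pixels, copied from the ResponsiveDesignManager listing.
BREAKPOINTS = {"mobile": 768, "tablet": 1024}


def layout_for_width(screen_width: int) -> str:
    """Return the layout name that the breakpoint rules select for a width."""
    if screen_width < BREAKPOINTS["mobile"]:
        return "mobile"
    if screen_width < BREAKPOINTS["tablet"]:
        return "tablet"
    return "desktop"


# Widths on either side of each boundary.
for width in (767, 768, 1023, 1024):
    print(width, layout_for_width(width))
```

Because the comparisons are strict, a width exactly equal to a breakpoint falls into the next larger layout.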
Advanced Features Implementation
The advanced features of an LLM-based calendar application extend beyond basic scheduling to provide intelligent automation, predictive capabilities, and seamless integration with external systems. These features leverage the natural language processing capabilities to offer sophisticated functionality that would be difficult to implement in traditional calendar applications. The system can learn from user patterns, provide proactive suggestions, and automate routine scheduling tasks while maintaining user control and transparency.
Intelligent scheduling is one of the most valuable advanced features: the system suggests meeting times based on participant availability, preferences, and historical patterns. It weighs factors such as time zones, working hours, meeting frequency, and travel time between locations. The scheduler has to balance several constraints at once and explain its choices. In the implementation that follows, constraints with a priority of 9 or higher act as hard filters on candidate slots, the surviving slots are scored and ranked, and each suggestion carries a reasoning string.
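To make the idea of balancing constraints concrete, here is a small self-contained sketch that separates hard rules, which eliminate a slot, from weighted soft rules, which only influence its rank. The names SoftRule and rank_slots, the weights, and the sample rules are all assumptions chosen for illustration; they are not part of the implementation presented later.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Callable, List, Tuple


@dataclass
class SoftRule:
    """A weighted preference; score() returns a value between 0.0 and 1.0."""
    name: str
    weight: float
    score: Callable[[datetime], float]


def rank_slots(slots: List[datetime],
               hard_rules: List[Callable[[datetime], bool]],
               soft_rules: List[SoftRule]) -> List[Tuple[datetime, float, str]]:
    """Drop slots that break any hard rule, then rank the rest by weighted score."""
    total_weight = sum(rule.weight for rule in soft_rules) or 1.0
    ranked = []
    for slot in slots:
        if not all(rule(slot) for rule in hard_rules):
            continue  # a hard rule failed, so the slot is not a candidate
        contributions = {r.name: r.weight * r.score(slot) for r in soft_rules}
        score = sum(contributions.values()) / total_weight
        if contributions:
            strongest = max(contributions, key=contributions.get)
            reason = f"strongest factor: {strongest}"
        else:
            reason = "no soft rules applied"
        ranked.append((slot, score, reason))
    ranked.sort(key=lambda item: item[1], reverse=True)
    return ranked


# Illustrative data: four start times on a single day.
day_slots = [datetime(2025, 3, 3, hour) for hour in (8, 10, 14, 19)]


def within_work_hours(slot: datetime) -> bool:
    return 9 <= slot.hour < 17


rules = [
    SoftRule("morning_preference", 2.0, lambda s: 1.0 if s.hour < 12 else 0.2),
    SoftRule("near_2pm", 1.0, lambda s: max(0.0, 1 - abs(s.hour - 14) / 8)),
]
ranking = rank_slots(day_slots, [within_work_hours], rules)
for slot, score, reason in ranking:
    print(slot.strftime("%H:%M"), round(score, 2), reason)
```

Dividing by the total weight keeps every score between 0 and 1, which makes a fixed confidence threshold meaningful regardless of how many soft rules are configured.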
Integration capabilities allow the calendar to connect with external systems such as email, video conferencing platforms, project management tools, and travel booking services. These integrations enable automatic event creation from email invitations, seamless video conference setup, and intelligent travel time calculations. The natural language interface makes these integrations more powerful by allowing users to request complex operations that span multiple systems through simple conversational commands.
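As one example of what an email integration has to produce, the sketch below builds a calendar invitation with the standard library only. It assumes timezone-aware datetimes and converts them to UTC before formatting, escapes the characters that are special in iCalendar text values, and attaches the result as text/calendar. The helper names build_ics and build_invitation, the addresses, and the event details are hypothetical.

```python
from datetime import datetime, timedelta, timezone
from email.message import EmailMessage


def _escape_text(value: str) -> str:
    """Escape characters that are special in iCalendar TEXT values."""
    return (value.replace("\\", "\\\\")
                 .replace(";", "\\;")
                 .replace(",", "\\,")
                 .replace("\n", "\\n"))


def _utc_stamp(moment: datetime) -> str:
    """Format a timezone-aware datetime as an iCalendar UTC timestamp."""
    return moment.astimezone(timezone.utc).strftime("%Y%m%dT%H%M%SZ")


def build_ics(uid: str, title: str, start: datetime, end: datetime) -> str:
    """Return a minimal single-event calendar with CRLF line endings."""
    lines = [
        "BEGIN:VCALENDAR",
        "VERSION:2.0",
        "PRODID:-//Calendar App//EN",
        "BEGIN:VEVENT",
        f"UID:{uid}",
        f"DTSTAMP:{_utc_stamp(datetime.now(timezone.utc))}",
        f"DTSTART:{_utc_stamp(start)}",
        f"DTEND:{_utc_stamp(end)}",
        f"SUMMARY:{_escape_text(title)}",
        "END:VEVENT",
        "END:VCALENDAR",
    ]
    return "\r\n".join(lines) + "\r\n"


def build_invitation(sender: str, recipient: str, subject: str,
                     body: str, ics_text: str) -> EmailMessage:
    """Build an email whose attachment is typed as text/calendar."""
    message = EmailMessage()
    message["From"] = sender
    message["To"] = recipient
    message["Subject"] = subject
    message.set_content(body)
    message.add_attachment(ics_text, subtype="calendar", filename="meeting.ics")
    return message


# Illustrative event: 2 PM in a UTC-5 zone, one hour long.
start = datetime(2025, 3, 4, 14, 0, tzinfo=timezone(timedelta(hours=-5)))
ics = build_ics("evt-1@example.com", "Review; Q1", start, start + timedelta(hours=1))
invitation = build_invitation("organizer@example.com", "guest@example.com",
                              "Meeting Invitation: Review", "Please join.", ics)
```

Sending the message is left out on purpose; smtplib.SMTP.send_message accepts an EmailMessage, so the transport can be added without changing how the invitation is built.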
Here is an implementation of these advanced features covering intelligent scheduling, external integrations, and predictive assistance. It shows how the system can rank candidate meeting slots, connect to email and video conferencing services, and offer proactive suggestions. Pattern recognition relies on k-means clustering (scikit-learn) of historical meeting start times. Parts of the listing are placeholders: _get_historical_events returns an empty list, and Zoom meeting creation returns mock data rather than calling the Zoom API.
import asyncio
from datetime import date, datetime, timedelta, time  # date is used in type hints below
from typing import List, Dict, Any, Optional, Tuple, Set
from dataclasses import dataclass, field
from enum import Enum
import json
import numpy as np
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
import requests
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart


class SchedulingStrategy(Enum):
    OPTIMIZE_FOR_FOCUS = "optimize_for_focus"
    MINIMIZE_CONFLICTS = "minimize_conflicts"
    BALANCE_WORKLOAD = "balance_workload"
    RESPECT_PREFERENCES = "respect_preferences"
    MINIMIZE_TRAVEL = "minimize_travel"


class IntegrationType(Enum):
    EMAIL = "email"
    VIDEO_CONFERENCE = "video_conference"
    PROJECT_MANAGEMENT = "project_management"
    TRAVEL_BOOKING = "travel_booking"
    WEATHER_SERVICE = "weather_service"
    CONTACT_MANAGEMENT = "contact_management"


@dataclass
class UserPreferences:
    preferred_meeting_times: List[Tuple[time, time]]
    maximum_daily_meetings: int
    minimum_break_between_meetings: int  # minutes
    preferred_meeting_duration: int  # minutes
    work_hours: Tuple[time, time]
    time_zone: str
    focus_time_blocks: List[Tuple[time, time]]
    travel_buffer_time: int  # minutes
    notification_preferences: Dict[str, Any]


@dataclass
class SchedulingConstraint:
    constraint_type: str
    priority: int  # 1-10, 10 being highest
    description: str
    parameters: Dict[str, Any]


@dataclass
class SchedulingSuggestion:
    suggested_time: datetime
    confidence_score: float
    reasoning: str
    alternative_times: List[datetime]
    potential_conflicts: List[str]
    optimization_factors: Dict[str, float]

class IntelligentScheduler:
    def __init__(self, calendar_engine, user_preferences: UserPreferences):
        self.calendar_engine = calendar_engine
        self.user_preferences = user_preferences
        self.pattern_analyzer = SchedulingPatternAnalyzer()
        self.constraint_solver = ConstraintSolver()
        self.optimization_engine = SchedulingOptimizer()

    async def suggest_optimal_meeting_time(self, meeting_requirements: Dict[str, Any],
                                           participants: Optional[List[str]] = None) -> SchedulingSuggestion:
        """Suggest optimal meeting time based on multiple factors."""
        # Analyze current schedule patterns
        patterns = await self.pattern_analyzer.analyze_user_patterns(
            meeting_requirements.get('user_id')
        )
        # Get participant availability if provided
        participant_availability = {}
        if participants:
            for participant in participants:
                availability = await self._get_participant_availability(participant)
                participant_availability[participant] = availability
        # Define scheduling constraints
        constraints = self._build_scheduling_constraints(
            meeting_requirements, participant_availability, patterns
        )
        # Find candidate time slots
        candidate_slots = await self._find_candidate_time_slots(
            meeting_requirements, constraints
        )
        # Optimize and rank suggestions
        optimized_suggestions = await self.optimization_engine.optimize_suggestions(
            candidate_slots, constraints, patterns
        )
        if optimized_suggestions:
            best_suggestion = optimized_suggestions[0]
            return SchedulingSuggestion(
                suggested_time=best_suggestion['time'],
                confidence_score=best_suggestion['score'],
                reasoning=best_suggestion['reasoning'],
                alternative_times=[s['time'] for s in optimized_suggestions[1:4]],
                potential_conflicts=best_suggestion.get('conflicts', []),
                optimization_factors=best_suggestion.get('factors', {})
            )
        else:
            # Fallback to basic scheduling
            return await self._fallback_scheduling(meeting_requirements)

    async def auto_schedule_recurring_meetings(self, series_requirements: Dict[str, Any]) -> List[SchedulingSuggestion]:
        """Automatically schedule a series of recurring meetings."""
        suggestions = []
        current_date = datetime.now().date()
        end_date = series_requirements.get('end_date', current_date + timedelta(days=90))
        recurrence_pattern = series_requirements.get('recurrence', {})
        interval_days = self._calculate_recurrence_interval(recurrence_pattern)
        while current_date <= end_date:
            meeting_req = series_requirements.copy()
            meeting_req['preferred_date'] = current_date
            suggestion = await self.suggest_optimal_meeting_time(meeting_req)
            if suggestion.confidence_score > 0.7:  # Only suggest high-confidence slots
                suggestions.append(suggestion)
            current_date += timedelta(days=interval_days)
        return suggestions

    async def detect_scheduling_conflicts(self, user_id: str,
                                          time_range: Tuple[datetime, datetime]) -> List[Dict[str, Any]]:
        """Detect potential scheduling conflicts and suggest resolutions."""
        start_time, end_time = time_range
        events = await self.calendar_engine.get_events_in_range(user_id, start_time, end_time)
        conflicts = []
        # Check for overlapping events
        for i, event1 in enumerate(events):
            for event2 in events[i+1:]:
                if self._events_overlap(event1, event2):
                    conflict = {
                        'type': 'overlap',
                        'events': [event1.id, event2.id],
                        'severity': self._calculate_conflict_severity(event1, event2),
                        'suggestions': await self._generate_conflict_resolutions(event1, event2)
                    }
                    conflicts.append(conflict)
        # Check for preference violations
        preference_conflicts = await self._check_preference_violations(events)
        conflicts.extend(preference_conflicts)
        # Check for workload issues
        workload_conflicts = await self._check_workload_balance(events)
        conflicts.extend(workload_conflicts)
        return conflicts

    def _build_scheduling_constraints(self, requirements: Dict[str, Any],
                                      participant_availability: Dict[str, Any],
                                      patterns: Dict[str, Any]) -> List[SchedulingConstraint]:
        """Build comprehensive scheduling constraints."""
        constraints = []
        # Time-based constraints
        if 'preferred_time' in requirements:
            constraints.append(SchedulingConstraint(
                constraint_type='preferred_time',
                priority=8,
                description='User preferred time',
                parameters={'time': requirements['preferred_time']}
            ))
        # Work hours constraint
        constraints.append(SchedulingConstraint(
            constraint_type='work_hours',
            priority=9,
            description='Within work hours',
            parameters={
                'start': self.user_preferences.work_hours[0],
                'end': self.user_preferences.work_hours[1]
            }
        ))
        # Meeting frequency constraint
        if patterns.get('meeting_frequency'):
            constraints.append(SchedulingConstraint(
                constraint_type='meeting_frequency',
                priority=6,
                description='Respect meeting frequency patterns',
                parameters={'max_daily': self.user_preferences.maximum_daily_meetings}
            ))
        # Participant availability constraints
        for participant, availability in participant_availability.items():
            constraints.append(SchedulingConstraint(
                constraint_type='participant_availability',
                priority=10,
                description=f'Participant {participant} availability',
                parameters={'participant': participant, 'availability': availability}
            ))
        # Focus time protection
        if self.user_preferences.focus_time_blocks:
            constraints.append(SchedulingConstraint(
                constraint_type='focus_time_protection',
                priority=7,
                description='Protect focus time blocks',
                parameters={'focus_blocks': self.user_preferences.focus_time_blocks}
            ))
        return constraints

    async def _find_candidate_time_slots(self, requirements: Dict[str, Any],
                                         constraints: List[SchedulingConstraint]) -> List[datetime]:
        """Find candidate time slots that satisfy basic constraints."""
        candidates = []
        duration = requirements.get('duration', self.user_preferences.preferred_meeting_duration)
        # Start from preferred date or tomorrow
        start_date = requirements.get('preferred_date', datetime.now().date() + timedelta(days=1))
        search_period = timedelta(days=requirements.get('search_days', 14))
        current_datetime = datetime.combine(start_date, self.user_preferences.work_hours[0])
        end_search = datetime.combine(start_date + search_period, self.user_preferences.work_hours[1])
        while current_datetime < end_search:
            slot_end = current_datetime + timedelta(minutes=duration)
            # Check if slot satisfies all hard constraints
            if await self._slot_satisfies_constraints(current_datetime, slot_end, constraints):
                candidates.append(current_datetime)
            # Move to next 30-minute slot
            current_datetime += timedelta(minutes=30)
            # Skip to next day once the end of work hours is reached; a slot
            # starting exactly at the end of the day can never fit a meeting
            if current_datetime.time() >= self.user_preferences.work_hours[1]:
                next_day = current_datetime.date() + timedelta(days=1)
                current_datetime = datetime.combine(next_day, self.user_preferences.work_hours[0])
        return candidates

    async def _slot_satisfies_constraints(self, start_time: datetime, end_time: datetime,
                                          constraints: List[SchedulingConstraint]) -> bool:
        """Check if a time slot satisfies all hard constraints."""
        for constraint in constraints:
            if constraint.priority >= 9:  # Hard constraints
                if not await self._check_constraint(start_time, end_time, constraint):
                    return False
        return True

    async def _check_constraint(self, start_time: datetime, end_time: datetime,
                                constraint: SchedulingConstraint) -> bool:
        """Check if a specific constraint is satisfied."""
        if constraint.constraint_type == 'work_hours':
            work_start = constraint.parameters['start']
            work_end = constraint.parameters['end']
            return (work_start <= start_time.time() <= work_end
                    and work_start <= end_time.time() <= work_end)
        elif constraint.constraint_type == 'participant_availability':
            # Check participant availability (simplified)
            availability = constraint.parameters['availability']
            return self._time_in_availability(start_time, end_time, availability)
        elif constraint.constraint_type == 'focus_time_protection':
            focus_blocks = constraint.parameters['focus_blocks']
            return not self._overlaps_focus_time(start_time, end_time, focus_blocks)
        return True

    def _time_in_availability(self, start_time: datetime, end_time: datetime,
                              availability: List[Tuple[datetime, datetime]]) -> bool:
        """Check if time slot is within participant availability."""
        for avail_start, avail_end in availability:
            if avail_start <= start_time and end_time <= avail_end:
                return True
        return False

    def _overlaps_focus_time(self, start_time: datetime, end_time: datetime,
                             focus_blocks: List[Tuple[time, time]]) -> bool:
        """Check if time slot overlaps with focus time blocks."""
        for focus_start, focus_end in focus_blocks:
            if start_time.time() < focus_end and end_time.time() > focus_start:
                return True
        return False

class SchedulingPatternAnalyzer:
    def __init__(self):
        self.pattern_cache = {}

    async def analyze_user_patterns(self, user_id: str) -> Dict[str, Any]:
        """Analyze user scheduling patterns for intelligent suggestions."""
        if user_id in self.pattern_cache:
            return self.pattern_cache[user_id]
        # Get historical events (last 90 days)
        end_date = datetime.now()
        start_date = end_date - timedelta(days=90)
        # This would typically query the database for historical events
        historical_events = await self._get_historical_events(user_id, start_date, end_date)
        patterns = {
            'preferred_times': self._analyze_preferred_times(historical_events),
            'meeting_frequency': self._analyze_meeting_frequency(historical_events),
            'duration_preferences': self._analyze_duration_preferences(historical_events),
            'day_of_week_preferences': self._analyze_day_preferences(historical_events),
            'seasonal_patterns': self._analyze_seasonal_patterns(historical_events),
            'productivity_patterns': self._analyze_productivity_patterns(historical_events)
        }
        self.pattern_cache[user_id] = patterns
        return patterns

    def _analyze_preferred_times(self, events: List[Event]) -> Dict[str, Any]:
        """Analyze preferred meeting times from historical data."""
        meeting_times = []
        for event in events:
            if event.event_type == EventType.MEETING:
                # Express the start time as a fractional hour, e.g. 14.5 for 2:30 PM
                meeting_times.append(event.start_time.hour + event.start_time.minute / 60.0)
        if not meeting_times:
            return {'peak_hours': [], 'confidence': 0.0}
        # Use clustering to find preferred time clusters
        times_array = np.array(meeting_times).reshape(-1, 1)
        # Determine optimal number of clusters (max 3 for morning, afternoon, late)
        n_clusters = min(3, len(set(meeting_times)))
        if n_clusters > 1:
            # n_init is set explicitly so results do not depend on the library default
            kmeans = KMeans(n_clusters=n_clusters, n_init=10, random_state=42)
            clusters = kmeans.fit_predict(times_array)
            # Find cluster centers and sizes
            cluster_info = []
            for i in range(n_clusters):
                cluster_times = [meeting_times[j] for j, c in enumerate(clusters) if c == i]
                cluster_info.append({
                    'center_hour': np.mean(cluster_times),
                    'size': len(cluster_times),
                    'std': np.std(cluster_times)
                })
            # Sort by cluster size (most frequent first)
            cluster_info.sort(key=lambda x: x['size'], reverse=True)
            return {
                'peak_hours': [info['center_hour'] for info in cluster_info],
                'confidence': max(cluster_info[0]['size'] / len(meeting_times), 0.3)
            }
        return {'peak_hours': [np.mean(meeting_times)], 'confidence': 0.5}

    def _analyze_meeting_frequency(self, events: List[Event]) -> Dict[str, Any]:
        """Analyze meeting frequency patterns."""
        meetings_by_day = {}
        for event in events:
            if event.event_type == EventType.MEETING:
                day_key = event.start_time.date()
                meetings_by_day[day_key] = meetings_by_day.get(day_key, 0) + 1
        if not meetings_by_day:
            return {'average_daily': 0, 'max_daily': 0, 'distribution': []}
        daily_counts = list(meetings_by_day.values())
        return {
            'average_daily': np.mean(daily_counts),
            'max_daily': max(daily_counts),
            'distribution': daily_counts,
            'busy_days_threshold': np.percentile(daily_counts, 75)
        }

    async def _get_historical_events(self, user_id: str, start_date: datetime,
                                     end_date: datetime) -> List[Event]:
        """Get historical events for pattern analysis."""
        # This would typically query the database
        # For now, return empty list as placeholder
        return []

class ExternalIntegrationManager:
    def __init__(self):
        self.integrations = {}
        self.api_clients = {}

    async def setup_integration(self, integration_type: IntegrationType,
                                config: Dict[str, Any]) -> bool:
        """Setup an external integration."""
        try:
            if integration_type == IntegrationType.EMAIL:
                return await self._setup_email_integration(config)
            elif integration_type == IntegrationType.VIDEO_CONFERENCE:
                return await self._setup_video_conference_integration(config)
            elif integration_type == IntegrationType.PROJECT_MANAGEMENT:
                return await self._setup_project_management_integration(config)
            elif integration_type == IntegrationType.TRAVEL_BOOKING:
                return await self._setup_travel_integration(config)
            else:
                return False
        except Exception as e:
            print(f"Error setting up {integration_type.value} integration: {e}")
            return False

    async def _setup_email_integration(self, config: Dict[str, Any]) -> bool:
        """Setup email integration for automatic event creation."""
        email_config = {
            'smtp_server': config.get('smtp_server'),
            'smtp_port': config.get('smtp_port', 587),
            'username': config.get('username'),
            'password': config.get('password'),
            'imap_server': config.get('imap_server'),
            'imap_port': config.get('imap_port', 993)
        }
        # Test connection
        try:
            import imaplib
            mail = imaplib.IMAP4_SSL(email_config['imap_server'], email_config['imap_port'])
            mail.login(email_config['username'], email_config['password'])
            mail.logout()
            self.integrations[IntegrationType.EMAIL] = email_config
            return True
        except Exception as e:
            print(f"Email integration test failed: {e}")
            return False

    async def _setup_video_conference_integration(self, config: Dict[str, Any]) -> bool:
        """Setup video conferencing integration."""
        platform = config.get('platform', 'zoom')
        if platform == 'zoom':
            zoom_config = {
                'api_key': config.get('api_key'),
                'api_secret': config.get('api_secret'),
                'account_id': config.get('account_id')
            }
            # Test Zoom API connection
            try:
                # This would test the actual Zoom API
                self.integrations[IntegrationType.VIDEO_CONFERENCE] = {
                    'platform': 'zoom',
                    'config': zoom_config
                }
                return True
            except Exception as e:
                print(f"Zoom integration test failed: {e}")
                return False
        return False

    async def create_video_conference_meeting(self, event_details: Dict[str, Any]) -> Dict[str, Any]:
        """Create a video conference meeting for an event."""
        if IntegrationType.VIDEO_CONFERENCE not in self.integrations:
            return {'success': False, 'error': 'Video conference integration not configured'}
        integration = self.integrations[IntegrationType.VIDEO_CONFERENCE]
        if integration['platform'] == 'zoom':
            return await self._create_zoom_meeting(event_details, integration['config'])
        return {'success': False, 'error': 'Unsupported video conference platform'}

    async def _create_zoom_meeting(self, event_details: Dict[str, Any],
                                   zoom_config: Dict[str, Any]) -> Dict[str, Any]:
        """Create a Zoom meeting."""
        # This would use the actual Zoom API
        # For demonstration, return mock data
        # meeting_data is the payload a real request would send; it is unused here
        meeting_data = {
            'topic': event_details.get('title', 'Calendar Meeting'),
            'type': 2,  # Scheduled meeting
            'start_time': event_details['start_time'].isoformat(),
            'duration': event_details.get('duration', 60),
            'timezone': event_details.get('timezone', 'UTC'),
            'settings': {
                'host_video': True,
                'participant_video': True,
                'join_before_host': False,
                'mute_upon_entry': True,
                'waiting_room': True
            }
        }
        # Mock Zoom API response (placeholder values, not real meeting details)
        return {
            'success': True,
            'meeting_url': "https://zoom.us/j/123456789",
            'meeting_id': '123456789',
            'password': 'abc123',
            'start_url': "https://zoom.us/s/123456789?role=1"
        }

    async def send_meeting_invitation(self, event: Event, participants: List[str]) -> bool:
        """Send meeting invitations via email."""
        if IntegrationType.EMAIL not in self.integrations:
            return False
        email_config = self.integrations[IntegrationType.EMAIL]
        try:
            # Create email content
            subject = f"Meeting Invitation: {event.title}"
            # Generate calendar invitation (ICS format)
            ics_content = self._generate_ics_invitation(event)
            # The message text is kept flush left so the email body carries no stray indentation
            body = f"""
You're invited to a meeting:
Title: {event.title}
Date: {event.start_time.strftime('%A, %B %d, %Y')}
Time: {event.start_time.strftime('%I:%M %p')} - {event.calculated_end_time.strftime('%I:%M %p')}
{f'Location: {event.location}' if event.location else ''}
{f'Description: {event.description}' if event.description else ''}
Please confirm your attendance.
"""
            # Send email to each participant
            for participant_email in participants:
                await self._send_email(
                    email_config,
                    participant_email,
                    subject,
                    body,
                    ics_content
                )
            return True
        except Exception as e:
            print(f"Error sending meeting invitations: {e}")
            return False

    def _generate_ics_invitation(self, event: Event) -> str:
        """Generate ICS calendar invitation content."""
        # The trailing 'Z' marks a UTC timestamp, so DTSTAMP is taken from UTC time.
        # DTSTART and DTEND assume the event times are already stored in UTC.
        # ICS lines must start in the first column: leading whitespace marks a folded line.
        ics_content = f"""BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//Calendar App//EN
BEGIN:VEVENT
UID:{event.id}@calendar-app.com
DTSTAMP:{datetime.utcnow().strftime('%Y%m%dT%H%M%SZ')}
DTSTART:{event.start_time.strftime('%Y%m%dT%H%M%SZ')}
DTEND:{event.calculated_end_time.strftime('%Y%m%dT%H%M%SZ')}
SUMMARY:{event.title}
DESCRIPTION:{event.description or ''}
LOCATION:{event.location or ''}
STATUS:CONFIRMED
SEQUENCE:0
END:VEVENT
END:VCALENDAR"""
        return ics_content

    async def _send_email(self, email_config: Dict[str, Any], to_email: str,
                          subject: str, body: str,
                          attachment_content: Optional[str] = None):
        """Send email with optional calendar attachment."""
        msg = MIMEMultipart()
        msg['From'] = email_config['username']
        msg['To'] = to_email
        msg['Subject'] = subject
        msg.attach(MIMEText(body, 'plain'))
        if attachment_content:
            # Use the text/calendar subtype so mail clients recognise the invitation
            attachment = MIMEText(attachment_content, 'calendar')
            attachment.add_header('Content-Disposition', 'attachment', filename='meeting.ics')
            msg.attach(attachment)
        # Send email; the context manager closes the connection even if sending fails
        with smtplib.SMTP(email_config['smtp_server'], email_config['smtp_port']) as server:
            server.starttls()
            server.login(email_config['username'], email_config['password'])
            server.send_message(msg)

class PredictiveAssistant:
    def __init__(self, calendar_engine, pattern_analyzer):
        self.calendar_engine = calendar_engine
        self.pattern_analyzer = pattern_analyzer
        self.prediction_models = {}

    async def generate_proactive_suggestions(self, user_id: str) -> List[Dict[str, Any]]:
        """Generate proactive suggestions based on user patterns and context."""
        suggestions = []
        # Analyze current schedule
        today = datetime.now().date()
        week_start = today - timedelta(days=today.weekday())
        week_end = week_start + timedelta(days=6)
        current_events = await self.calendar_engine.get_events_in_range(
            user_id,
            datetime.combine(week_start, datetime.min.time()),
            datetime.combine(week_end, datetime.max.time())
        )
        # Check for scheduling opportunities
        scheduling_suggestions = await self._suggest_scheduling_opportunities(user_id, current_events)
        suggestions.extend(scheduling_suggestions)
        # Check for optimization opportunities
        optimization_suggestions = await self._suggest_schedule_optimizations(user_id, current_events)
        suggestions.extend(optimization_suggestions)
        # Check for preparation reminders
        preparation_suggestions = await self._suggest_preparation_reminders(current_events)
        suggestions.extend(preparation_suggestions)
        return suggestions

    async def _suggest_scheduling_opportunities(self, user_id: str,
                                                current_events: List[Event]) -> List[Dict[str, Any]]:
        """Suggest scheduling opportunities based on patterns."""
        suggestions = []
        patterns = await self.pattern_analyzer.analyze_user_patterns(user_id)
        # Check for missing regular meetings
        if patterns.get('meeting_frequency', {}).get('average_daily', 0) > len(current_events) / 7:
            suggestions.append({
                'type': 'scheduling_opportunity',
                'priority': 'medium',
                'title': 'Light schedule detected',
                'description': 'You have fewer meetings than usual this week. Consider scheduling important discussions.',
                'action': 'suggest_meeting_slots'
            })
        # Check for focus time opportunities
        busy_days = [day for day, events in self._group_events_by_day(current_events).items()
                     if len(events) > 3]
        if len(busy_days) < 3:  # Less than 3 busy days
            suggestions.append({
                'type': 'focus_time_opportunity',
                'priority': 'high',
                'title': 'Focus time available',
                'description': 'You have several days with light schedules. Perfect for deep work!',
                'action': 'block_focus_time'
            })
        return suggestions

    async def _suggest_schedule_optimizations(self, user_id: str,
                                              current_events: List[Event]) -> List[Dict[str, Any]]:
        """Suggest schedule optimizations."""
        suggestions = []
        # Check for fragmented schedule
        fragmented_days = self._detect_fragmented_schedule(current_events)
        if fragmented_days:
            suggestions.append({
                'type': 'schedule_optimization',
                'priority': 'medium',
                'title': 'Fragmented schedule detected',
                'description': f'You have fragmented schedules on {len(fragmented_days)} days. Consider consolidating meetings.',
                'action': 'consolidate_meetings',
                'data': {'fragmented_days': fragmented_days}
            })
        # Check for back-to-back meetings
        back_to_back_count = self._count_back_to_back_meetings(current_events)
        if back_to_back_count > 3:
            suggestions.append({
                'type': 'schedule_optimization',
                'priority': 'high',
                'title': 'Too many back-to-back meetings',
                'description': f'You have {back_to_back_count} back-to-back meetings. Consider adding buffers.',
                'action': 'add_meeting_buffers'
            })
        return suggestions

    def _group_events_by_day(self, events: List[Event]) -> Dict[date, List[Event]]:
        """Group events by day."""
        events_by_day = {}
        for event in events:
            day = event.start_time.date()
            if day not in events_by_day:
                events_by_day[day] = []
            events_by_day[day].append(event)
        return events_by_day

    def _detect_fragmented_schedule(self, events: List[Event]) -> List[date]:
        """Detect days with fragmented schedules."""
        fragmented_days = []
        events_by_day = self._group_events_by_day(events)
        for day, day_events in events_by_day.items():
            if len(day_events) < 2:
                continue
            # Sort events by start time
            day_events.sort(key=lambda x: x.start_time)
            # Check for large gaps between meetings
            gaps = []
            for i in range(len(day_events) - 1):
                gap = (day_events[i+1].start_time - day_events[i].calculated_end_time).total_seconds() / 3600
                gaps.append(gap)
            # If there are gaps > 2 hours between meetings, consider it fragmented
            if any(gap > 2 for gap in gaps):
                fragmented_days.append(day)
        return fragmented_days

    def _count_back_to_back_meetings(self, events: List[Event]) -> int:
        """Count back-to-back meetings (< 15 minutes between)."""
        count = 0
        events_by_day = self._group_events_by_day(events)
        for day_events in events_by_day.values():
            day_events.sort(key=lambda x: x.start_time)
            for i in range(len(day_events) - 1):
                gap_minutes = (day_events[i+1].start_time - day_events[i].calculated_end_time).total_seconds() / 60
                if gap_minutes < 15:
                    count += 1
        return count

class SchedulingOptimizer:
    def __init__(self):
        self.optimization_strategies = {
            SchedulingStrategy.OPTIMIZE_FOR_FOCUS: self._optimize_for_focus,
            SchedulingStrategy.MINIMIZE_CONFLICTS: self._minimize_conflicts,
            SchedulingStrategy.BALANCE_WORKLOAD: self._balance_workload,
            SchedulingStrategy.RESPECT_PREFERENCES: self._respect_preferences,
            SchedulingStrategy.MINIMIZE_TRAVEL: self._minimize_travel
        }

    async def optimize_suggestions(self, candidate_slots: List[datetime],
                                   constraints: List[SchedulingConstraint],
                                   patterns: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Optimize and rank scheduling suggestions."""
        scored_suggestions = []
        for slot in candidate_slots:
            score = 0.0
            factors = {}
            reasoning_parts = []
            # Apply each optimization strategy
            for strategy in SchedulingStrategy:
                strategy_score, strategy_factors, strategy_reasoning = await self.optimization_strategies[strategy](
                    slot, constraints, patterns
                )
                score += strategy_score
                factors[strategy.value] = strategy_factors
                if strategy_reasoning:
                    reasoning_parts.append(strategy_reasoning)
            # Normalize score
            score = min(score / len(SchedulingStrategy), 1.0)
            scored_suggestions.append({
                'time': slot,
                'score': score,
                'factors': factors,
                'reasoning': '; '.join(reasoning_parts)
            })
        # Sort by score (highest first)
        scored_suggestions.sort(key=lambda x: x['score'], reverse=True)
        return scored_suggestions

    async def _optimize_for_focus(self, slot: datetime, constraints: List[SchedulingConstraint],
                                  patterns: Dict[str, Any]) -> Tuple[float, Dict[str, float], str]:
        """Optimize for focus time and deep work."""
        score = 0.0
        factors = {}
        reasoning = ""
        # Prefer morning slots for focus
        if 9 <= slot.hour <= 11:
            score += 0.3
            factors['morning_focus'] = 0.3
            reasoning = "Morning slot optimal for focus"
        # Avoid fragmented time slots
        # This would check surrounding events to ensure sufficient focus time
        return score, factors, reasoning

    async def _minimize_conflicts(self, slot: datetime, constraints: List[SchedulingConstraint],
                                  patterns: Dict[str, Any]) -> Tuple[float, Dict[str, float], str]:
        """Minimize scheduling conflicts."""
        score = 0.0
factors = {}
reasoning = ""
# Check constraint satisfaction
satisfied_constraints = 0
total_constraints = len(constraints)
for constraint in constraints:
# This would check if the slot satisfies the constraint
# For now, assume all constraints are satisfied
satisfied_constraints += 1
if total_constraints > 0:
constraint_score = satisfied_constraints / total_constraints
score += constraint_score * 0.4
factors['constraint_satisfaction'] = constraint_score
if constraint_score == 1.0:
reasoning = "No conflicts detected"
else:
reasoning = f"Satisfies {satisfied_constraints}/{total_constraints} constraints"
return score, factors, reasoning
async def _balance_workload(self, slot: datetime, constraints: List[SchedulingConstraint],
patterns: Dict[str, Any]) -> Tuple[float, Dict[str, float], str]:
"""Balance workload across time periods."""
score = 0.0
factors = {}
reasoning = ""
# This would analyze existing workload and prefer slots that balance it
# For now, return a moderate score
score += 0.2
factors['workload_balance'] = 0.2
reasoning = "Contributes to balanced workload"
return score, factors, reasoning
async def _respect_preferences(self, slot: datetime, constraints: List[SchedulingConstraint],
patterns: Dict[str, Any]) -> Tuple[float, Dict[str, float], str]:
"""Respect user preferences and patterns."""
score = 0.0
factors = {}
reasoning = ""
# Check against preferred times from patterns
preferred_times = patterns.get('preferred_times', {}).get('peak_hours', [])
if preferred_times:
slot_hour = slot.hour + slot.minute / 60.0
# Find closest preferred time
closest_preferred = min(preferred_times, key=lambda x: abs(x - slot_hour))
time_diff = abs(closest_preferred - slot_hour)
if time_diff <= 1.0: # Within 1 hour of preferred time
preference_score = 1.0 - (time_diff / 1.0)
score += preference_score * 0.3
factors['time_preference'] = preference_score
reasoning = f"Aligns with preferred meeting time ({closest_preferred:.1f})"
return score, factors, reasoning
async def _minimize_travel(self, slot: datetime, constraints: List[SchedulingConstraint],
patterns: Dict[str, Any]) -> Tuple[float, Dict[str, float], str]:
"""Minimize travel time between meetings."""
score = 0.0
factors = {}
reasoning = ""
# This would analyze location-based constraints and optimize for minimal travel
# For now, return a base score
score += 0.1
factors['travel_optimization'] = 0.1
reasoning = "Optimized for minimal travel"
return score, factors, reasoning
class ConstraintSolver:
def __init__(self):
pass
    async def solve_scheduling_constraints(self, requirements: Dict[str, Any],
                                           constraints: List[SchedulingConstraint]) -> List[datetime]:
        """Find up to 20 feasible weekday slots within business hours over the next two weeks."""
        # Placeholder for a real constraint satisfaction algorithm: scan 30-minute
        # slots in chronological order and keep the first 20 that pass every constraint.
        max_slots = 20
        feasible_slots = []
        start_time = datetime.now() + timedelta(days=1)
        end_time = start_time + timedelta(days=14)
        current_time = start_time.replace(hour=9, minute=0, second=0, microsecond=0)
        while current_time < end_time and len(feasible_slots) < max_slots:
            # Skip weekends and non-business hours BEFORE evaluating the slot,
            # so that a Saturday 09:00 slot is never reported as feasible
            if current_time.weekday() >= 5:  # Saturday (5) or Sunday (6): jump to Monday 09:00
                current_time = (current_time + timedelta(days=7 - current_time.weekday())).replace(hour=9, minute=0)
                continue
            if current_time.hour >= 17:  # After business hours: jump to 09:00 on the next day
                current_time = (current_time + timedelta(days=1)).replace(hour=9, minute=0)
                continue
            # Check if this slot satisfies all constraints
            if await self._satisfies_all_constraints(current_time, constraints):
                feasible_slots.append(current_time)
            current_time += timedelta(minutes=30)
        return feasible_slots  # The earliest feasible slots, at most max_slots of them
async def _satisfies_all_constraints(self, slot_time: datetime,
constraints: List[SchedulingConstraint]) -> bool:
"""Check if a time slot satisfies all constraints."""
for constraint in constraints:
if not await self._check_single_constraint(slot_time, constraint):
return False
return True
    async def _check_single_constraint(self, slot_time: datetime,
                                       constraint: SchedulingConstraint) -> bool:
        """Check a single constraint against a time slot."""
        # Implementation would depend on constraint type
        # For now, only 'work_hours' is evaluated; every other constraint type passes
        if constraint.constraint_type == 'work_hours':
            # `time` here is datetime.time, not the `time` module
            work_start = constraint.parameters.get('start', time(9, 0))
            work_end = constraint.parameters.get('end', time(17, 0))
            # A meeting may not start exactly at the end of the working day
            return work_start <= slot_time.time() < work_end
        return True
This advanced-features implementation shows how an LLM-based calendar application can combine scheduling intelligence, external integrations, and proactive assistance. The IntelligentScheduler uses machine learning techniques to analyze user patterns and optimize scheduling decisions, the ExternalIntegrationManager connects the calendar to email, video conferencing, and other external services, and the PredictiveAssistant offers proactive suggestions that help users optimize their schedules and maintain work-life balance. Several of the scoring and constraint methods above are placeholders that return fixed values; they mark where real workload, travel, and conflict analysis would be implemented. Together, these features turn the calendar from a passive scheduling tool into an assistant that actively helps users manage their time.
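The slot enumeration performed by ConstraintSolver can be isolated into a small pure function, which makes the weekend and after-hours handling easy to verify. The sketch below is illustrative only: the function name `business_hour_slots` and its parameters are assumptions and do not appear in the classes above. It also takes the start time as an argument instead of calling `datetime.now()`, so results are reproducible.

```python
from datetime import datetime, timedelta
from typing import List


def business_hour_slots(start: datetime, days: int = 14, step_minutes: int = 30,
                        open_hour: int = 9, close_hour: int = 17,
                        limit: int = 20) -> List[datetime]:
    """Enumerate candidate start times on weekdays within business hours.

    Hypothetical helper for illustration; not part of ConstraintSolver.
    """
    slots: List[datetime] = []
    end = start + timedelta(days=days)
    current = start.replace(hour=open_hour, minute=0, second=0, microsecond=0)
    while current < end and len(slots) < limit:
        if current.weekday() >= 5:
            # Saturday (5) or Sunday (6): jump to Monday's opening time.
            current = (current + timedelta(days=7 - current.weekday())).replace(
                hour=open_hour, minute=0)
            continue
        if current.hour >= close_hour:
            # Past closing time: move to the next day's opening time.
            current = (current + timedelta(days=1)).replace(hour=open_hour, minute=0)
            continue
        slots.append(current)
        current += timedelta(minutes=step_minutes)
    return slots


# 2024-01-05 is a Friday: slots run until 16:30, then resume on Monday 2024-01-08.
slots = business_hour_slots(datetime(2024, 1, 5, 13, 45), limit=20)
print(slots[15], slots[16])
```

Checking the calendar position before appending a slot, instead of after advancing the clock, is what keeps a Saturday 09:00 slot from slipping into the result.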
Testing and Deployment Considerations
Testing an LLM-based calendar application presents unique challenges that go beyond traditional software testing approaches. The natural language processing components introduce variability and ambiguity that require specialized testing strategies to ensure reliable behavior across diverse user inputs and scenarios. The system must be tested not only for functional correctness but also for natural language understanding accuracy, conversational flow quality, and user experience consistency.
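One way to quantify this variability is a paraphrase-consistency check: feed several phrasings of the same request to the parser and measure how often they resolve to the same intent. The sketch below is a minimal illustration under stated assumptions. `keyword_intent_parser` is a hypothetical keyword-based stand-in for the real LLM call, and `paraphrase_consistency` is likewise an illustrative name.

```python
import re
from collections import Counter
from typing import Callable, Dict, List


def keyword_intent_parser(text: str) -> str:
    """Hypothetical stand-in for an LLM intent parser (keyword rules only)."""
    lowered = text.lower()
    if re.search(r"\b(cancel|delete|remove)\b", lowered):
        return "delete"
    if re.search(r"\b(move|reschedule|push)\b", lowered):
        return "update"
    if re.search(r"\b(schedule|book|set up|create)\b", lowered):
        return "create"
    return "query"


def paraphrase_consistency(parser: Callable[[str], str],
                           paraphrases: List[str]) -> Dict[str, object]:
    """Return the majority intent and the share of paraphrases that agree with it."""
    intents = [parser(p) for p in paraphrases]
    majority, votes = Counter(intents).most_common(1)[0]
    return {"majority": majority, "agreement": votes / len(intents), "intents": intents}


report = paraphrase_consistency(keyword_intent_parser, [
    "Schedule a meeting with John tomorrow at 2 PM",
    "Book a meeting with John for 2 PM tomorrow",
    "Set up a meeting with John tomorrow, 2 PM",
    "Put John on my calendar tomorrow at 2",
])
print(report)
```

With the stand-in parser, the last phrasing falls through to "query", so agreement is 0.75. Against a real model, a low agreement score on a paraphrase set is a signal to add those phrasings to the regression suite.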
The testing strategy must encompass multiple layers including unit tests for individual components, integration tests for system interactions, natural language processing accuracy tests, user experience validation, and performance testing under various load conditions. Each layer requires different testing methodologies and success criteria, with particular attention to edge cases in natural language interpretation and error recovery scenarios.
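At the unit-test layer, the LLM is usually replaced with a mock so that business logic can be checked deterministically and without API cost. The following sketch assumes a hypothetical `CommandRouter` component (not defined elsewhere in this article) that asks for clarification when the parser reports low confidence. Only `Mock` and `AsyncMock` from the standard library are used.

```python
import asyncio
from unittest.mock import AsyncMock, Mock


class CommandRouter:
    """Hypothetical business-logic component that depends on an LLM service."""

    def __init__(self, llm_service):
        self.llm_service = llm_service

    async def route(self, text: str) -> str:
        intent = await self.llm_service.parse_intent(text, {})
        # Low-confidence parses are routed to a clarification question.
        if intent.get("confidence", 0.0) < 0.5:
            return "clarify"
        return intent.get("action", "clarify")


async def run_router_checks() -> dict:
    llm = Mock()
    # AsyncMock makes parse_intent awaitable and records each await.
    llm.parse_intent = AsyncMock(return_value={"action": "create", "confidence": 0.9})
    confident = await CommandRouter(llm).route("Schedule a meeting at 2 PM")

    llm.parse_intent.return_value = {"action": "create", "confidence": 0.2}
    unsure = await CommandRouter(llm).route("maybe something Tuesday?")

    return {"confident": confident, "unsure": unsure,
            "calls": llm.parse_intent.await_count}


outcome = asyncio.run(run_router_checks())
print(outcome)
```

Because the mock returns canned intents, a failure in this kind of test points at the routing logic rather than at model behavior, which keeps the unit layer separate from the accuracy tests.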
Deployment considerations for an LLM-based calendar application involve complex infrastructure requirements including natural language processing services, real-time conversation management, database scalability, and external service integrations. The system must be designed for high availability while managing the computational costs associated with LLM processing and maintaining responsive user interactions across different devices and network conditions.
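One common way to contain LLM cost is to cache parse results for repeated, context-free commands. The sketch below is one possible approach, not a prescribed design: `IntentCache` and `fake_llm_parse` are hypothetical names, and the cache key includes the current date because relative expressions such as "today" or "tomorrow" resolve differently each day.

```python
import asyncio
from collections import OrderedDict
from datetime import date
from typing import Any, Awaitable, Callable, Dict, Tuple


class IntentCache:
    """Small LRU cache in front of an async intent parser (illustrative only)."""

    def __init__(self, parser: Callable[[str], Awaitable[Dict[str, Any]]],
                 max_entries: int = 256):
        self._parser = parser
        self._max_entries = max_entries
        self._entries: "OrderedDict[Tuple[str, date], Dict[str, Any]]" = OrderedDict()
        self.hits = 0
        self.misses = 0

    async def parse(self, text: str, today: date) -> Dict[str, Any]:
        # Normalize case and whitespace; include the date in the key.
        key = (" ".join(text.lower().split()), today)
        if key in self._entries:
            self.hits += 1
            self._entries.move_to_end(key)  # mark as most recently used
            return self._entries[key]
        self.misses += 1
        result = await self._parser(text)
        self._entries[key] = result
        if len(self._entries) > self._max_entries:
            self._entries.popitem(last=False)  # evict least recently used
        return result


async def fake_llm_parse(text: str) -> Dict[str, Any]:
    """Hypothetical stand-in for a slow, billable LLM call."""
    await asyncio.sleep(0.01)
    return {"action": "query", "text": text}


async def demo() -> IntentCache:
    cache = IntentCache(fake_llm_parse, max_entries=2)
    day = date(2024, 1, 8)
    await cache.parse("What do I have today?", day)
    await cache.parse("what do I  have TODAY?", day)               # hit: same normalized text
    await cache.parse("What do I have today?", date(2024, 1, 9))  # miss: different day
    return cache


cache = asyncio.run(demo())
print(cache.hits, cache.misses)
```

Commands that depend on conversation context, such as "Move it to tomorrow", should bypass a cache like this, since identical text can refer to different events.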
The following listing sketches testing and deployment scaffolding for an LLM-based calendar application. It shows how to measure natural language processing accuracy, validate conversational flows, run load scenarios, and orchestrate deployments with pre- and post-deployment checks. Several helpers are deliberately left as stubs that return fixed values, marking where environment-specific logic belongs.
import unittest
import asyncio
import pytest
from unittest.mock import Mock, AsyncMock, patch
from datetime import datetime, date, timedelta
import json
import re
from typing import List, Dict, Any, Optional, Tuple
import numpy as np
from dataclasses import dataclass
import logging
from concurrent.futures import ThreadPoolExecutor
import time
import requests


@dataclass
class TestCase:
    __test__ = False  # Test data, not a test class: keeps pytest from trying to collect it
    name: str
    input_text: str
    expected_intent: Dict[str, Any]
    expected_action: str
    context: Optional[Dict[str, Any]] = None
    expected_confidence: float = 0.8


@dataclass
class ConversationTestCase:
    name: str
    conversation_turns: List[Tuple[str, str]]  # (user_input, expected_response_pattern)
    initial_context: Optional[Dict[str, Any]] = None
    success_criteria: Optional[Dict[str, Any]] = None
class NLPTestSuite:
def __init__(self, llm_service, calendar_engine):
self.llm_service = llm_service
self.calendar_engine = calendar_engine
self.test_cases = self._load_test_cases()
self.conversation_test_cases = self._load_conversation_test_cases()
def _load_test_cases(self) -> List[TestCase]:
"""Load comprehensive test cases for NLP testing."""
return [
# Event Creation Tests
TestCase(
name="simple_meeting_creation",
input_text="Schedule a meeting with John tomorrow at 2 PM",
expected_intent={
"action": "create",
"parameters": {
"entities": {"people": ["John"]},
"temporal": {"date": "tomorrow", "time": "2 PM"}
}
},
expected_action="create"
),
TestCase(
name="recurring_meeting_creation",
input_text="Set up a weekly team meeting every Tuesday at 10 AM",
expected_intent={
"action": "create",
"parameters": {
"temporal": {"recurrence": "weekly", "day": "Tuesday", "time": "10 AM"}
}
},
expected_action="create"
),
TestCase(
name="complex_meeting_with_location",
input_text="Book a project review meeting with Sarah and Mike next Friday at 3 PM in the conference room",
expected_intent={
"action": "create",
"parameters": {
"entities": {
"people": ["Sarah", "Mike"],
"locations": ["conference room"],
"topics": ["project review"]
},
"temporal": {"date": "next Friday", "time": "3 PM"}
}
},
expected_action="create"
),
# Event Query Tests
TestCase(
name="simple_schedule_query",
input_text="What do I have scheduled for today?",
expected_intent={
"action": "query",
"parameters": {
"temporal": {"date": "today"}
}
},
expected_action="query"
),
TestCase(
name="time_range_query",
input_text="Show me my meetings for next week",
expected_intent={
"action": "query",
"parameters": {
"temporal": {"date_range": "next week"}
}
},
expected_action="query"
),
# Event Modification Tests
TestCase(
name="simple_reschedule",
input_text="Move my 3 PM meeting to 4 PM",
expected_intent={
"action": "update",
"parameters": {
"temporal": {"original_time": "3 PM", "new_time": "4 PM"}
}
},
expected_action="update"
),
TestCase(
name="contextual_reschedule",
input_text="Move it to tomorrow",
expected_intent={
"action": "update",
"parameters": {
"temporal": {"new_date": "tomorrow"}
}
},
expected_action="update",
context={"last_referenced_event": "meeting_id_123"}
),
# Free Time Finding Tests
TestCase(
name="find_free_time",
input_text="When am I free this afternoon?",
expected_intent={
"action": "find_slots",
"parameters": {
"temporal": {"date": "today", "time_period": "afternoon"}
}
},
expected_action="find_slots"
),
# Edge Cases
TestCase(
name="ambiguous_time_reference",
input_text="Schedule a meeting next Tuesday",
expected_intent={
"action": "create",
"parameters": {
"temporal": {"date": "next Tuesday"}
}
},
expected_action="create",
expected_confidence=0.6 # Lower confidence due to missing time
),
TestCase(
name="multiple_actions",
input_text="Cancel my 2 PM meeting and schedule a new one with Tom at 3 PM",
expected_intent={
"action": "delete", # Should identify primary action
"parameters": {
"temporal": {"time": "2 PM"},
"secondary_action": {
"action": "create",
"entities": {"people": ["Tom"]},
"temporal": {"time": "3 PM"}
}
}
},
expected_action="delete"
)
]
def _load_conversation_test_cases(self) -> List[ConversationTestCase]:
"""Load conversation flow test cases."""
return [
ConversationTestCase(
name="meeting_creation_flow",
conversation_turns=[
("Schedule a meeting with John", ".*when.*time.*"),
("Tomorrow at 2 PM", ".*scheduled.*John.*tomorrow.*2.*PM.*"),
("Add a reminder 30 minutes before", ".*reminder.*added.*")
],
success_criteria={"events_created": 1, "reminders_created": 1}
),
ConversationTestCase(
name="conflict_resolution_flow",
conversation_turns=[
("Schedule a meeting tomorrow at 2 PM", ".*conflict.*existing.*"),
("Show me alternative times", ".*available.*slots.*"),
("Book the 3 PM slot", ".*scheduled.*3.*PM.*")
],
success_criteria={"events_created": 1, "conflicts_resolved": 1}
),
ConversationTestCase(
name="clarification_flow",
conversation_turns=[
("Schedule a meeting next week", ".*which.*day.*time.*"),
("Tuesday at 10 AM", ".*duration.*"),
("One hour", ".*scheduled.*Tuesday.*10.*AM.*hour.*")
],
success_criteria={"events_created": 1, "clarifications_handled": 2}
)
]
    async def run_nlp_accuracy_tests(self) -> Dict[str, Any]:
        """Run comprehensive NLP accuracy tests."""
        results = {
            "total_tests": len(self.test_cases),
            "passed": 0,
            "failed": 0,
            "accuracy_scores": [],
            "failed_cases": [],
            "performance_metrics": {}
        }
        # perf_counter is monotonic, so it is safer than time.time() for measuring durations
        start_time = time.perf_counter()
        for test_case in self.test_cases:
            try:
                # Set up context if provided
                if test_case.context:
                    self._setup_test_context(test_case.context)
                # Process the input
                parsed_intent = await self.llm_service.parse_intent(
                    test_case.input_text,
                    test_case.context or {}
                )
                # Validate the result
                is_correct, accuracy_score = self._validate_intent(
                    parsed_intent,
                    test_case.expected_intent,
                    test_case.expected_confidence
                )
                if is_correct:
                    results["passed"] += 1
                else:
                    results["failed"] += 1
                    results["failed_cases"].append({
                        "test_name": test_case.name,
                        "input": test_case.input_text,
                        "expected": test_case.expected_intent,
                        "actual": parsed_intent,
                        "accuracy_score": accuracy_score
                    })
                results["accuracy_scores"].append(accuracy_score)
            except Exception as e:
                results["failed"] += 1
                # Count an errored case as 0.0 so overall accuracy is not inflated
                results["accuracy_scores"].append(0.0)
                results["failed_cases"].append({
                    "test_name": test_case.name,
                    "error": str(e)
                })
        # Calculate performance metrics, guarding against an empty test suite
        elapsed = time.perf_counter() - start_time
        total = results["total_tests"]
        results["performance_metrics"] = {
            "total_time": elapsed,
            "average_time_per_test": elapsed / total if total else 0,
            "overall_accuracy": float(np.mean(results["accuracy_scores"])) if results["accuracy_scores"] else 0,
            "pass_rate": results["passed"] / total if total else 0
        }
        return results
async def run_conversation_flow_tests(self) -> Dict[str, Any]:
"""Run conversation flow tests."""
results = {
"total_conversations": len(self.conversation_test_cases),
"passed": 0,
"failed": 0,
"failed_conversations": []
}
for conv_test in self.conversation_test_cases:
try:
conversation_passed = await self._test_conversation_flow(conv_test)
if conversation_passed:
results["passed"] += 1
else:
results["failed"] += 1
results["failed_conversations"].append(conv_test.name)
except Exception as e:
results["failed"] += 1
results["failed_conversations"].append({
"name": conv_test.name,
"error": str(e)
})
return results
    def _validate_intent(self, actual_intent: Dict[str, Any],
                         expected_intent: Dict[str, Any],
                         min_confidence: float) -> Tuple[bool, float]:
        """Validate parsed intent against expected intent.

        The score is normalized to the range 0..1 over the components that apply
        to the test case, so cases without expected entities can still pass.
        """
        earned = 0.0
        possible = 0.0
        # Check action (weight 0.4)
        possible += 0.4
        if actual_intent.get("action") == expected_intent.get("action"):
            earned += 0.4
        # Check parameters
        actual_params = actual_intent.get("parameters", {})
        expected_params = expected_intent.get("parameters", {})
        # Check entities (weight 0.3), only when the test case expects any
        if "entities" in expected_params:
            possible += 0.3
            earned += 0.3 * self._compare_entities(
                actual_params.get("entities", {}),
                expected_params["entities"]
            )
        # Check temporal information (weight 0.3), only when the test case expects any
        if "temporal" in expected_params:
            possible += 0.3
            earned += 0.3 * self._compare_temporal(
                actual_params.get("temporal", {}),
                expected_params["temporal"]
            )
        # Check confidence (weight 0.1)
        possible += 0.1
        if actual_intent.get("confidence", 0.0) >= min_confidence:
            earned += 0.1
        accuracy_score = earned / possible
        is_correct = accuracy_score >= 0.8
        return is_correct, accuracy_score
    def _compare_entities(self, actual_entities: Dict[str, List[str]],
                          expected_entities: Dict[str, List[str]]) -> float:
        """Compare entity extraction results using a per-type Jaccard score."""
        if not expected_entities:
            return 1.0

        def normalize(values) -> set:
            # Case-insensitive comparison, so "John" and "john" match
            return {str(v).strip().lower() for v in (values or [])}

        scores = []
        for entity_type in set(expected_entities) | set(actual_entities):
            expected_set = normalize(expected_entities.get(entity_type))
            actual_set = normalize(actual_entities.get(entity_type))
            union = expected_set | actual_set
            if not union:
                # Empty on both sides (e.g. "locations": []): nothing to score
                continue
            # Unexpected non-empty entity types score 0 and lower the average
            scores.append(len(expected_set & actual_set) / len(union))
        return sum(scores) / len(scores) if scores else 1.0
    def _compare_temporal(self, actual_temporal: Dict[str, Any],
                          expected_temporal: Dict[str, Any]) -> float:
        """Compare temporal information extraction."""
        if not expected_temporal:
            return 1.0

        def normalize(value: Any) -> str:
            return " ".join(str(value).lower().split())

        matched = 0
        for field, expected_value in expected_temporal.items():
            actual_value = actual_temporal.get(field)
            if actual_value is None:
                continue
            # Simplified comparison: exact match after case and whitespace normalization.
            # A substring test would wrongly accept an empty string or a bare "pm".
            # In practice, both sides should be resolved to concrete datetimes first.
            if normalize(actual_value) == normalize(expected_value):
                matched += 1
        return matched / len(expected_temporal)
async def _test_conversation_flow(self, conv_test: ConversationTestCase) -> bool:
"""Test a complete conversation flow."""
conversation_manager = ConversationManager(self.llm_service, self.calendar_engine)
ui_state = UserInterfaceState(
current_view=ViewMode.DAY,
selected_date=date.today(),
interaction_mode=InteractionMode.CONVERSATION,
conversation_active=True
)
# Set up initial context
if conv_test.initial_context:
self._setup_test_context(conv_test.initial_context)
        import re  # Imported once per call instead of on every loop iteration
        # Process each conversation turn
        for user_input, expected_response_pattern in conv_test.conversation_turns:
            try:
                response = await conversation_manager.process_user_input(user_input, ui_state)
                # Check if response matches expected pattern
                if not re.search(expected_response_pattern, response.get("message", ""), re.IGNORECASE):
                    return False
            except Exception as e:
                logging.error(f"Conversation test failed: {e}")
                return False
# Check success criteria
if conv_test.success_criteria:
return self._check_success_criteria(conv_test.success_criteria)
return True
def _setup_test_context(self, context: Dict[str, Any]):
"""Set up test context for conversation tests."""
# Implementation would set up mock data and context
pass
def _check_success_criteria(self, criteria: Dict[str, Any]) -> bool:
"""Check if success criteria are met."""
# Implementation would verify that expected actions were performed
return True
class PerformanceTestSuite:
def __init__(self, calendar_app):
self.calendar_app = calendar_app
self.load_test_scenarios = self._define_load_test_scenarios()
def _define_load_test_scenarios(self) -> List[Dict[str, Any]]:
"""Define load testing scenarios."""
return [
{
"name": "concurrent_nlp_processing",
"description": "Test concurrent natural language processing",
"concurrent_users": 50,
"requests_per_user": 10,
"test_inputs": [
"Schedule a meeting tomorrow at 2 PM",
"What do I have today?",
"Find me a free hour this week",
"Cancel my 3 PM appointment"
]
},
{
"name": "database_stress_test",
"description": "Test database performance under load",
"concurrent_users": 100,
"requests_per_user": 20,
"operations": ["create", "read", "update", "delete"]
},
{
"name": "conversation_memory_test",
"description": "Test conversation memory management",
"concurrent_conversations": 25,
"turns_per_conversation": 15,
"memory_threshold_mb": 1000
}
]
async def run_performance_tests(self) -> Dict[str, Any]:
"""Run comprehensive performance tests."""
results = {
"scenarios": {},
"overall_metrics": {},
"recommendations": []
}
for scenario in self.load_test_scenarios:
scenario_results = await self._run_load_scenario(scenario)
results["scenarios"][scenario["name"]] = scenario_results
# Calculate overall metrics
results["overall_metrics"] = self._calculate_overall_metrics(results["scenarios"])
# Generate recommendations
results["recommendations"] = self._generate_performance_recommendations(results)
return results
    async def _run_load_scenario(self, scenario: Dict[str, Any]) -> Dict[str, Any]:
        """Run a specific load testing scenario."""
        handlers = {
            "concurrent_nlp_processing": "_test_concurrent_nlp",
            "database_stress_test": "_test_database_stress",
            "conversation_memory_test": "_test_conversation_memory",
        }
        handler_name = handlers.get(scenario["name"])
        if handler_name is None:
            return {"error": f"Unknown scenario: {scenario['name']}"}
        handler = getattr(self, handler_name, None)
        if handler is None:
            # Only _test_concurrent_nlp is implemented in this listing; report the
            # others as unimplemented instead of raising AttributeError
            return {"error": f"Scenario not implemented: {scenario['name']}"}
        return await handler(scenario)
    async def _test_concurrent_nlp(self, scenario: Dict[str, Any]) -> Dict[str, Any]:
        """Test concurrent NLP processing performance."""
        concurrent_users = scenario["concurrent_users"]
        requests_per_user = scenario["requests_per_user"]
        test_inputs = scenario["test_inputs"]

        async def user_simulation():
            user_results = {
                "requests_completed": 0,
                "total_time": 0,
                "errors": 0,
                "response_times": []
            }
            for _ in range(requests_per_user):
                # str() converts NumPy's np.str_ back to a plain Python string
                input_text = str(np.random.choice(test_inputs))
                request_start = time.perf_counter()
                try:
                    # Simulate NLP processing
                    await self.calendar_app.process_user_command(input_text)
                    request_time = time.perf_counter() - request_start
                    user_results["requests_completed"] += 1
                    user_results["response_times"].append(request_time)
                    user_results["total_time"] += request_time
                except Exception:
                    user_results["errors"] += 1
            return user_results

        # Run concurrent user simulations and time the whole batch for throughput
        wall_start = time.perf_counter()
        tasks = [user_simulation() for _ in range(concurrent_users)]
        user_results = await asyncio.gather(*tasks)
        wall_time = time.perf_counter() - wall_start
        # Aggregate results
        total_requests = sum(r["requests_completed"] for r in user_results)
        total_errors = sum(r["errors"] for r in user_results)
        all_response_times = []
        for r in user_results:
            all_response_times.extend(r["response_times"])
        attempted = total_requests + total_errors
        return {
            "total_requests": total_requests,
            "total_errors": total_errors,
            "error_rate": total_errors / attempted if attempted > 0 else 0,
            "average_response_time": np.mean(all_response_times) if all_response_times else 0,
            "p95_response_time": np.percentile(all_response_times, 95) if all_response_times else 0,
            "p99_response_time": np.percentile(all_response_times, 99) if all_response_times else 0,
            # Completed requests per second of wall-clock time
            "throughput_rps": total_requests / wall_time if wall_time > 0 else 0
        }
def _calculate_overall_metrics(self, scenario_results: Dict[str, Any]) -> Dict[str, Any]:
"""Calculate overall performance metrics."""
overall_metrics = {
"average_response_time": 0,
"total_error_rate": 0,
"peak_throughput": 0,
"memory_efficiency": "good"
}
# Aggregate metrics from all scenarios
response_times = []
error_rates = []
throughputs = []
for scenario_name, results in scenario_results.items():
if "average_response_time" in results:
response_times.append(results["average_response_time"])
if "error_rate" in results:
error_rates.append(results["error_rate"])
if "throughput_rps" in results:
throughputs.append(results["throughput_rps"])
if response_times:
overall_metrics["average_response_time"] = np.mean(response_times)
if error_rates:
overall_metrics["total_error_rate"] = np.mean(error_rates)
if throughputs:
overall_metrics["peak_throughput"] = max(throughputs)
return overall_metrics
def _generate_performance_recommendations(self, results: Dict[str, Any]) -> List[str]:
"""Generate performance optimization recommendations."""
recommendations = []
overall_metrics = results["overall_metrics"]
# Response time recommendations
if overall_metrics.get("average_response_time", 0) > 2.0:
recommendations.append(
"Consider optimizing NLP processing pipeline - average response time exceeds 2 seconds"
)
# Error rate recommendations
if overall_metrics.get("total_error_rate", 0) > 0.05:
recommendations.append(
"Error rate is above 5% - implement better error handling and retry mechanisms"
)
# Throughput recommendations
if overall_metrics.get("peak_throughput", 0) < 10:
recommendations.append(
"Low throughput detected - consider implementing request queuing and load balancing"
)
return recommendations
class DeploymentManager:
def __init__(self):
self.deployment_configs = self._load_deployment_configs()
self.health_checks = HealthCheckManager()
self.monitoring = MonitoringManager()
def _load_deployment_configs(self) -> Dict[str, Any]:
"""Load deployment configurations for different environments."""
return {
"development": {
"infrastructure": {
"app_servers": 1,
"database_replicas": 1,
"llm_service_instances": 1,
"cache_instances": 1
},
"scaling": {
"auto_scaling_enabled": False,
"min_instances": 1,
"max_instances": 2
},
"monitoring": {
"log_level": "DEBUG",
"metrics_enabled": True,
"alerting_enabled": False
}
},
"staging": {
"infrastructure": {
"app_servers": 2,
"database_replicas": 2,
"llm_service_instances": 2,
"cache_instances": 2,
"load_balancer": True
},
"scaling": {
"auto_scaling_enabled": True,
"min_instances": 2,
"max_instances": 5,
"cpu_threshold": 70,
"memory_threshold": 80
},
"monitoring": {
"log_level": "INFO",
"metrics_enabled": True,
"alerting_enabled": True,
"health_check_interval": 30
}
},
"production": {
"infrastructure": {
"app_servers": 5,
"database_replicas": 3,
"llm_service_instances": 10,
"cache_instances": 3,
"load_balancer": True,
"cdn_enabled": True,
"backup_instances": 2
},
"scaling": {
"auto_scaling_enabled": True,
"min_instances": 5,
"max_instances": 20,
"cpu_threshold": 60,
"memory_threshold": 70,
"predictive_scaling": True
},
"monitoring": {
"log_level": "WARN",
"metrics_enabled": True,
"alerting_enabled": True,
"health_check_interval": 15,
"performance_monitoring": True,
"user_analytics": True
},
"security": {
"encryption_at_rest": True,
"encryption_in_transit": True,
"api_rate_limiting": True,
"ddos_protection": True,
"security_scanning": True
}
}
}
    async def deploy_application(self, environment: str, version: str) -> Dict[str, Any]:
        """Deploy the application to the specified environment."""
        if environment not in self.deployment_configs:
            return {"success": False, "error": f"Unknown environment: {environment}"}
        config = self.deployment_configs[environment]
        deployment_result = {
            "environment": environment,
            "version": version,
            "deployment_time": datetime.now().isoformat(),
            "steps": [],
            "success": True,
            "rollback_available": True
        }
        try:
            # Pre-deployment checks: nothing has been changed yet, so no rollback is needed
            pre_check_result = await self._run_pre_deployment_checks(environment, config)
            deployment_result["steps"].append(pre_check_result)
            if not pre_check_result["success"]:
                deployment_result["success"] = False
                return deployment_result
            # Remaining steps, in order: infrastructure, application code,
            # monitoring, post-deployment tests, then auto-scaling if enabled
            steps = [
                lambda: self._deploy_infrastructure(config["infrastructure"]),
                lambda: self._deploy_application_code(version, config),
                lambda: self._setup_monitoring(config.get("monitoring", {})),
                lambda: self._run_post_deployment_tests(environment),
            ]
            if config.get("scaling", {}).get("auto_scaling_enabled"):
                steps.append(lambda: self._configure_auto_scaling(config["scaling"]))
            for run_step in steps:
                step_result = await run_step()
                deployment_result["steps"].append(step_result)
                # A failed step (for example, failing post-deployment tests) must
                # fail the deployment instead of being silently recorded
                if not step_result.get("success", False):
                    raise RuntimeError(f"Deployment step failed: {step_result.get('step', 'unknown')}")
        except Exception as e:
            deployment_result["success"] = False
            deployment_result["error"] = str(e)
            # Attempt rollback
            rollback_result = await self._rollback_deployment(environment)
            deployment_result["rollback_result"] = rollback_result
        return deployment_result
async def _run_pre_deployment_checks(self, environment: str, config: Dict[str, Any]) -> Dict[str, Any]:
"""Run pre-deployment checks."""
checks = {
"database_connectivity": await self._check_database_connectivity(),
"external_services": await self._check_external_services(),
"resource_availability": await self._check_resource_availability(config),
"security_compliance": await self._check_security_compliance(environment)
}
all_passed = all(checks.values())
return {
"step": "pre_deployment_checks",
"success": all_passed,
"checks": checks,
"message": "All pre-deployment checks passed" if all_passed else "Some checks failed"
}
async def _deploy_infrastructure(self, infra_config: Dict[str, Any]) -> Dict[str, Any]:
"""Deploy infrastructure components."""
# This would typically use infrastructure as code tools like Terraform
# For demonstration, we'll simulate the deployment
components = []
# Deploy app servers
for i in range(infra_config.get("app_servers", 1)):
components.append(f"app_server_{i+1}")
# Deploy database replicas
for i in range(infra_config.get("database_replicas", 1)):
components.append(f"database_replica_{i+1}")
# Deploy LLM service instances
for i in range(infra_config.get("llm_service_instances", 1)):
components.append(f"llm_service_{i+1}")
return {
"step": "infrastructure_deployment",
"success": True,
"components_deployed": components,
"message": f"Deployed {len(components)} infrastructure components"
}
async def _deploy_application_code(self, version: str, config: Dict[str, Any]) -> Dict[str, Any]:
"""Deploy application code."""
# This would typically involve:
# - Building Docker images
# - Pushing to container registry
# - Rolling deployment to instances
# - Database migrations
return {
"step": "application_deployment",
"success": True,
"version": version,
"deployment_strategy": "rolling_update",
"message": f"Successfully deployed version {version}"
}
async def _setup_monitoring(self, monitoring_config: Dict[str, Any]) -> Dict[str, Any]:
"""Setup monitoring and alerting."""
monitoring_components = []
if monitoring_config.get("metrics_enabled"):
monitoring_components.append("metrics_collection")
if monitoring_config.get("alerting_enabled"):
monitoring_components.append("alerting_system")
if monitoring_config.get("performance_monitoring"):
monitoring_components.append("performance_monitoring")
return {
"step": "monitoring_setup",
"success": True,
"components": monitoring_components,
"log_level": monitoring_config.get("log_level", "INFO"),
"message": "Monitoring configured successfully"
}
async def _run_post_deployment_tests(self, environment: str) -> Dict[str, Any]:
"""Run post-deployment validation tests."""
test_results = {
"health_check": await self.health_checks.run_health_check(),
"api_endpoints": await self._test_api_endpoints(),
"nlp_functionality": await self._test_nlp_functionality(),
"database_operations": await self._test_database_operations()
}
all_passed = all(test_results.values())
return {
"step": "post_deployment_tests",
"success": all_passed,
"test_results": test_results,
"message": "All tests passed" if all_passed else "Some tests failed"
}
    async def _configure_auto_scaling(self, scaling_config: Dict[str, Any]) -> Dict[str, Any]:
        """Configure auto-scaling policies."""
        policies = {
            "cpu_scaling": {
                "threshold": scaling_config.get("cpu_threshold", 70),
                "scale_up_cooldown": 300,
                "scale_down_cooldown": 600
            },
            "memory_scaling": {
                "threshold": scaling_config.get("memory_threshold", 80),
                "scale_up_cooldown": 300,
                "scale_down_cooldown": 600
            }
        }
        if scaling_config.get("predictive_scaling"):
            policies["predictive_scaling"] = {
                "enabled": True,
                "forecast_period": 3600,
                "confidence_threshold": 0.8
            }
        return {
            "step": "auto_scaling_configuration",
            "success": True,
            "policies": policies,
            "min_instances": scaling_config.get("min_instances", 1),
            "max_instances": scaling_config.get("max_instances", 10),
            "message": "Auto-scaling configured successfully"
        }

    async def _check_database_connectivity(self) -> bool:
        """Check database connectivity."""
        # Implementation would test actual database connection
        return True

    async def _check_external_services(self) -> bool:
        """Check external service availability."""
        # Implementation would test external API endpoints
        return True

    async def _check_resource_availability(self, config: Dict[str, Any]) -> bool:
        """Check if required resources are available."""
        # Implementation would check CPU, memory, storage availability
        return True

    async def _check_security_compliance(self, environment: str) -> bool:
        """Check security compliance requirements."""
        # Implementation would run security scans and compliance checks
        return True

    async def _test_api_endpoints(self) -> bool:
        """Test API endpoint availability."""
        # Implementation would test critical API endpoints
        return True

    async def _test_nlp_functionality(self) -> bool:
        """Test NLP functionality."""
        # Implementation would run basic NLP tests
        return True

    async def _test_database_operations(self) -> bool:
        """Test database operations."""
        # Implementation would test CRUD operations
        return True

    async def _rollback_deployment(self, environment: str) -> Dict[str, Any]:
        """Rollback deployment in case of failure."""
        return {
            "rollback_initiated": True,
            "rollback_time": datetime.now().isoformat(),
            "success": True,
            "message": "Rollback completed successfully"
        }


class HealthCheckManager:
    def __init__(self):
        self.health_endpoints = [
            "/health/database",
            "/health/llm-service",
            "/health/cache",
            "/health/external-apis"
        ]

    async def run_health_check(self) -> bool:
        """Run comprehensive health check."""
        health_results = {}
        for endpoint in self.health_endpoints:
            try:
                # This would make actual HTTP requests to health endpoints
                # For demonstration, we'll simulate the checks
                health_results[endpoint] = True
            except Exception:
                # Any failure while probing marks the endpoint as unhealthy
                health_results[endpoint] = False
        return all(health_results.values())


class MonitoringManager:
    def __init__(self):
        self.metrics_collectors = []
        self.alert_rules = []

    def setup_monitoring(self, config: Dict[str, Any]):
        """Setup monitoring based on configuration."""
        if config.get("metrics_enabled"):
            self._setup_metrics_collection()
        if config.get("alerting_enabled"):
            self._setup_alerting()
        if config.get("performance_monitoring"):
            self._setup_performance_monitoring()

    def _setup_metrics_collection(self):
        """Setup metrics collection."""
        # Implementation would configure metrics collection
        pass

    def _setup_alerting(self):
        """Setup alerting rules."""
        # Implementation would configure alerting system
        pass

    def _setup_performance_monitoring(self):
        """Setup performance monitoring."""
        # Implementation would configure performance monitoring
        pass

This testing and deployment framework provides a foundation for reliably testing and deploying an LLM-based calendar application. The NLPTestSuite checks that natural language processing components behave correctly across diverse inputs and conversation flows, while the PerformanceTestSuite validates system performance under various load conditions. The DeploymentManager ties the deployment steps together with health checks, monitoring, and rollback hooks. Note that the check and test methods shown above are placeholders that always return True, and _rollback_deployment always reports success; each needs a real implementation before the framework can catch actual failures. With those in place, teams can deploy and maintain LLM-based applications with greater confidence in availability and performance.
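As one illustration of how the simulated health checks might be replaced, the sketch below runs a set of probes concurrently, applies a per-probe timeout, and treats any timeout or exception as a failed check. It is a minimal sketch under stated assumptions, not part of the framework above: the names Probe, run_probe, run_health_checks, and should_rollback are hypothetical, and the probes here are stand-in coroutines rather than real HTTP requests to the /health endpoints.

```python
import asyncio
from typing import Awaitable, Callable, Dict

# Hypothetical type: a probe is a zero-argument coroutine function
# that returns True when the dependency it checks is healthy.
Probe = Callable[[], Awaitable[bool]]


async def run_probe(probe: Probe, timeout: float) -> bool:
    """Run one probe; a timeout or any exception counts as unhealthy."""
    try:
        return bool(await asyncio.wait_for(probe(), timeout=timeout))
    except Exception:
        # Covers asyncio.TimeoutError as well as errors raised by the probe
        return False


async def run_health_checks(probes: Dict[str, Probe], timeout: float = 2.0) -> Dict[str, bool]:
    """Run all probes concurrently and return a per-endpoint result map."""
    names = list(probes)
    results = await asyncio.gather(*(run_probe(probes[n], timeout) for n in names))
    return dict(zip(names, results))


def should_rollback(results: Dict[str, bool]) -> bool:
    """Assumed policy: roll back if any single check failed."""
    return not all(results.values())


# Stand-in probes for demonstration only (assumptions, not real services)
async def healthy_probe() -> bool:
    return True


async def slow_probe() -> bool:
    await asyncio.sleep(1.0)  # longer than the timeout used in the demo
    return True


async def failing_probe() -> bool:
    raise ConnectionError("simulated outage")


if __name__ == "__main__":
    demo_probes = {
        "/health/database": healthy_probe,
        "/health/llm-service": slow_probe,
        "/health/cache": failing_probe,
    }
    outcome = asyncio.run(run_health_checks(demo_probes, timeout=0.05))
    print(outcome, "rollback:", should_rollback(outcome))
```

Returning the per-endpoint map rather than a single boolean keeps the failing dependency visible in deployment logs, and the rollback decision stays a separate, replaceable policy function.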
Conclusion and Future Enhancements
The development of an LLM-based calendar application represents a significant advancement in how users interact with scheduling and time management tools. By integrating natural language processing capabilities with traditional calendar functionality, we have created a system that understands user intent expressed in conversational language and translates it into precise calendar operations. This approach removes the friction typically associated with calendar management while providing intelligent assistance that learns from user patterns and preferences.
The comprehensive system we have designed demonstrates the practical implementation of several advanced concepts including natural language understanding, intelligent scheduling optimization, external service integration, and responsive user interface design. Each component works together to create a cohesive experience that feels natural and intuitive while maintaining the reliability and precision required for effective calendar management. The modular architecture ensures that individual components can be enhanced or replaced as technology evolves.
The testing and deployment frameworks we have established provide the foundation for maintaining system reliability and performance as the application scales. The specialized testing approaches for natural language processing components, combined with traditional software testing methodologies, help verify that the system behaves predictably across diverse user inputs and usage patterns. The deployment infrastructure is structured to support the complex requirements of LLM-based applications, with hooks for the monitoring and scaling capabilities that production environments need.
Looking toward future enhancements, several areas present exciting opportunities for further development. Advanced machine learning techniques could enable more sophisticated pattern recognition and predictive scheduling capabilities. Integration with emerging technologies such as augmented reality interfaces, voice assistants, and IoT devices could expand the ways users interact with their calendars. Enhanced collaboration features could leverage natural language processing to facilitate group scheduling and meeting coordination across organizations.
The foundation we have built provides a robust platform for these future enhancements while demonstrating the practical viability of LLM-based calendar applications. As natural language processing technology continues to advance and user expectations for intelligent interfaces grow, applications like this will become increasingly important for helping people manage their time more effectively and maintain better work-life balance. The combination of conversational interfaces with intelligent automation represents a significant step forward in personal productivity tools, making sophisticated calendar management accessible to users regardless of their technical expertise.