Introduction and System Overview
The modern digital wellness landscape presents users with an overwhelming abundance of meditation and hypnosis content across various platforms. While platforms like YouTube and YouTube Music host thousands of therapeutic audio sessions, users often struggle to find content that precisely matches their current emotional state and specific wellness goals. This challenge has created an opportunity for intelligent content discovery systems that can bridge the gap between user needs and available therapeutic resources.
This article presents a comprehensive LLM-based chatbot system designed to intelligently discover, evaluate, and recommend meditation and hypnosis sessions from multiple platforms. The system leverages advanced natural language processing to understand user emotional states and therapeutic goals, then employs sophisticated search and ranking algorithms to identify the most suitable content from vast digital libraries.
The chatbot operates through a conversational interface where users describe their current mood and desired outcomes using natural language. For instance, a user might express "I'm feeling anxious about work and need help sleeping better tonight" or "I want to build confidence for an upcoming presentation." The system then processes this input to extract key emotional indicators and therapeutic objectives, subsequently searching across multiple platforms to identify relevant content.
Core System Architecture and Components
The system architecture follows clean architecture principles with distinct separation of concerns across multiple layers. The presentation layer handles user interactions through a conversational interface, while the business logic layer processes natural language inputs and manages recommendation algorithms. The data access layer interfaces with external APIs and manages local content caching.
The central component is the Intent Analysis Engine, which utilizes large language models to parse user inputs and extract structured information about emotional states and therapeutic goals. This engine employs prompt engineering techniques to ensure consistent and accurate interpretation of user needs, even when expressed in colloquial or ambiguous language.
import openai
import json
import re
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass
from enum import Enum
import asyncio
import aiohttp
from datetime import datetime, timedelta, timezone
class MoodCategory(Enum):
ANXIETY = "anxiety"
DEPRESSION = "depression"
STRESS = "stress"
HAPPINESS = "happiness"
ANGER = "anger"
SADNESS = "sadness"
NEUTRAL = "neutral"
EXCITEMENT = "excitement"
class TherapeuticGoal(Enum):
SLEEP_IMPROVEMENT = "sleep_improvement"
STRESS_REDUCTION = "stress_reduction"
CONFIDENCE_BUILDING = "confidence_building"
SMOKING_CESSATION = "smoking_cessation"
WEIGHT_MANAGEMENT = "weight_management"
PAIN_MANAGEMENT = "pain_management"
RELATIONSHIP_IMPROVEMENT = "relationship_improvement"
FOCUS_ENHANCEMENT = "focus_enhancement"
EMOTIONAL_HEALING = "emotional_healing"
SPIRITUAL_GROWTH = "spiritual_growth"
@dataclass
class UserIntent:
primary_mood: MoodCategory
secondary_moods: List[MoodCategory]
therapeutic_goals: List[TherapeuticGoal]
intensity_level: int # 1-10 scale
session_duration_preference: Optional[int] # minutes
experience_level: str # beginner, intermediate, advanced
specific_techniques: List[str]
raw_input: str
class IntentAnalysisEngine:
def __init__(self, openai_api_key: str):
        # Use the async client so chat.completions.create can be awaited in _call_llm
        self.client = openai.AsyncOpenAI(api_key=openai_api_key)
self.mood_keywords = self._initialize_mood_keywords()
self.goal_keywords = self._initialize_goal_keywords()
def _initialize_mood_keywords(self) -> Dict[MoodCategory, List[str]]:
return {
MoodCategory.ANXIETY: [
"anxious", "worried", "nervous", "stressed", "overwhelmed",
"panic", "fearful", "restless", "tense", "uneasy"
],
MoodCategory.DEPRESSION: [
"depressed", "sad", "down", "hopeless", "empty", "numb",
"worthless", "tired", "unmotivated", "lonely"
],
MoodCategory.STRESS: [
"stressed", "pressure", "overwhelmed", "burned out",
"exhausted", "frazzled", "overloaded", "strained"
],
MoodCategory.HAPPINESS: [
"happy", "joyful", "content", "pleased", "cheerful",
"optimistic", "grateful", "satisfied", "peaceful"
],
MoodCategory.ANGER: [
"angry", "frustrated", "irritated", "mad", "furious",
"annoyed", "resentful", "hostile", "aggressive"
]
}
def _initialize_goal_keywords(self) -> Dict[TherapeuticGoal, List[str]]:
return {
TherapeuticGoal.SLEEP_IMPROVEMENT: [
"sleep", "insomnia", "rest", "bedtime", "falling asleep",
"staying asleep", "sleep quality", "nighttime", "tired"
],
TherapeuticGoal.STRESS_REDUCTION: [
"stress relief", "calm down", "relax", "tension release",
"pressure relief", "unwind", "decompress"
],
TherapeuticGoal.CONFIDENCE_BUILDING: [
"confidence", "self-esteem", "self-worth", "courage",
"assertiveness", "presentation", "public speaking"
],
TherapeuticGoal.SMOKING_CESSATION: [
"quit smoking", "stop smoking", "cigarettes", "nicotine",
"smoking habit", "tobacco", "smoking addiction"
]
}
async def analyze_user_intent(self, user_input: str) -> UserIntent:
"""
Analyzes user input to extract mood, goals, and preferences using LLM.
"""
        system_prompt = """
        You are an expert therapeutic content analyst. Analyze the user's input to extract:
        1. Primary emotional state/mood
        2. Secondary emotional states (if any)
        3. Therapeutic goals and objectives
        4. Intensity level of emotions (1-10 scale)
        5. Preferred session duration (if mentioned)
        6. Experience level with meditation/hypnosis (beginner, intermediate, advanced)
        7. Specific techniques mentioned
        Use only these mood values: anxiety, depression, stress, happiness, anger, sadness, neutral, excitement.
        Use only these goal values: sleep_improvement, stress_reduction, confidence_building, smoking_cessation, weight_management, pain_management, relationship_improvement, focus_enhancement, emotional_healing, spiritual_growth.
        Return only a structured JSON response with the extracted information, no additional text.
        """
user_prompt = f"""
Analyze this user input for therapeutic content recommendation:
"{user_input}"
Provide analysis in this JSON format:
{{
"primary_mood": "mood_category",
"secondary_moods": ["mood1", "mood2"],
"therapeutic_goals": ["goal1", "goal2"],
"intensity_level": 5,
"session_duration_preference": 20,
"experience_level": "beginner",
"specific_techniques": ["technique1", "technique2"],
"confidence_score": 0.85
}}
"""
try:
response = await self._call_llm(system_prompt, user_prompt)
parsed_response = json.loads(response)
return UserIntent(
primary_mood=MoodCategory(parsed_response.get("primary_mood", "neutral")),
secondary_moods=[MoodCategory(mood) for mood in parsed_response.get("secondary_moods", [])],
therapeutic_goals=[TherapeuticGoal(goal) for goal in parsed_response.get("therapeutic_goals", [])],
intensity_level=parsed_response.get("intensity_level", 5),
session_duration_preference=parsed_response.get("session_duration_preference"),
experience_level=parsed_response.get("experience_level", "beginner"),
specific_techniques=parsed_response.get("specific_techniques", []),
raw_input=user_input
)
        except Exception as e:
            # Fall back to keyword-based analysis if the LLM call or response parsing fails
            print(f"LLM intent analysis failed ({e}); using keyword fallback")
            return await self._fallback_keyword_analysis(user_input)
async def _call_llm(self, system_prompt: str, user_prompt: str) -> str:
"""
Makes API call to OpenAI GPT model for intent analysis.
"""
response = await self.client.chat.completions.create(
model="gpt-4",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
],
temperature=0.3,
max_tokens=500
)
return response.choices[0].message.content
async def _fallback_keyword_analysis(self, user_input: str) -> UserIntent:
"""
Fallback method using keyword matching when LLM analysis fails.
"""
user_input_lower = user_input.lower()
detected_moods = []
detected_goals = []
# Analyze moods using keyword matching
for mood, keywords in self.mood_keywords.items():
if any(keyword in user_input_lower for keyword in keywords):
detected_moods.append(mood)
# Analyze goals using keyword matching
for goal, keywords in self.goal_keywords.items():
if any(keyword in user_input_lower for keyword in keywords):
detected_goals.append(goal)
# Determine intensity based on intensity words
intensity_words = {
"extremely": 9, "very": 8, "really": 7, "quite": 6,
"somewhat": 4, "slightly": 3, "a little": 2, "barely": 1
}
intensity = 5 # default
for word, level in intensity_words.items():
if word in user_input_lower:
intensity = level
break
return UserIntent(
primary_mood=detected_moods[0] if detected_moods else MoodCategory.NEUTRAL,
secondary_moods=detected_moods[1:],
therapeutic_goals=detected_goals,
intensity_level=intensity,
session_duration_preference=None,
experience_level="beginner",
specific_techniques=[],
raw_input=user_input
)
The Intent Analysis Engine represents the cognitive core of the system, responsible for transforming natural language expressions into structured data that can drive content discovery algorithms. This component employs a hybrid approach, primarily relying on large language models for sophisticated natural language understanding while maintaining keyword-based fallback mechanisms for reliability and cost optimization.
The engine processes user inputs through carefully crafted prompts that guide the LLM to extract specific categories of information. The system recognizes various mood categories ranging from common emotional states like anxiety and happiness to more complex psychological conditions. Similarly, it identifies therapeutic goals that span from immediate needs like sleep improvement to long-term objectives such as smoking cessation or confidence building.
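As a brief usage sketch, the engine can be driven from any async entry point; the API key below is a placeholder, and the example input mirrors the scenario from the introduction:
# Minimal, illustrative driver for the Intent Analysis Engine (placeholder API key).
async def demo_intent_analysis():
    engine = IntentAnalysisEngine(openai_api_key="YOUR_OPENAI_API_KEY")
    intent = await engine.analyze_user_intent(
        "I'm feeling anxious about work and need help sleeping better tonight"
    )
    print(intent.primary_mood, intent.therapeutic_goals, intent.intensity_level)

if __name__ == "__main__":
    asyncio.run(demo_intent_analysis())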
Content Discovery and Platform Integration
The content discovery system interfaces with multiple platforms through their respective APIs, with YouTube Data API v3 serving as the primary integration point. This component handles the complex task of translating user intents into platform-specific search queries, managing API rate limits, and aggregating results across different content sources.
@dataclass
class ContentItem:
title: str
url: str
platform: str
duration_seconds: int
view_count: int
rating: float
upload_date: datetime
channel_name: str
description: str
thumbnail_url: str
tags: List[str]
category: str
language: str
@dataclass
class SearchQuery:
keywords: List[str]
duration_range: Tuple[int, int] # min, max seconds
upload_date_range: Tuple[datetime, datetime]
minimum_rating: float
content_type: str # "meditation", "hypnosis", "guided_imagery"
experience_level: str
class YouTubeContentDiscovery:
def __init__(self, api_key: str):
self.api_key = api_key
self.base_url = "https://www.googleapis.com/youtube/v3"
self.session = None
self.rate_limiter = RateLimiter(calls_per_minute=100)
async def __aenter__(self):
self.session = aiohttp.ClientSession()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.session:
await self.session.close()
async def search_content(self, user_intent: UserIntent, max_results: int = 50) -> List[ContentItem]:
"""
Searches YouTube for meditation and hypnosis content based on user intent.
"""
search_queries = self._generate_search_queries(user_intent)
all_results = []
for query in search_queries:
try:
await self.rate_limiter.wait_if_needed()
results = await self._execute_search(query, max_results // len(search_queries))
all_results.extend(results)
except Exception as e:
print(f"Search query failed: {e}")
continue
# Remove duplicates and sort by relevance
unique_results = self._deduplicate_results(all_results)
return await self._rank_and_filter_results(unique_results, user_intent)
def _generate_search_queries(self, user_intent: UserIntent) -> List[SearchQuery]:
"""
Generates multiple search queries based on user intent to maximize content discovery.
"""
queries = []
base_keywords = ["meditation", "hypnosis", "guided meditation", "relaxation"]
# Generate mood-specific queries
mood_keywords = self._get_mood_keywords(user_intent.primary_mood)
for mood_keyword in mood_keywords:
for base_keyword in base_keywords:
keywords = [base_keyword, mood_keyword]
# Add goal-specific keywords
for goal in user_intent.therapeutic_goals:
goal_keywords = self._get_goal_keywords(goal)
keywords.extend(goal_keywords[:2]) # Limit to avoid overly specific queries
# Determine duration range based on user preference
duration_range = self._calculate_duration_range(user_intent)
queries.append(SearchQuery(
keywords=keywords,
duration_range=duration_range,
upload_date_range=(datetime.now() - timedelta(days=365*3), datetime.now()),
minimum_rating=3.5,
content_type="meditation" if "meditation" in base_keyword else "hypnosis",
experience_level=user_intent.experience_level
))
return queries[:10] # Limit number of queries to manage API usage
def _get_mood_keywords(self, mood: MoodCategory) -> List[str]:
"""
Returns search keywords associated with specific mood categories.
"""
mood_keyword_map = {
MoodCategory.ANXIETY: ["anxiety relief", "calm anxiety", "stress relief", "peaceful"],
MoodCategory.DEPRESSION: ["depression help", "mood boost", "emotional healing", "hope"],
MoodCategory.STRESS: ["stress relief", "tension release", "calm mind", "relaxation"],
MoodCategory.HAPPINESS: ["joy meditation", "gratitude", "positive energy", "happiness"],
MoodCategory.ANGER: ["anger management", "calm anger", "emotional balance", "peace"],
MoodCategory.SADNESS: ["healing sadness", "emotional support", "comfort", "uplift"],
MoodCategory.NEUTRAL: ["general meditation", "mindfulness", "awareness", "presence"]
}
return mood_keyword_map.get(mood, ["mindfulness", "meditation"])
def _get_goal_keywords(self, goal: TherapeuticGoal) -> List[str]:
"""
Returns search keywords associated with specific therapeutic goals.
"""
goal_keyword_map = {
TherapeuticGoal.SLEEP_IMPROVEMENT: ["sleep meditation", "bedtime", "insomnia", "deep sleep"],
TherapeuticGoal.STRESS_REDUCTION: ["stress relief", "relaxation", "calm", "tension release"],
TherapeuticGoal.CONFIDENCE_BUILDING: ["confidence", "self-esteem", "courage", "empowerment"],
TherapeuticGoal.SMOKING_CESSATION: ["quit smoking", "stop smoking", "addiction", "freedom"],
TherapeuticGoal.WEIGHT_MANAGEMENT: ["weight loss", "healthy eating", "body image", "metabolism"],
TherapeuticGoal.PAIN_MANAGEMENT: ["pain relief", "healing", "chronic pain", "comfort"],
TherapeuticGoal.RELATIONSHIP_IMPROVEMENT: ["love", "relationships", "communication", "connection"],
TherapeuticGoal.FOCUS_ENHANCEMENT: ["focus", "concentration", "attention", "clarity"],
TherapeuticGoal.EMOTIONAL_HEALING: ["healing", "trauma", "emotional release", "recovery"],
TherapeuticGoal.SPIRITUAL_GROWTH: ["spiritual", "awakening", "consciousness", "enlightenment"]
}
return goal_keyword_map.get(goal, ["wellness", "healing"])
def _calculate_duration_range(self, user_intent: UserIntent) -> Tuple[int, int]:
"""
Calculates appropriate duration range based on user preferences and experience level.
"""
if user_intent.session_duration_preference:
target_duration = user_intent.session_duration_preference * 60 # convert to seconds
            return (max(0, target_duration - 300), target_duration + 300)  # ±5 minutes, clamped at zero
# Default ranges based on experience level
duration_ranges = {
"beginner": (300, 1200), # 5-20 minutes
"intermediate": (600, 2400), # 10-40 minutes
"advanced": (1200, 3600) # 20-60 minutes
}
return duration_ranges.get(user_intent.experience_level, (300, 1800))
async def _execute_search(self, query: SearchQuery, max_results: int) -> List[ContentItem]:
"""
Executes a single search query against YouTube API.
"""
search_params = {
"part": "snippet",
"q": " ".join(query.keywords),
"type": "video",
"maxResults": min(max_results, 50),
"order": "relevance",
"videoDuration": self._get_duration_filter(query.duration_range),
"key": self.api_key
}
async with self.session.get(f"{self.base_url}/search", params=search_params) as response:
if response.status != 200:
raise Exception(f"YouTube API error: {response.status}")
data = await response.json()
video_ids = [item["id"]["videoId"] for item in data.get("items", [])]
if not video_ids:
return []
# Get detailed video information
return await self._get_video_details(video_ids)
def _get_duration_filter(self, duration_range: Tuple[int, int]) -> str:
"""
Converts duration range to YouTube API duration filter.
"""
min_duration, max_duration = duration_range
if max_duration <= 240: # 4 minutes
return "short"
elif max_duration <= 1200: # 20 minutes
return "medium"
else:
return "long"
async def _get_video_details(self, video_ids: List[str]) -> List[ContentItem]:
"""
Retrieves detailed information for specific video IDs.
"""
details_params = {
"part": "snippet,contentDetails,statistics",
"id": ",".join(video_ids),
"key": self.api_key
}
async with self.session.get(f"{self.base_url}/videos", params=details_params) as response:
if response.status != 200:
return []
data = await response.json()
content_items = []
for item in data.get("items", []):
try:
content_item = self._parse_video_item(item)
if content_item:
content_items.append(content_item)
except Exception as e:
print(f"Error parsing video item: {e}")
continue
return content_items
def _parse_video_item(self, item: dict) -> Optional[ContentItem]:
"""
Parses YouTube API response item into ContentItem object.
"""
try:
snippet = item["snippet"]
statistics = item.get("statistics", {})
content_details = item["contentDetails"]
# Parse duration from ISO 8601 format
duration_str = content_details["duration"]
duration_seconds = self._parse_iso_duration(duration_str)
            # Approximate a rating from the like-to-view ratio
            # (the YouTube Data API no longer exposes dislike counts)
            like_count = int(statistics.get("likeCount", 0))
            view_count = int(statistics.get("viewCount", 0))
            rating = min(5.0, (like_count / max(view_count, 1)) * 100) if view_count > 0 else 3.0
return ContentItem(
title=snippet["title"],
url=f"https://www.youtube.com/watch?v={item['id']}",
platform="youtube",
duration_seconds=duration_seconds,
view_count=view_count,
rating=rating,
upload_date=datetime.fromisoformat(snippet["publishedAt"].replace("Z", "+00:00")),
channel_name=snippet["channelTitle"],
description=snippet.get("description", ""),
thumbnail_url=snippet["thumbnails"]["high"]["url"],
tags=snippet.get("tags", []),
category=snippet.get("categoryId", ""),
language=snippet.get("defaultLanguage", "en")
)
except (KeyError, ValueError) as e:
print(f"Error parsing video item: {e}")
return None
def _parse_iso_duration(self, duration_str: str) -> int:
"""
Parses ISO 8601 duration format (PT15M33S) to seconds.
"""
import re
pattern = r'PT(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?'
match = re.match(pattern, duration_str)
if not match:
return 0
hours = int(match.group(1) or 0)
minutes = int(match.group(2) or 0)
seconds = int(match.group(3) or 0)
return hours * 3600 + minutes * 60 + seconds
def _deduplicate_results(self, results: List[ContentItem]) -> List[ContentItem]:
"""
        Removes duplicate content items by URL.
"""
seen_urls = set()
unique_results = []
for item in results:
if item.url not in seen_urls:
seen_urls.add(item.url)
unique_results.append(item)
return unique_results
async def _rank_and_filter_results(self, results: List[ContentItem], user_intent: UserIntent) -> List[ContentItem]:
"""
Ranks and filters results based on relevance to user intent.
"""
scored_results = []
for item in results:
relevance_score = await self._calculate_relevance_score(item, user_intent)
if relevance_score > 0.3: # Minimum relevance threshold
scored_results.append((item, relevance_score))
# Sort by relevance score (descending)
scored_results.sort(key=lambda x: x[1], reverse=True)
return [item for item, score in scored_results]
async def _calculate_relevance_score(self, item: ContentItem, user_intent: UserIntent) -> float:
"""
Calculates relevance score for a content item based on user intent.
"""
score = 0.0
# Title and description keyword matching
text_content = f"{item.title} {item.description}".lower()
# Mood keyword matching
mood_keywords = self._get_mood_keywords(user_intent.primary_mood)
mood_matches = sum(1 for keyword in mood_keywords if keyword.lower() in text_content)
score += (mood_matches / len(mood_keywords)) * 0.3
# Goal keyword matching
for goal in user_intent.therapeutic_goals:
goal_keywords = self._get_goal_keywords(goal)
goal_matches = sum(1 for keyword in goal_keywords if keyword.lower() in text_content)
score += (goal_matches / len(goal_keywords)) * 0.2
# Duration preference matching
if user_intent.session_duration_preference:
target_duration = user_intent.session_duration_preference * 60
duration_diff = abs(item.duration_seconds - target_duration)
duration_score = max(0, 1 - (duration_diff / target_duration))
score += duration_score * 0.2
# Quality indicators
if item.view_count > 1000:
score += 0.1
if item.rating > 4.0:
score += 0.1
# Channel reputation (simplified)
trusted_channels = ["jason stephenson", "michael sealey", "the honest guys", "michelle's sanctuary"]
if any(channel in item.channel_name.lower() for channel in trusted_channels):
score += 0.1
return min(1.0, score)
class RateLimiter:
def __init__(self, calls_per_minute: int):
self.calls_per_minute = calls_per_minute
self.calls = []
    async def wait_if_needed(self):
        now = datetime.now()
        # Keep only calls made within the last minute
        self.calls = [call_time for call_time in self.calls if (now - call_time).total_seconds() < 60]
        if len(self.calls) >= self.calls_per_minute:
            sleep_time = 60 - (now - self.calls[0]).total_seconds()
            if sleep_time > 0:
                await asyncio.sleep(sleep_time)
        self.calls.append(now)
The content discovery system represents a sophisticated search and retrieval mechanism that transforms user intents into targeted content queries across multiple platforms. The system employs a multi-query strategy, generating several complementary search queries to maximize content discovery while avoiding overly narrow results that might miss relevant content.
The YouTube integration component handles the complexities of the YouTube Data API, including rate limiting, pagination, and data parsing. The system constructs search queries by combining mood-specific keywords with therapeutic goal indicators, creating comprehensive search terms that capture the nuanced requirements expressed in user inputs.
Quality assessment plays a crucial role in content filtering, with the system evaluating multiple factors including view counts, engagement metrics, channel reputation, and content freshness. This multi-dimensional scoring approach ensures that recommended content meets both relevance and quality standards.
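For illustration, the discovery client can also be exercised on its own as sketched below; the API key is a placeholder, and the UserIntent would normally come from the Intent Analysis Engine:
# Illustrative stand-alone use of the YouTube discovery client (placeholder API key).
async def demo_content_discovery(intent: UserIntent):
    async with YouTubeContentDiscovery(api_key="YOUR_YOUTUBE_API_KEY") as discovery:
        items = await discovery.search_content(intent, max_results=25)
    for item in items[:5]:
        print(f"{item.title} ({item.duration_seconds // 60} min) - {item.url}")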
Recommendation Engine and Content Ranking
The recommendation engine serves as the intelligent core that transforms discovered content into personalized suggestions. This component employs machine learning techniques and heuristic algorithms to score and rank content based on multiple relevance factors, ensuring that the final recommendations closely align with user needs and preferences.
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
from typing import Dict, List, Tuple
import pickle
import os
@dataclass
class RecommendationScore:
content_item: ContentItem
overall_score: float
mood_relevance: float
goal_relevance: float
quality_score: float
personalization_score: float
explanation: str
class ContentRecommendationEngine:
def __init__(self):
self.tfidf_vectorizer = TfidfVectorizer(
max_features=1000,
stop_words='english',
ngram_range=(1, 2)
)
self.content_embeddings = {}
self.user_preference_model = None
self.content_quality_model = None
self._initialize_models()
def _initialize_models(self):
"""
Initializes machine learning models for content analysis and user preference modeling.
"""
# Load pre-trained models if available
if os.path.exists("models/user_preference_model.pkl"):
with open("models/user_preference_model.pkl", "rb") as f:
self.user_preference_model = pickle.load(f)
if os.path.exists("models/content_quality_model.pkl"):
with open("models/content_quality_model.pkl", "rb") as f:
self.content_quality_model = pickle.load(f)
async def generate_recommendations(
self,
content_items: List[ContentItem],
user_intent: UserIntent,
user_history: Optional[List[ContentItem]] = None,
num_recommendations: int = 10
) -> List[RecommendationScore]:
"""
Generates personalized content recommendations based on user intent and content analysis.
"""
if not content_items:
return []
# Calculate various scoring components
mood_scores = await self._calculate_mood_relevance_scores(content_items, user_intent)
goal_scores = await self._calculate_goal_relevance_scores(content_items, user_intent)
quality_scores = await self._calculate_quality_scores(content_items)
personalization_scores = await self._calculate_personalization_scores(
content_items, user_intent, user_history
)
# Combine scores with weighted importance
recommendation_scores = []
for i, item in enumerate(content_items):
overall_score = self._combine_scores(
mood_scores[i],
goal_scores[i],
quality_scores[i],
personalization_scores[i],
user_intent
)
explanation = self._generate_explanation(
item, mood_scores[i], goal_scores[i],
quality_scores[i], personalization_scores[i]
)
recommendation_scores.append(RecommendationScore(
content_item=item,
overall_score=overall_score,
mood_relevance=mood_scores[i],
goal_relevance=goal_scores[i],
quality_score=quality_scores[i],
personalization_score=personalization_scores[i],
explanation=explanation
))
# Sort by overall score and return top recommendations
recommendation_scores.sort(key=lambda x: x.overall_score, reverse=True)
return recommendation_scores[:num_recommendations]
async def _calculate_mood_relevance_scores(
self,
content_items: List[ContentItem],
user_intent: UserIntent
) -> List[float]:
"""
Calculates mood relevance scores using semantic similarity analysis.
"""
mood_descriptors = self._get_mood_descriptors(user_intent.primary_mood)
secondary_mood_descriptors = []
for mood in user_intent.secondary_moods:
secondary_mood_descriptors.extend(self._get_mood_descriptors(mood))
all_mood_text = " ".join(mood_descriptors + secondary_mood_descriptors)
scores = []
for item in content_items:
content_text = f"{item.title} {item.description}"
# Use TF-IDF similarity for semantic matching
combined_texts = [all_mood_text, content_text]
tfidf_matrix = self.tfidf_vectorizer.fit_transform(combined_texts)
similarity = cosine_similarity(tfidf_matrix[0:1], tfidf_matrix[1:2])[0][0]
# Adjust score based on mood intensity
intensity_multiplier = user_intent.intensity_level / 10.0
adjusted_score = similarity * (0.5 + 0.5 * intensity_multiplier)
scores.append(min(1.0, adjusted_score))
return scores
def _get_mood_descriptors(self, mood: MoodCategory) -> List[str]:
"""
Returns detailed descriptive text for mood categories to improve semantic matching.
"""
mood_descriptors = {
MoodCategory.ANXIETY: [
"anxious worried nervous tense restless fearful panic overwhelmed",
"stress tension pressure unease discomfort agitation nervousness",
"calm peaceful relaxed serene tranquil soothing gentle"
],
MoodCategory.DEPRESSION: [
"depressed sad down hopeless empty numb worthless lonely",
"melancholy despair grief sorrow dejection despondency",
"uplifting hopeful encouraging positive healing comfort"
],
MoodCategory.STRESS: [
"stressed overwhelmed pressure burned exhausted frazzled",
"tension strain burden overload fatigue weariness",
"relaxation relief calm peace restoration rejuvenation"
],
MoodCategory.HAPPINESS: [
"happy joyful content pleased cheerful optimistic grateful",
"bliss delight satisfaction fulfillment elation euphoria",
"maintain enhance amplify celebrate appreciate gratitude"
],
MoodCategory.ANGER: [
"angry frustrated irritated mad furious annoyed resentful",
"rage hostility aggression indignation wrath fury",
"calm peaceful forgiveness understanding patience tolerance"
]
}
return mood_descriptors.get(mood, ["neutral balanced centered"])
async def _calculate_goal_relevance_scores(
self,
content_items: List[ContentItem],
user_intent: UserIntent
) -> List[float]:
"""
Calculates therapeutic goal relevance scores using specialized keyword analysis.
"""
goal_keywords = {}
for goal in user_intent.therapeutic_goals:
goal_keywords[goal] = self._get_detailed_goal_keywords(goal)
scores = []
for item in content_items:
content_text = f"{item.title} {item.description} {' '.join(item.tags)}".lower()
goal_score = 0.0
            for goal, keywords in goal_keywords.items():
                if not keywords:
                    # Skip goals without a detailed keyword list to avoid division by zero
                    continue
                keyword_matches = sum(1 for keyword in keywords if keyword in content_text)
                goal_relevance = keyword_matches / len(keywords)
                goal_score += goal_relevance
# Normalize by number of goals
if user_intent.therapeutic_goals:
goal_score /= len(user_intent.therapeutic_goals)
scores.append(min(1.0, goal_score))
return scores
def _get_detailed_goal_keywords(self, goal: TherapeuticGoal) -> List[str]:
"""
Returns comprehensive keyword lists for therapeutic goals.
"""
detailed_keywords = {
TherapeuticGoal.SLEEP_IMPROVEMENT: [
"sleep", "insomnia", "bedtime", "falling asleep", "staying asleep",
"deep sleep", "rem sleep", "sleep cycle", "nighttime", "rest",
"slumber", "drowsy", "sleepy", "tired", "fatigue", "exhaustion"
],
TherapeuticGoal.STRESS_REDUCTION: [
"stress", "tension", "pressure", "overwhelm", "anxiety",
"relaxation", "calm", "peace", "tranquil", "serene",
"unwind", "decompress", "release", "relief", "soothing"
],
TherapeuticGoal.CONFIDENCE_BUILDING: [
"confidence", "self-esteem", "self-worth", "courage", "brave",
"assertiveness", "empowerment", "strength", "bold", "fearless",
"self-belief", "inner strength", "personal power", "leadership"
],
TherapeuticGoal.SMOKING_CESSATION: [
"smoking", "cigarettes", "tobacco", "nicotine", "addiction",
"quit", "stop", "cessation", "freedom", "liberation",
"healthy lungs", "clean air", "smoke-free", "withdrawal"
]
}
return detailed_keywords.get(goal, [])
async def _calculate_quality_scores(self, content_items: List[ContentItem]) -> List[float]:
"""
Calculates content quality scores based on multiple quality indicators.
"""
scores = []
# Calculate percentiles for normalization
view_counts = [item.view_count for item in content_items]
ratings = [item.rating for item in content_items]
view_count_percentiles = np.percentile(view_counts, [25, 50, 75, 90])
rating_percentiles = np.percentile(ratings, [25, 50, 75, 90])
for item in content_items:
quality_score = 0.0
# View count score (30% weight)
view_score = self._calculate_percentile_score(item.view_count, view_count_percentiles)
quality_score += view_score * 0.3
# Rating score (25% weight)
rating_score = self._calculate_percentile_score(item.rating, rating_percentiles)
quality_score += rating_score * 0.25
# Channel reputation score (20% weight)
channel_score = self._calculate_channel_reputation_score(item.channel_name)
quality_score += channel_score * 0.2
# Content freshness score (15% weight)
freshness_score = self._calculate_freshness_score(item.upload_date)
quality_score += freshness_score * 0.15
# Duration appropriateness score (10% weight)
duration_score = self._calculate_duration_appropriateness_score(item.duration_seconds)
quality_score += duration_score * 0.1
scores.append(min(1.0, quality_score))
return scores
def _calculate_percentile_score(self, value: float, percentiles: List[float]) -> float:
"""
Converts a value to a percentile-based score.
"""
if value >= percentiles[3]: # 90th percentile
return 1.0
elif value >= percentiles[2]: # 75th percentile
return 0.8
elif value >= percentiles[1]: # 50th percentile
return 0.6
elif value >= percentiles[0]: # 25th percentile
return 0.4
else:
return 0.2
def _calculate_channel_reputation_score(self, channel_name: str) -> float:
"""
Calculates channel reputation score based on known quality creators.
"""
# High-reputation meditation/hypnosis channels
tier_1_channels = [
"jason stephenson", "michael sealey", "the honest guys",
"michelle's sanctuary", "lauren ostrowski fenton", "thomas hall"
]
tier_2_channels = [
"meditation relax music", "yellow brick cinema", "soothing relaxation",
"new horizon meditation", "great meditation", "meditative mind"
]
channel_lower = channel_name.lower()
if any(channel in channel_lower for channel in tier_1_channels):
return 1.0
elif any(channel in channel_lower for channel in tier_2_channels):
return 0.7
else:
return 0.5 # Neutral score for unknown channels
def _calculate_freshness_score(self, upload_date: datetime) -> float:
"""
Calculates content freshness score based on upload date.
"""
        # upload_date is timezone-aware, so compare against an aware "now"
        days_old = (datetime.now(timezone.utc) - upload_date).days
if days_old <= 30: # Very fresh
return 1.0
elif days_old <= 180: # Recent
return 0.8
elif days_old <= 365: # Moderately old
return 0.6
elif days_old <= 730: # Old but acceptable
return 0.4
else: # Very old
return 0.2
def _calculate_duration_appropriateness_score(self, duration_seconds: int) -> float:
"""
Calculates score based on duration appropriateness for meditation/hypnosis.
"""
duration_minutes = duration_seconds / 60
# Optimal ranges for different types of sessions
if 5 <= duration_minutes <= 60: # Ideal range
return 1.0
elif 3 <= duration_minutes < 5 or 60 < duration_minutes <= 90: # Acceptable
return 0.7
elif 1 <= duration_minutes < 3 or 90 < duration_minutes <= 120: # Marginal
return 0.4
else: # Too short or too long
return 0.2
async def _calculate_personalization_scores(
self,
content_items: List[ContentItem],
user_intent: UserIntent,
user_history: Optional[List[ContentItem]] = None
) -> List[float]:
"""
Calculates personalization scores based on user preferences and history.
"""
if not user_history:
# Return neutral scores if no history available
return [0.5] * len(content_items)
scores = []
# Analyze user preferences from history
preferred_channels = self._extract_preferred_channels(user_history)
preferred_duration_range = self._extract_preferred_duration(user_history)
preferred_content_types = self._extract_preferred_content_types(user_history)
for item in content_items:
personalization_score = 0.0
# Channel preference matching
if item.channel_name in preferred_channels:
personalization_score += 0.4
# Duration preference matching
if self._duration_matches_preference(item.duration_seconds, preferred_duration_range):
personalization_score += 0.3
# Content type preference matching
content_type = self._classify_content_type(item)
if content_type in preferred_content_types:
personalization_score += 0.3
scores.append(min(1.0, personalization_score))
return scores
def _extract_preferred_channels(self, user_history: List[ContentItem]) -> Dict[str, float]:
"""
Extracts user's preferred channels from viewing history.
"""
channel_counts = {}
for item in user_history:
channel_counts[item.channel_name] = channel_counts.get(item.channel_name, 0) + 1
total_views = len(user_history)
return {channel: count/total_views for channel, count in channel_counts.items()}
def _extract_preferred_duration(self, user_history: List[ContentItem]) -> Tuple[int, int]:
"""
Extracts user's preferred session duration range from history.
"""
durations = [item.duration_seconds for item in user_history]
avg_duration = sum(durations) / len(durations)
std_duration = np.std(durations)
return (int(avg_duration - std_duration), int(avg_duration + std_duration))
def _extract_preferred_content_types(self, user_history: List[ContentItem]) -> List[str]:
"""
Extracts user's preferred content types from history.
"""
content_types = [self._classify_content_type(item) for item in user_history]
type_counts = {}
for content_type in content_types:
type_counts[content_type] = type_counts.get(content_type, 0) + 1
# Return types that appear in more than 20% of history
threshold = len(user_history) * 0.2
return [content_type for content_type, count in type_counts.items() if count >= threshold]
def _classify_content_type(self, item: ContentItem) -> str:
"""
Classifies content into categories based on title and description.
"""
text = f"{item.title} {item.description}".lower()
if any(keyword in text for keyword in ["hypnosis", "hypnotic", "trance"]):
return "hypnosis"
elif any(keyword in text for keyword in ["guided meditation", "guided"]):
return "guided_meditation"
elif any(keyword in text for keyword in ["music", "sounds", "ambient"]):
return "meditation_music"
elif any(keyword in text for keyword in ["mindfulness", "awareness"]):
return "mindfulness"
elif any(keyword in text for keyword in ["visualization", "imagery"]):
return "visualization"
else:
return "general_meditation"
def _duration_matches_preference(self, duration: int, preferred_range: Tuple[int, int]) -> bool:
"""
Checks if content duration matches user's preferred range.
"""
min_duration, max_duration = preferred_range
return min_duration <= duration <= max_duration
def _combine_scores(
self,
mood_score: float,
goal_score: float,
quality_score: float,
personalization_score: float,
user_intent: UserIntent
) -> float:
"""
Combines individual scores into overall recommendation score with dynamic weighting.
"""
# Base weights
mood_weight = 0.35
goal_weight = 0.25
quality_weight = 0.25
personalization_weight = 0.15
# Adjust weights based on user intent intensity
if user_intent.intensity_level >= 8: # High intensity - prioritize mood matching
mood_weight = 0.45
goal_weight = 0.30
quality_weight = 0.15
personalization_weight = 0.10
elif user_intent.intensity_level <= 3: # Low intensity - prioritize quality and personalization
mood_weight = 0.25
goal_weight = 0.20
quality_weight = 0.35
personalization_weight = 0.20
overall_score = (
mood_score * mood_weight +
goal_score * goal_weight +
quality_score * quality_weight +
personalization_score * personalization_weight
)
return min(1.0, overall_score)
def _generate_explanation(
self,
item: ContentItem,
mood_score: float,
goal_score: float,
quality_score: float,
personalization_score: float
) -> str:
"""
Generates human-readable explanation for why content was recommended.
"""
explanations = []
if mood_score >= 0.7:
explanations.append("excellent match for your current mood")
elif mood_score >= 0.5:
explanations.append("good alignment with your emotional state")
if goal_score >= 0.7:
explanations.append("highly relevant to your therapeutic goals")
elif goal_score >= 0.5:
explanations.append("addresses your stated objectives")
if quality_score >= 0.8:
explanations.append("high-quality content from reputable source")
elif quality_score >= 0.6:
explanations.append("well-rated content with good engagement")
if personalization_score >= 0.7:
explanations.append("matches your personal preferences")
if not explanations:
explanations.append("balanced recommendation across multiple factors")
return f"Recommended because it offers {', '.join(explanations)}."
The recommendation engine employs a sophisticated multi-factor scoring system that considers mood relevance, therapeutic goal alignment, content quality, and personalization factors. The system uses machine learning techniques including TF-IDF vectorization and cosine similarity to perform semantic matching between user intents and content descriptions.
The engine dynamically adjusts scoring weights based on user intent intensity, prioritizing mood matching for high-intensity emotional states while emphasizing quality and personalization for more general requests. This adaptive approach ensures that recommendations remain relevant across different user contexts and emotional urgency levels.
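To make the dynamic weighting concrete, the following snippet reproduces the combination step in isolation with illustrative component scores; the weights mirror those in _combine_scores:
# Illustrative weight combination for two intensity regimes (scores are made up).
mood, goal, quality, personalization = 0.8, 0.6, 0.7, 0.5

default_weights = (0.35, 0.25, 0.25, 0.15)          # moderate intensity
high_intensity_weights = (0.45, 0.30, 0.15, 0.10)   # intensity >= 8

for label, (wm, wg, wq, wp) in [("default", default_weights), ("high intensity", high_intensity_weights)]:
    score = mood * wm + goal * wg + quality * wq + personalization * wp
    print(f"{label}: {score:.3f}")   # default: 0.680, high intensity: 0.695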
Conversational Interface and User Experience
The conversational interface serves as the primary touchpoint between users and the system, managing the flow of interaction from initial intent capture through final recommendation presentation. This component handles natural language understanding, context maintenance, and response generation to create a seamless and intuitive user experience.
from enum import Enum
from dataclasses import dataclass
from typing import Dict, List, Optional, Any
import json
import asyncio
class ConversationState(Enum):
INITIAL_GREETING = "initial_greeting"
INTENT_GATHERING = "intent_gathering"
INTENT_CLARIFICATION = "intent_clarification"
CONTENT_SEARCHING = "content_searching"
RECOMMENDATION_PRESENTATION = "recommendation_presentation"
FOLLOW_UP = "follow_up"
FEEDBACK_COLLECTION = "feedback_collection"
@dataclass
class ConversationContext:
user_id: str
session_id: str
current_state: ConversationState
user_intent: Optional[UserIntent]
conversation_history: List[Dict[str, str]]
recommendations: List[RecommendationScore]
user_preferences: Dict[str, Any]
clarification_needed: List[str]
last_interaction_time: datetime
class TherapeuticContentChatbot:
def __init__(
self,
openai_api_key: str,
youtube_api_key: str,
database_connection: Optional[Any] = None
):
self.intent_analyzer = IntentAnalysisEngine(openai_api_key)
self.content_discovery = YouTubeContentDiscovery(youtube_api_key)
self.recommendation_engine = ContentRecommendationEngine()
self.conversation_contexts = {}
self.database = database_connection
self.response_templates = self._initialize_response_templates()
def _initialize_response_templates(self) -> Dict[str, List[str]]:
"""
Initializes response templates for different conversation states and scenarios.
"""
return {
"greeting": [
"Hello! I'm here to help you find the perfect meditation or hypnosis session for your current needs. How are you feeling today, and what would you like to work on?",
"Welcome! I specialize in finding personalized meditation and hypnosis content. Could you tell me about your current mood and what you're hoping to achieve?",
"Hi there! I'm your therapeutic content assistant. What's on your mind today, and how can I help you find the right session for your needs?"
],
"intent_clarification": [
"I want to make sure I find the best content for you. Could you tell me a bit more about {clarification_topic}?",
"To give you the most relevant recommendations, I'd like to understand {clarification_topic} better. Can you elaborate?",
"Help me personalize your recommendations by sharing more details about {clarification_topic}."
],
"searching": [
"I'm searching for content that matches your needs. This may take a moment while I explore multiple platforms...",
"Let me find the perfect sessions for you. I'm checking YouTube and other platforms for relevant content...",
"Searching for personalized recommendations based on your mood and goals. Please wait a moment..."
],
"no_results": [
"I couldn't find content that perfectly matches your specific needs. Would you like me to search with broader criteria or try different keywords?",
"It seems there's limited content for your exact requirements. Shall I expand the search or suggest alternative approaches?",
"I'm having trouble finding exact matches. Would you like me to recommend similar content or adjust the search parameters?"
],
"recommendations_intro": [
"I found {count} excellent sessions that match your needs. Here are my top recommendations:",
"Based on your mood and goals, I've curated {count} personalized recommendations for you:",
"I've discovered {count} sessions that should be perfect for your current situation:"
]
}
async def process_user_message(self, user_id: str, message: str) -> str:
"""
Main entry point for processing user messages and generating appropriate responses.
"""
# Get or create conversation context
context = self._get_or_create_context(user_id, message)
try:
# Update conversation history
context.conversation_history.append({
"timestamp": datetime.now().isoformat(),
"role": "user",
"content": message
})
# Process message based on current conversation state
response = await self._process_message_by_state(context, message)
# Update conversation history with bot response
context.conversation_history.append({
"timestamp": datetime.now().isoformat(),
"role": "assistant",
"content": response
})
# Save context to database if available
if self.database:
await self._save_conversation_context(context)
return response
except Exception as e:
error_response = "I apologize, but I encountered an issue while processing your request. Could you please try rephrasing your message or let me know if you need assistance with something specific?"
context.conversation_history.append({
"timestamp": datetime.now().isoformat(),
"role": "assistant",
"content": error_response
})
return error_response
def _get_or_create_context(self, user_id: str, message: str) -> ConversationContext:
"""
Retrieves existing conversation context or creates a new one for the user.
"""
if user_id not in self.conversation_contexts:
self.conversation_contexts[user_id] = ConversationContext(
user_id=user_id,
session_id=f"{user_id}_{datetime.now().timestamp()}",
current_state=ConversationState.INITIAL_GREETING,
user_intent=None,
conversation_history=[],
recommendations=[],
user_preferences={},
clarification_needed=[],
last_interaction_time=datetime.now()
)
else:
# Check if session has expired (more than 30 minutes of inactivity)
context = self.conversation_contexts[user_id]
time_since_last_interaction = datetime.now() - context.last_interaction_time
if time_since_last_interaction.total_seconds() > 1800: # 30 minutes
# Reset to initial state for new session
context.current_state = ConversationState.INITIAL_GREETING
context.user_intent = None
context.recommendations = []
context.clarification_needed = []
self.conversation_contexts[user_id].last_interaction_time = datetime.now()
return self.conversation_contexts[user_id]
async def _process_message_by_state(self, context: ConversationContext, message: str) -> str:
"""
Routes message processing based on current conversation state.
"""
if context.current_state == ConversationState.INITIAL_GREETING:
return await self._handle_initial_greeting(context, message)
elif context.current_state == ConversationState.INTENT_GATHERING:
return await self._handle_intent_gathering(context, message)
elif context.current_state == ConversationState.INTENT_CLARIFICATION:
return await self._handle_intent_clarification(context, message)
elif context.current_state == ConversationState.RECOMMENDATION_PRESENTATION:
return await self._handle_recommendation_interaction(context, message)
elif context.current_state == ConversationState.FOLLOW_UP:
return await self._handle_follow_up(context, message)
elif context.current_state == ConversationState.FEEDBACK_COLLECTION:
return await self._handle_feedback_collection(context, message)
else:
return await self._handle_intent_gathering(context, message)
async def _handle_initial_greeting(self, context: ConversationContext, message: str) -> str:
"""
Handles initial user interaction and greeting.
"""
# Check if user provided intent information in first message
if len(message.split()) > 3: # Substantial message likely contains intent
context.current_state = ConversationState.INTENT_GATHERING
return await self._handle_intent_gathering(context, message)
else:
# Simple greeting or short message
context.current_state = ConversationState.INTENT_GATHERING
return self._get_random_template("greeting")
async def _handle_intent_gathering(self, context: ConversationContext, message: str) -> str:
"""
Processes user input to extract therapeutic intent and determines if clarification is needed.
"""
# Analyze user intent
user_intent = await self.intent_analyzer.analyze_user_intent(message)
context.user_intent = user_intent
# Determine if clarification is needed
clarification_needed = self._assess_clarification_needs(user_intent)
if clarification_needed:
context.clarification_needed = clarification_needed
context.current_state = ConversationState.INTENT_CLARIFICATION
return self._generate_clarification_request(clarification_needed[0])
else:
# Proceed with content search
context.current_state = ConversationState.CONTENT_SEARCHING
return await self._perform_content_search_and_recommend(context)
async def _handle_intent_clarification(self, context: ConversationContext, message: str) -> str:
"""
Handles clarification responses and updates user intent accordingly.
"""
# Process clarification response
await self._process_clarification_response(context, message)
# Remove processed clarification from queue
if context.clarification_needed:
context.clarification_needed.pop(0)
# Check if more clarification is needed
if context.clarification_needed:
return self._generate_clarification_request(context.clarification_needed[0])
else:
# Proceed with content search
context.current_state = ConversationState.CONTENT_SEARCHING
return await self._perform_content_search_and_recommend(context)
async def _handle_recommendation_interaction(self, context: ConversationContext, message: str) -> str:
"""
Handles user interactions with presented recommendations.
"""
message_lower = message.lower()
# Check for specific requests
if any(word in message_lower for word in ["more", "other", "different", "additional"]):
return await self._provide_additional_recommendations(context)
elif any(word in message_lower for word in ["explain", "why", "reason"]):
return self._provide_recommendation_explanations(context)
elif any(word in message_lower for word in ["shorter", "longer", "duration"]):
return await self._filter_by_duration(context, message)
elif any(word in message_lower for word in ["thank", "thanks", "perfect", "great"]):
context.current_state = ConversationState.FEEDBACK_COLLECTION
return "I'm glad I could help! Would you mind sharing which recommendation you found most helpful? Your feedback helps me improve future suggestions."
else:
# Try to extract new intent for additional search
new_intent = await self.intent_analyzer.analyze_user_intent(message)
if self._is_new_search_request(new_intent, context.user_intent):
context.user_intent = new_intent
context.current_state = ConversationState.CONTENT_SEARCHING
return await self._perform_content_search_and_recommend(context)
else:
return "I'm here to help with your meditation and hypnosis needs. You can ask for more recommendations, request different durations, or start a new search by telling me about your current mood and goals."
async def _handle_follow_up(self, context: ConversationContext, message: str) -> str:
"""
Handles follow-up interactions and additional requests.
"""
# Similar to recommendation interaction but with different context
return await self._handle_recommendation_interaction(context, message)
async def _handle_feedback_collection(self, context: ConversationContext, message: str) -> str:
"""
Collects and processes user feedback on recommendations.
"""
# Store feedback for future improvement
await self._store_user_feedback(context, message)
context.current_state = ConversationState.FOLLOW_UP
return "Thank you for your feedback! Is there anything else I can help you with today? I can search for different types of content or assist with other therapeutic goals."
def _assess_clarification_needs(self, user_intent: UserIntent) -> List[str]:
"""
Determines what clarification is needed based on extracted user intent.
"""
clarifications = []
# Check if mood is too vague or neutral
if user_intent.primary_mood == MoodCategory.NEUTRAL and not user_intent.secondary_moods:
clarifications.append("your current emotional state or mood")
# Check if no therapeutic goals were identified
if not user_intent.therapeutic_goals:
clarifications.append("what you're hoping to achieve or work on")
# Check if intensity is unclear for strong emotional states
if user_intent.primary_mood in [MoodCategory.ANXIETY, MoodCategory.DEPRESSION] and user_intent.intensity_level == 5:
clarifications.append("how intense these feelings are for you right now")
# Check if experience level might affect recommendations
if user_intent.experience_level == "beginner" and not any(word in user_intent.raw_input.lower() for word in ["new", "beginner", "first time", "never"]):
clarifications.append("your experience level with meditation or hypnosis")
return clarifications
def _generate_clarification_request(self, clarification_topic: str) -> str:
"""
Generates a natural clarification request for the specified topic.
"""
template = self._get_random_template("intent_clarification")
return template.format(clarification_topic=clarification_topic)
async def _process_clarification_response(self, context: ConversationContext, message: str) -> None:
"""
Processes clarification response and updates user intent.
"""
# Re-analyze the clarification message and merge with existing intent
clarification_intent = await self.intent_analyzer.analyze_user_intent(message)
# Merge new information with existing intent
if context.user_intent:
# Update primary mood if new one is more specific
if clarification_intent.primary_mood != MoodCategory.NEUTRAL:
context.user_intent.primary_mood = clarification_intent.primary_mood
# Add new therapeutic goals
for goal in clarification_intent.therapeutic_goals:
if goal not in context.user_intent.therapeutic_goals:
context.user_intent.therapeutic_goals.append(goal)
# Update intensity if provided
if clarification_intent.intensity_level != 5:
context.user_intent.intensity_level = clarification_intent.intensity_level
# Update experience level if provided
if clarification_intent.experience_level != "beginner":
context.user_intent.experience_level = clarification_intent.experience_level
# Update duration preference if provided
if clarification_intent.session_duration_preference:
context.user_intent.session_duration_preference = clarification_intent.session_duration_preference
async def _perform_content_search_and_recommend(self, context: ConversationContext) -> str:
"""
Performs content search and generates recommendations.
"""
        # In a streaming interface, one of the "searching" templates would be sent
        # to the user here as an interim status update before the search runs.
try:
# Search for content
async with self.content_discovery as discovery:
content_items = await discovery.search_content(context.user_intent, max_results=50)
if not content_items:
context.current_state = ConversationState.FOLLOW_UP
return self._get_random_template("no_results")
# Generate recommendations
user_history = await self._get_user_history(context.user_id) if self.database else None
recommendations = await self.recommendation_engine.generate_recommendations(
content_items, context.user_intent, user_history, num_recommendations=10
)
context.recommendations = recommendations
context.current_state = ConversationState.RECOMMENDATION_PRESENTATION
return self._format_recommendations_response(recommendations)
except Exception as e:
context.current_state = ConversationState.FOLLOW_UP
return "I encountered an issue while searching for content. Please try again or let me know if you'd like to adjust your search criteria."
def _format_recommendations_response(self, recommendations: List[RecommendationScore]) -> str:
"""
Formats recommendations into a user-friendly response.
"""
if not recommendations:
return self._get_random_template("no_results")
intro = self._get_random_template("recommendations_intro").format(count=len(recommendations))
response_parts = [intro, ""]
for i, rec in enumerate(recommendations, 1):
item = rec.content_item
duration_minutes = item.duration_seconds // 60
recommendation_text = f"{i}. **{item.title}**\n"
recommendation_text += f" Channel: {item.channel_name}\n"
recommendation_text += f" Duration: {duration_minutes} minutes\n"
recommendation_text += f" URL: {item.url}\n"
recommendation_text += f" Why recommended: {rec.explanation}\n"
response_parts.append(recommendation_text)
response_parts.append("\nWould you like more recommendations, different durations, or have any other questions?")
return "\n".join(response_parts)
async def _provide_additional_recommendations(self, context: ConversationContext) -> str:
"""
Provides additional recommendations beyond the initial set.
"""
# Search for more content with slightly relaxed criteria
try:
async with self.content_discovery as discovery:
additional_content = await discovery.search_content(context.user_intent, max_results=30)
# Filter out already recommended content
recommended_urls = {rec.content_item.url for rec in context.recommendations}
new_content = [item for item in additional_content if item.url not in recommended_urls]
if new_content:
user_history = await self._get_user_history(context.user_id) if self.database else None
additional_recommendations = await self.recommendation_engine.generate_recommendations(
new_content, context.user_intent, user_history, num_recommendations=5
)
context.recommendations.extend(additional_recommendations)
return self._format_recommendations_response(additional_recommendations)
else:
return "I've already shown you the best available content for your needs. Would you like me to search with different criteria or help you with something else?"
except Exception as e:
return "I'm having trouble finding additional content right now. Would you like to try a new search or adjust your preferences?"
def _provide_recommendation_explanations(self, context: ConversationContext) -> str:
"""
Provides detailed explanations for why specific content was recommended.
"""
if not context.recommendations:
return "I don't have any current recommendations to explain. Would you like me to search for content first?"
explanations = ["Here's why I recommended each session:\n"]
for i, rec in enumerate(context.recommendations[:5], 1): # Limit to top 5 for readability
explanation = f"{i}. **{rec.content_item.title}**\n"
explanation += f" Mood relevance: {rec.mood_relevance:.1%}\n"
explanation += f" Goal alignment: {rec.goal_relevance:.1%}\n"
explanation += f" Quality score: {rec.quality_score:.1%}\n"
explanation += f" {rec.explanation}\n"
explanations.append(explanation)
return "\n".join(explanations)
async def _filter_by_duration(self, context: ConversationContext, message: str) -> str:
"""
Filters current recommendations by duration preferences mentioned in message.
"""
# Extract duration preference from message
duration_keywords = {
"short": (0, 900), # 0-15 minutes
"medium": (900, 1800), # 15-30 minutes
"long": (1800, 3600), # 30-60 minutes
"quick": (0, 600), # 0-10 minutes
"brief": (0, 600), # 0-10 minutes
"extended": (1800, 3600) # 30-60 minutes
}
message_lower = message.lower()
duration_range = None
for keyword, range_tuple in duration_keywords.items():
if keyword in message_lower:
duration_range = range_tuple
break
# Look for specific minute mentions (re is already imported at module level)
minute_match = re.search(r'(\d+)\s*(?:minute|min)', message_lower)
if minute_match:
target_minutes = int(minute_match.group(1))
duration_range = (max(0, target_minutes * 60 - 300), target_minutes * 60 + 300) # target ±5 minutes, clamped at zero
if duration_range:
min_duration, max_duration = duration_range
filtered_recommendations = [
rec for rec in context.recommendations
if min_duration <= rec.content_item.duration_seconds <= max_duration
]
if filtered_recommendations:
return self._format_recommendations_response(filtered_recommendations)
else:
return f"I don't have recommendations in that duration range. Would you like me to search for content with your preferred duration?"
else:
return "I didn't understand the duration preference. Could you specify if you want short (under 15 min), medium (15-30 min), or long (30+ min) sessions?"
def _is_new_search_request(self, new_intent: UserIntent, current_intent: Optional[UserIntent]) -> bool:
"""
Determines if the user is requesting a new search based on intent comparison.
"""
if not current_intent:
return True
# Check for significant changes in mood or goals
mood_changed = new_intent.primary_mood != current_intent.primary_mood
goals_changed = set(new_intent.therapeutic_goals) != set(current_intent.therapeutic_goals)
intensity_changed = abs(new_intent.intensity_level - current_intent.intensity_level) > 2
return mood_changed or goals_changed or intensity_changed
def _get_random_template(self, template_type: str) -> str:
"""
Returns a random template from the specified type.
"""
import random
templates = self.response_templates.get(template_type, ["I'm here to help you find therapeutic content."])
return random.choice(templates)
async def _get_user_history(self, user_id: str) -> Optional[List[ContentItem]]:
"""
Retrieves user's content viewing history from database.
"""
if not self.database:
return None
# Database query implementation would go here
# This is a placeholder for the actual database integration
return None
async def _save_conversation_context(self, context: ConversationContext) -> None:
"""
Saves conversation context to database for persistence.
"""
if not self.database:
return
# Database save implementation would go here
# This is a placeholder for the actual database integration
pass
async def _store_user_feedback(self, context: ConversationContext, feedback: str) -> None:
"""
Stores user feedback for system improvement.
"""
feedback_data = {
"user_id": context.user_id,
"session_id": context.session_id,
"feedback": feedback,
"recommendations": [rec.content_item.url for rec in context.recommendations],
"user_intent": context.user_intent.__dict__ if context.user_intent else None,
"timestamp": datetime.now().isoformat()
}
if self.database:
# Store in database
pass
else:
# Log to file or other storage mechanism
print(f"User feedback collected: {json.dumps(feedback_data, indent=2)}")
The conversational interface manages dialogue flow through a state machine that tracks where the conversation stands and preserves context across turns. Each incoming message is interpreted against the current state to decide the appropriate follow-up action, whether that means asking a clarifying question, presenting recommendations, or refining an earlier search. A simplified version of this dispatch logic is sketched below.
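To make the state handling concrete, here is a minimal, self-contained sketch of such a dispatch table. It is illustrative rather than the implementation above: the state names loosely mirror the ConversationState values used earlier, and the handler functions are hypothetical stand-ins for the chatbot's methods.

from enum import Enum
from typing import Callable, Dict, Tuple

class DialogueState(Enum):
    GREETING = "greeting"
    INTENT_GATHERING = "intent_gathering"
    RECOMMENDATION_PRESENTATION = "recommendation_presentation"
    FOLLOW_UP = "follow_up"

def handle_greeting(message: str) -> Tuple[str, DialogueState]:
    # Welcome the user, then move on to intent gathering.
    return "Hi! Tell me how you're feeling and what you'd like help with.", DialogueState.INTENT_GATHERING

def handle_intent_gathering(message: str) -> Tuple[str, DialogueState]:
    # In the real system this is where LLM-based intent analysis and the content search run.
    return "Let me find sessions that match your mood and goals...", DialogueState.RECOMMENDATION_PRESENTATION

def handle_follow_up(message: str) -> Tuple[str, DialogueState]:
    # Follow-up turns (more results, duration filters, explanations) stay in this state.
    return "Would you like more options, shorter sessions, or a completely new search?", DialogueState.FOLLOW_UP

# Dispatch table: the current state selects the handler; the handler returns the
# reply plus the next state, which is stored back into the conversation context.
STATE_HANDLERS: Dict[DialogueState, Callable[[str], Tuple[str, DialogueState]]] = {
    DialogueState.GREETING: handle_greeting,
    DialogueState.INTENT_GATHERING: handle_intent_gathering,
    DialogueState.RECOMMENDATION_PRESENTATION: handle_follow_up,
    DialogueState.FOLLOW_UP: handle_follow_up,
}

def step(state: DialogueState, message: str) -> Tuple[str, DialogueState]:
    # One conversational turn: look up the handler for the current state.
    reply, next_state = STATE_HANDLERS[state](message)
    return reply, next_state

Keeping the transitions in one table makes it straightforward to add states, such as a feedback-collection step, without modifying the existing handlers.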
The interface supports several interaction patterns: clarification requests, recommendation refinement, and iterative search improvement. Users can ask for additional recommendations, filter results by criteria such as duration, or start an entirely new search without losing conversational context, and the chatbot routes each of these follow-up requests to the corresponding handler shown above. A lightweight, keyword-based version of that routing is sketched below.
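A simple way to route those follow-up requests before falling back to full LLM re-analysis is plain keyword matching. The action labels and phrase lists below are illustrative assumptions, not the production rules used by the chatbot.

import re

def classify_follow_up(message: str) -> str:
    # Map a free-form follow-up message to a coarse action label.
    text = message.lower()
    if any(phrase in text for phrase in ("more recommendations", "more options", "show me more")):
        return "additional_recommendations"
    if any(word in text for word in ("short", "long", "quick", "brief")) or re.search(r"\d+\s*(?:minute|min)", text):
        return "filter_by_duration"
    if "why" in text or "explain" in text:
        return "explain_recommendations"
    if any(phrase in text for phrase in ("instead", "start over", "new search", "something different")):
        return "new_search"
    # Anything else goes back through LLM-based intent analysis.
    return "reanalyze_intent"

# Expected labels for two sample messages:
print(classify_follow_up("Could you show me some shorter options, maybe 10 minutes?"))  # filter_by_duration
print(classify_follow_up("Why did you pick the second one?"))                           # explain_recommendations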
Complete System Integration and Running Example
The following complete example demonstrates the integration of all system components into a functional therapeutic content discovery chatbot. This implementation showcases the end-to-end workflow from user input processing through content discovery and recommendation presentation.
import asyncio
import logging
import os
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class TherapeuticContentDiscoverySystem:
    """
    Complete system integration for therapeutic content discovery chatbot.
    """
    def __init__(self, openai_api_key: str, youtube_api_key: str):
        self.chatbot = TherapeuticContentChatbot(openai_api_key, youtube_api_key)
        self.active_sessions = {}

    async def start_conversation(self, user_id: str) -> str:
        """
        Initiates a new conversation with the user.
        """
        welcome_message = await self.chatbot.process_user_message(
            user_id,
            "Hello"  # Initial greeting trigger
        )
        return welcome_message

    async def process_user_input(self, user_id: str, user_message: str) -> str:
        """
        Processes user input and returns an appropriate response.
        """
        try:
            response = await self.chatbot.process_user_message(user_id, user_message)
            return response
        except Exception as e:
            logger.error(f"Error processing user input: {e}")
            return "I apologize for the technical difficulty. Please try again or rephrase your request."

    async def get_conversation_history(self, user_id: str) -> List[Dict[str, str]]:
        """
        Retrieves conversation history for the specified user.
        """
        context = self.chatbot.conversation_contexts.get(user_id)
        if context:
            return context.conversation_history
        return []

    def get_active_sessions_count(self) -> int:
        """
        Returns the number of currently active conversation sessions.
        """
        return len(self.chatbot.conversation_contexts)
async def demonstrate_system_functionality():
    """
    Comprehensive demonstration of the therapeutic content discovery system.
    """
    # Initialize system with API keys read from the environment (placeholder fallbacks keep the demo runnable)
    openai_key = os.getenv("OPENAI_API_KEY", "your_openai_api_key_here")
    youtube_key = os.getenv("YOUTUBE_API_KEY", "your_youtube_api_key_here")
    system = TherapeuticContentDiscoverySystem(openai_key, youtube_key)
    # Simulate user interactions
    user_id = "demo_user_001"
    print("=== Therapeutic Content Discovery System Demo ===\n")
    # Start conversation
    print("System: Starting conversation...")
    welcome_response = await system.start_conversation(user_id)
    print(f"Bot: {welcome_response}\n")
    # Simulate user expressing anxiety and sleep issues
    user_input_1 = "I'm feeling really anxious about work lately and I'm having trouble sleeping. I need something to help me relax and get better rest."
    print(f"User: {user_input_1}")
    response_1 = await system.process_user_input(user_id, user_input_1)
    print(f"Bot: {response_1}\n")
    # Simulate user providing clarification if needed
    user_input_2 = "I'd say my anxiety level is about 7 out of 10, and I'm a beginner with meditation. I prefer sessions around 20 minutes."
    print(f"User: {user_input_2}")
    response_2 = await system.process_user_input(user_id, user_input_2)
    print(f"Bot: {response_2}\n")
    # Simulate user requesting more recommendations
    user_input_3 = "These look good, but could you show me some shorter options? Maybe around 10-15 minutes?"
    print(f"User: {user_input_3}")
    response_3 = await system.process_user_input(user_id, user_input_3)
    print(f"Bot: {response_3}\n")
    # Simulate user expressing satisfaction
    user_input_4 = "Perfect! Thank you so much. The second recommendation looks like exactly what I need."
    print(f"User: {user_input_4}")
    response_4 = await system.process_user_input(user_id, user_input_4)
    print(f"Bot: {response_4}\n")
    # Display conversation statistics
    history = await system.get_conversation_history(user_id)
    print(f"Conversation completed with {len(history)} total exchanges.")
    print(f"Active sessions: {system.get_active_sessions_count()}")
class SystemConfiguration:
    """
    Configuration management for the therapeutic content discovery system.
    """
    def __init__(self):
        self.openai_api_key = os.getenv("OPENAI_API_KEY")
        self.youtube_api_key = os.getenv("YOUTUBE_API_KEY")
        self.database_url = os.getenv("DATABASE_URL")
        self.redis_url = os.getenv("REDIS_URL")
        self.log_level = os.getenv("LOG_LEVEL", "INFO")
        # Validate required configuration
        self._validate_configuration()

    def _validate_configuration(self):
        """
        Validates that all required configuration parameters are present.
        """
        required_keys = ["openai_api_key", "youtube_api_key"]
        missing_keys = [key for key in required_keys if not getattr(self, key)]
        if missing_keys:
            raise ValueError(f"Missing required configuration: {', '.join(missing_keys)}")

    def get_database_config(self) -> Optional[Dict[str, str]]:
        """
        Returns database configuration if available.
        """
        if self.database_url:
            return {"url": self.database_url}
        return None

    def get_cache_config(self) -> Optional[Dict[str, str]]:
        """
        Returns cache configuration if available.
        """
        if self.redis_url:
            return {"url": self.redis_url}
        return None
async def production_system_startup():
    """
    Production-ready system startup with proper configuration and error handling.
    """
    try:
        # Load configuration
        config = SystemConfiguration()
        # Initialize system
        system = TherapeuticContentDiscoverySystem(
            config.openai_api_key,
            config.youtube_api_key
        )
        logger.info("Therapeutic Content Discovery System started successfully")
        # In a production environment, this would integrate with a web framework
        # such as FastAPI, Flask, or Django to handle HTTP requests
        return system
    except Exception as e:
        logger.error(f"Failed to start system: {e}")
        raise
# Health check and monitoring functions
async def system_health_check(system: TherapeuticContentDiscoverySystem) -> Dict[str, Any]:
    """
    Performs system health check and returns status information.
    """
    health_status = {
        "status": "healthy",
        "timestamp": datetime.now().isoformat(),
        "active_sessions": system.get_active_sessions_count(),
        "components": {}
    }
    # Check OpenAI API connectivity
    try:
        test_response = await system.chatbot.intent_analyzer._call_llm(
            "You are a test assistant.",
            "Respond with 'OK' if you can process this message."
        )
        health_status["components"]["openai"] = "healthy" if "OK" in test_response else "degraded"
    except Exception as e:
        health_status["components"]["openai"] = "unhealthy"
        health_status["status"] = "degraded"
    # Check YouTube API connectivity
    try:
        async with system.chatbot.content_discovery as discovery:
            # Perform a minimal test search
            test_results = await discovery._execute_search(
                SearchQuery(
                    keywords=["meditation"],
                    duration_range=(300, 1800),
                    upload_date_range=(datetime.now() - timedelta(days=30), datetime.now()),
                    minimum_rating=3.0,
                    content_type="meditation",
                    experience_level="beginner"
                ),
                max_results=1
            )
        health_status["components"]["youtube"] = "healthy"
    except Exception as e:
        health_status["components"]["youtube"] = "unhealthy"
        health_status["status"] = "degraded"
    return health_status
if __name__ == "__main__":
    # Run the demonstration
    asyncio.run(demonstrate_system_functionality())
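As the startup comment above suggests, a production deployment would sit behind a web framework rather than a command-line demo. The sketch below wraps the system in a minimal FastAPI application; the endpoint paths, request model, and module-level wiring are assumptions for illustration, not part of the implementation above.

from typing import Any, Dict, Optional

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI(title="Therapeutic Content Discovery API")
system: Optional[TherapeuticContentDiscoverySystem] = None  # populated at startup

class ChatRequest(BaseModel):
    user_id: str
    message: str

@app.on_event("startup")
async def startup() -> None:
    # Reuse the production startup routine defined earlier.
    global system
    system = await production_system_startup()

@app.post("/chat")
async def chat(request: ChatRequest) -> Dict[str, str]:
    # One conversational turn: delegate to the chatbot and return its reply.
    reply = await system.process_user_input(request.user_id, request.message)
    return {"response": reply}

@app.get("/health")
async def health() -> Dict[str, Any]:
    # Expose the health check defined above for monitoring.
    return await system_health_check(system)

Served with uvicorn, this would expose POST /chat for conversational turns and GET /health for monitoring.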
This implementation ties together the components developed throughout the article: intent analysis, content discovery, recommendation scoring, and conversational state management. The demonstration script exercises the full path from a free-form user message to a ranked, explained list of sessions while keeping content quality and relevance checks in the loop.
The architecture supports scalability through asynchronous processing, explicit error handling, and modular component design. It can be extended with additional content platforms, richer personalization, and analytics without disturbing the existing interfaces; a hypothetical adapter for an extra platform is sketched below.
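To illustrate how another platform could be plugged in, the sketch below defines a small adapter protocol. The class names, fields, and stub implementation are hypothetical and do not appear in the code above.

from dataclasses import dataclass
from typing import List, Protocol

@dataclass
class DiscoveredItem:
    title: str
    url: str
    duration_seconds: int
    platform: str

class ContentPlatformAdapter(Protocol):
    async def search(self, keywords: List[str], max_results: int) -> List[DiscoveredItem]:
        ...

class ExamplePlatformAdapter:
    """Stub adapter: a real implementation would call the platform's public API here."""
    async def search(self, keywords: List[str], max_results: int) -> List[DiscoveredItem]:
        # Placeholder result so the interface can be exercised without network access.
        return [
            DiscoveredItem(
                title=f"{' '.join(keywords)} session",
                url="https://example.com/session",
                duration_seconds=1200,
                platform="example",
            )
        ][:max_results]

The discovery layer could then iterate over a list of registered adapters and merge their results before scoring, leaving the recommendation engine unchanged.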
The therapeutic content discovery chatbot narrows the gap between a user's stated emotional needs and the therapeutic audio already available online. Its value comes from combining LLM-based intent analysis with multi-platform search and explainable ranking, so users seeking mental health and wellness support spend less time browsing and more time listening.