Saturday, December 27, 2025

INTELLIGENT MEDITATION AND HYPNOSIS CONTENT DISCOVERY: AN LLM-POWERED THERAPEUTIC CHATBOT SYSTEM



Introduction and System Overview


The modern digital wellness landscape presents users with an overwhelming abundance of meditation and hypnosis content across various platforms. While platforms like YouTube and YouTube Music host thousands of therapeutic audio sessions, users often struggle to find content that precisely matches their current emotional state and specific wellness goals. This challenge has created an opportunity for intelligent content discovery systems that can bridge the gap between user needs and available therapeutic resources.

This article presents a comprehensive LLM-based chatbot system designed to intelligently discover, evaluate, and recommend meditation and hypnosis sessions from multiple platforms. The system leverages advanced natural language processing to understand user emotional states and therapeutic goals, then employs sophisticated search and ranking algorithms to identify the most suitable content from vast digital libraries.

The chatbot operates through a conversational interface where users describe their current mood and desired outcomes using natural language. For instance, a user might express "I'm feeling anxious about work and need help sleeping better tonight" or "I want to build confidence for an upcoming presentation." The system then processes this input to extract key emotional indicators and therapeutic objectives, subsequently searching across multiple platforms to identify relevant content.


Core System Architecture and Components


The system architecture follows clean architecture principles with distinct separation of concerns across multiple layers. The presentation layer handles user interactions through a conversational interface, while the business logic layer processes natural language inputs and manages recommendation algorithms. The data access layer interfaces with external APIs and manages local content caching.

The central component is the Intent Analysis Engine, which utilizes large language models to parse user inputs and extract structured information about emotional states and therapeutic goals. This engine employs prompt engineering techniques to ensure consistent and accurate interpretation of user needs, even when expressed in colloquial or ambiguous language.


import openai

import json

import re

from typing import Dict, List, Tuple, Optional

from dataclasses import dataclass

from enum import Enum

import asyncio

import aiohttp

from datetime import datetime, timedelta


class MoodCategory(Enum):

    ANXIETY = "anxiety"

    DEPRESSION = "depression"

    STRESS = "stress"

    HAPPINESS = "happiness"

    ANGER = "anger"

    SADNESS = "sadness"

    NEUTRAL = "neutral"

    EXCITEMENT = "excitement"


class TherapeuticGoal(Enum):

    SLEEP_IMPROVEMENT = "sleep_improvement"

    STRESS_REDUCTION = "stress_reduction"

    CONFIDENCE_BUILDING = "confidence_building"

    SMOKING_CESSATION = "smoking_cessation"

    WEIGHT_MANAGEMENT = "weight_management"

    PAIN_MANAGEMENT = "pain_management"

    RELATIONSHIP_IMPROVEMENT = "relationship_improvement"

    FOCUS_ENHANCEMENT = "focus_enhancement"

    EMOTIONAL_HEALING = "emotional_healing"

    SPIRITUAL_GROWTH = "spiritual_growth"


@dataclass

class UserIntent:

    primary_mood: MoodCategory

    secondary_moods: List[MoodCategory]

    therapeutic_goals: List[TherapeuticGoal]

    intensity_level: int  # 1-10 scale

    session_duration_preference: Optional[int]  # minutes

    experience_level: str  # beginner, intermediate, advanced

    specific_techniques: List[str]

    raw_input: str


class IntentAnalysisEngine:

    def __init__(self, openai_api_key: str):

        self.client = openai.OpenAI(api_key=openai_api_key)

        self.mood_keywords = self._initialize_mood_keywords()

        self.goal_keywords = self._initialize_goal_keywords()

    

    def _initialize_mood_keywords(self) -> Dict[MoodCategory, List[str]]:

        return {

            MoodCategory.ANXIETY: [

                "anxious", "worried", "nervous", "stressed", "overwhelmed",

                "panic", "fearful", "restless", "tense", "uneasy"

            ],

            MoodCategory.DEPRESSION: [

                "depressed", "sad", "down", "hopeless", "empty", "numb",

                "worthless", "tired", "unmotivated", "lonely"

            ],

            MoodCategory.STRESS: [

                "stressed", "pressure", "overwhelmed", "burned out",

                "exhausted", "frazzled", "overloaded", "strained"

            ],

            MoodCategory.HAPPINESS: [

                "happy", "joyful", "content", "pleased", "cheerful",

                "optimistic", "grateful", "satisfied", "peaceful"

            ],

            MoodCategory.ANGER: [

                "angry", "frustrated", "irritated", "mad", "furious",

                "annoyed", "resentful", "hostile", "aggressive"

            ]

        }

    

    def _initialize_goal_keywords(self) -> Dict[TherapeuticGoal, List[str]]:

        return {

            TherapeuticGoal.SLEEP_IMPROVEMENT: [

                "sleep", "insomnia", "rest", "bedtime", "falling asleep",

                "staying asleep", "sleep quality", "nighttime", "tired"

            ],

            TherapeuticGoal.STRESS_REDUCTION: [

                "stress relief", "calm down", "relax", "tension release",

                "pressure relief", "unwind", "decompress"

            ],

            TherapeuticGoal.CONFIDENCE_BUILDING: [

                "confidence", "self-esteem", "self-worth", "courage",

                "assertiveness", "presentation", "public speaking"

            ],

            TherapeuticGoal.SMOKING_CESSATION: [

                "quit smoking", "stop smoking", "cigarettes", "nicotine",

                "smoking habit", "tobacco", "smoking addiction"

            ]

        }


    async def analyze_user_intent(self, user_input: str) -> UserIntent:

        """

        Analyzes user input to extract mood, goals, and preferences using LLM.

        """

        system_prompt = """

        You are an expert therapeutic content analyst. Analyze the user's input to extract:

        1. Primary emotional state/mood

        2. Secondary emotional states (if any)

        3. Therapeutic goals and objectives

        4. Intensity level of emotions (1-10 scale)

        5. Preferred session duration (if mentioned)

        6. Experience level with meditation/hypnosis

        7. Specific techniques mentioned

        

        Return a structured JSON response with extracted information.

        """

        

        user_prompt = f"""

        Analyze this user input for therapeutic content recommendation:

        "{user_input}"

        

        Provide analysis in this JSON format:

        {{

            "primary_mood": "mood_category",

            "secondary_moods": ["mood1", "mood2"],

            "therapeutic_goals": ["goal1", "goal2"],

            "intensity_level": 5,

            "session_duration_preference": 20,

            "experience_level": "beginner",

            "specific_techniques": ["technique1", "technique2"],

            "confidence_score": 0.85

        }}

        """

        

        try:

            response = await self._call_llm(system_prompt, user_prompt)

            parsed_response = json.loads(response)

            

            return UserIntent(

                primary_mood=MoodCategory(parsed_response.get("primary_mood", "neutral")),

                secondary_moods=[MoodCategory(mood) for mood in parsed_response.get("secondary_moods", [])],

                therapeutic_goals=[TherapeuticGoal(goal) for goal in parsed_response.get("therapeutic_goals", [])],

                intensity_level=parsed_response.get("intensity_level", 5),

                session_duration_preference=parsed_response.get("session_duration_preference"),

                experience_level=parsed_response.get("experience_level", "beginner"),

                specific_techniques=parsed_response.get("specific_techniques", []),

                raw_input=user_input

            )

            

        except Exception as e:

            # Fallback to keyword-based analysis

            return await self._fallback_keyword_analysis(user_input)

    

    async def _call_llm(self, system_prompt: str, user_prompt: str) -> str:

        """

        Makes API call to OpenAI GPT model for intent analysis.

        """

        response = await self.client.chat.completions.create(

            model="gpt-4",

            messages=[

                {"role": "system", "content": system_prompt},

                {"role": "user", "content": user_prompt}

            ],

            temperature=0.3,

            max_tokens=500

        )

        return response.choices[0].message.content

    

    async def _fallback_keyword_analysis(self, user_input: str) -> UserIntent:

        """

        Fallback method using keyword matching when LLM analysis fails.

        """

        user_input_lower = user_input.lower()

        detected_moods = []

        detected_goals = []

        

        # Analyze moods using keyword matching

        for mood, keywords in self.mood_keywords.items():

            if any(keyword in user_input_lower for keyword in keywords):

                detected_moods.append(mood)

        

        # Analyze goals using keyword matching

        for goal, keywords in self.goal_keywords.items():

            if any(keyword in user_input_lower for keyword in keywords):

                detected_goals.append(goal)

        

        # Determine intensity based on intensity words

        intensity_words = {

            "extremely": 9, "very": 8, "really": 7, "quite": 6,

            "somewhat": 4, "slightly": 3, "a little": 2, "barely": 1

        }

        intensity = 5  # default

        for word, level in intensity_words.items():

            if word in user_input_lower:

                intensity = level

                break

        

        return UserIntent(

            primary_mood=detected_moods[0] if detected_moods else MoodCategory.NEUTRAL,

            secondary_moods=detected_moods[1:],

            therapeutic_goals=detected_goals,

            intensity_level=intensity,

            session_duration_preference=None,

            experience_level="beginner",

            specific_techniques=[],

            raw_input=user_input

        )


The Intent Analysis Engine represents the cognitive core of the system, responsible for transforming natural language expressions into structured data that can drive content discovery algorithms. This component employs a hybrid approach, primarily relying on large language models for sophisticated natural language understanding while maintaining keyword-based fallback mechanisms for reliability and cost optimization.

The engine processes user inputs through carefully crafted prompts that guide the LLM to extract specific categories of information. The system recognizes various mood categories ranging from common emotional states like anxiety and happiness to more complex psychological conditions. Similarly, it identifies therapeutic goals that span from immediate needs like sleep improvement to long-term objectives such as smoking cessation or confidence building.


Content Discovery and Platform Integration


The content discovery system interfaces with multiple platforms through their respective APIs, with YouTube Data API v3 serving as the primary integration point. This component handles the complex task of translating user intents into platform-specific search queries, managing API rate limits, and aggregating results across different content sources.


@dataclass

class ContentItem:

    title: str

    url: str

    platform: str

    duration_seconds: int

    view_count: int

    rating: float

    upload_date: datetime

    channel_name: str

    description: str

    thumbnail_url: str

    tags: List[str]

    category: str

    language: str


@dataclass

class SearchQuery:

    keywords: List[str]

    duration_range: Tuple[int, int]  # min, max seconds

    upload_date_range: Tuple[datetime, datetime]

    minimum_rating: float

    content_type: str  # "meditation", "hypnosis", "guided_imagery"

    experience_level: str


class YouTubeContentDiscovery:

    def __init__(self, api_key: str):

        self.api_key = api_key

        self.base_url = "https://www.googleapis.com/youtube/v3"

        self.session = None

        self.rate_limiter = RateLimiter(calls_per_minute=100)

    

    async def __aenter__(self):

        self.session = aiohttp.ClientSession()

        return self

    

    async def __aexit__(self, exc_type, exc_val, exc_tb):

        if self.session:

            await self.session.close()

    

    async def search_content(self, user_intent: UserIntent, max_results: int = 50) -> List[ContentItem]:

        """

        Searches YouTube for meditation and hypnosis content based on user intent.

        """

        search_queries = self._generate_search_queries(user_intent)

        all_results = []

        

        for query in search_queries:

            try:

                await self.rate_limiter.wait_if_needed()

                results = await self._execute_search(query, max_results // len(search_queries))

                all_results.extend(results)

            except Exception as e:

                print(f"Search query failed: {e}")

                continue

        

        # Remove duplicates and sort by relevance

        unique_results = self._deduplicate_results(all_results)

        return await self._rank_and_filter_results(unique_results, user_intent)

    

    def _generate_search_queries(self, user_intent: UserIntent) -> List[SearchQuery]:

        """

        Generates multiple search queries based on user intent to maximize content discovery.

        """

        queries = []

        base_keywords = ["meditation", "hypnosis", "guided meditation", "relaxation"]

        

        # Generate mood-specific queries

        mood_keywords = self._get_mood_keywords(user_intent.primary_mood)

        for mood_keyword in mood_keywords:

            for base_keyword in base_keywords:

                keywords = [base_keyword, mood_keyword]

                

                # Add goal-specific keywords

                for goal in user_intent.therapeutic_goals:

                    goal_keywords = self._get_goal_keywords(goal)

                    keywords.extend(goal_keywords[:2])  # Limit to avoid overly specific queries

                

                # Determine duration range based on user preference

                duration_range = self._calculate_duration_range(user_intent)

                

                queries.append(SearchQuery(

                    keywords=keywords,

                    duration_range=duration_range,

                    upload_date_range=(datetime.now() - timedelta(days=365*3), datetime.now()),

                    minimum_rating=3.5,

                    content_type="meditation" if "meditation" in base_keyword else "hypnosis",

                    experience_level=user_intent.experience_level

                ))

        

        return queries[:10]  # Limit number of queries to manage API usage

    

    def _get_mood_keywords(self, mood: MoodCategory) -> List[str]:

        """

        Returns search keywords associated with specific mood categories.

        """

        mood_keyword_map = {

            MoodCategory.ANXIETY: ["anxiety relief", "calm anxiety", "stress relief", "peaceful"],

            MoodCategory.DEPRESSION: ["depression help", "mood boost", "emotional healing", "hope"],

            MoodCategory.STRESS: ["stress relief", "tension release", "calm mind", "relaxation"],

            MoodCategory.HAPPINESS: ["joy meditation", "gratitude", "positive energy", "happiness"],

            MoodCategory.ANGER: ["anger management", "calm anger", "emotional balance", "peace"],

            MoodCategory.SADNESS: ["healing sadness", "emotional support", "comfort", "uplift"],

            MoodCategory.NEUTRAL: ["general meditation", "mindfulness", "awareness", "presence"]

        }

        return mood_keyword_map.get(mood, ["mindfulness", "meditation"])

    

    def _get_goal_keywords(self, goal: TherapeuticGoal) -> List[str]:

        """

        Returns search keywords associated with specific therapeutic goals.

        """

        goal_keyword_map = {

            TherapeuticGoal.SLEEP_IMPROVEMENT: ["sleep meditation", "bedtime", "insomnia", "deep sleep"],

            TherapeuticGoal.STRESS_REDUCTION: ["stress relief", "relaxation", "calm", "tension release"],

            TherapeuticGoal.CONFIDENCE_BUILDING: ["confidence", "self-esteem", "courage", "empowerment"],

            TherapeuticGoal.SMOKING_CESSATION: ["quit smoking", "stop smoking", "addiction", "freedom"],

            TherapeuticGoal.WEIGHT_MANAGEMENT: ["weight loss", "healthy eating", "body image", "metabolism"],

            TherapeuticGoal.PAIN_MANAGEMENT: ["pain relief", "healing", "chronic pain", "comfort"],

            TherapeuticGoal.RELATIONSHIP_IMPROVEMENT: ["love", "relationships", "communication", "connection"],

            TherapeuticGoal.FOCUS_ENHANCEMENT: ["focus", "concentration", "attention", "clarity"],

            TherapeuticGoal.EMOTIONAL_HEALING: ["healing", "trauma", "emotional release", "recovery"],

            TherapeuticGoal.SPIRITUAL_GROWTH: ["spiritual", "awakening", "consciousness", "enlightenment"]

        }

        return goal_keyword_map.get(goal, ["wellness", "healing"])

    

    def _calculate_duration_range(self, user_intent: UserIntent) -> Tuple[int, int]:

        """

        Calculates appropriate duration range based on user preferences and experience level.

        """

        if user_intent.session_duration_preference:

            target_duration = user_intent.session_duration_preference * 60  # convert to seconds

            return (target_duration - 300, target_duration + 300)  # ±5 minutes

        

        # Default ranges based on experience level

        duration_ranges = {

            "beginner": (300, 1200),    # 5-20 minutes

            "intermediate": (600, 2400), # 10-40 minutes

            "advanced": (1200, 3600)     # 20-60 minutes

        }

        return duration_ranges.get(user_intent.experience_level, (300, 1800))

    

    async def _execute_search(self, query: SearchQuery, max_results: int) -> List[ContentItem]:

        """

        Executes a single search query against YouTube API.

        """

        search_params = {

            "part": "snippet",

            "q": " ".join(query.keywords),

            "type": "video",

            "maxResults": min(max_results, 50),

            "order": "relevance",

            "videoDuration": self._get_duration_filter(query.duration_range),

            "key": self.api_key

        }

        

        async with self.session.get(f"{self.base_url}/search", params=search_params) as response:

            if response.status != 200:

                raise Exception(f"YouTube API error: {response.status}")

            

            data = await response.json()

            video_ids = [item["id"]["videoId"] for item in data.get("items", [])]

            

            if not video_ids:

                return []

            

            # Get detailed video information

            return await self._get_video_details(video_ids)

    

    def _get_duration_filter(self, duration_range: Tuple[int, int]) -> str:

        """

        Converts duration range to YouTube API duration filter.

        """

        min_duration, max_duration = duration_range

        

        if max_duration <= 240:  # 4 minutes

            return "short"

        elif max_duration <= 1200:  # 20 minutes

            return "medium"

        else:

            return "long"

    

    async def _get_video_details(self, video_ids: List[str]) -> List[ContentItem]:

        """

        Retrieves detailed information for specific video IDs.

        """

        details_params = {

            "part": "snippet,contentDetails,statistics",

            "id": ",".join(video_ids),

            "key": self.api_key

        }

        

        async with self.session.get(f"{self.base_url}/videos", params=details_params) as response:

            if response.status != 200:

                return []

            

            data = await response.json()

            content_items = []

            

            for item in data.get("items", []):

                try:

                    content_item = self._parse_video_item(item)

                    if content_item:

                        content_items.append(content_item)

                except Exception as e:

                    print(f"Error parsing video item: {e}")

                    continue

            

            return content_items

    

    def _parse_video_item(self, item: dict) -> Optional[ContentItem]:

        """

        Parses YouTube API response item into ContentItem object.

        """

        try:

            snippet = item["snippet"]

            statistics = item.get("statistics", {})

            content_details = item["contentDetails"]

            

            # Parse duration from ISO 8601 format

            duration_str = content_details["duration"]

            duration_seconds = self._parse_iso_duration(duration_str)

            

            # Calculate rating from like/dislike ratio

            like_count = int(statistics.get("likeCount", 0))

            view_count = int(statistics.get("viewCount", 0))

            rating = min(5.0, (like_count / max(view_count, 1)) * 100) if view_count > 0 else 3.0

            

            return ContentItem(

                title=snippet["title"],

                url=f"https://www.youtube.com/watch?v={item['id']}",

                platform="youtube",

                duration_seconds=duration_seconds,

                view_count=view_count,

                rating=rating,

                upload_date=datetime.fromisoformat(snippet["publishedAt"].replace("Z", "+00:00")),

                channel_name=snippet["channelTitle"],

                description=snippet.get("description", ""),

                thumbnail_url=snippet["thumbnails"]["high"]["url"],

                tags=snippet.get("tags", []),

                category=snippet.get("categoryId", ""),

                language=snippet.get("defaultLanguage", "en")

            )

            

        except (KeyError, ValueError) as e:

            print(f"Error parsing video item: {e}")

            return None

    

    def _parse_iso_duration(self, duration_str: str) -> int:

        """

        Parses ISO 8601 duration format (PT15M33S) to seconds.

        """

        import re

        pattern = r'PT(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?'

        match = re.match(pattern, duration_str)

        

        if not match:

            return 0

        

        hours = int(match.group(1) or 0)

        minutes = int(match.group(2) or 0)

        seconds = int(match.group(3) or 0)

        

        return hours * 3600 + minutes * 60 + seconds

    

    def _deduplicate_results(self, results: List[ContentItem]) -> List[ContentItem]:

        """

        Removes duplicate content items based on title similarity and URL.

        """

        seen_urls = set()

        unique_results = []

        

        for item in results:

            if item.url not in seen_urls:

                seen_urls.add(item.url)

                unique_results.append(item)

        

        return unique_results

    

    async def _rank_and_filter_results(self, results: List[ContentItem], user_intent: UserIntent) -> List[ContentItem]:

        """

        Ranks and filters results based on relevance to user intent.

        """

        scored_results = []

        

        for item in results:

            relevance_score = await self._calculate_relevance_score(item, user_intent)

            if relevance_score > 0.3:  # Minimum relevance threshold

                scored_results.append((item, relevance_score))

        

        # Sort by relevance score (descending)

        scored_results.sort(key=lambda x: x[1], reverse=True)

        

        return [item for item, score in scored_results]

    

    async def _calculate_relevance_score(self, item: ContentItem, user_intent: UserIntent) -> float:

        """

        Calculates relevance score for a content item based on user intent.

        """

        score = 0.0

        

        # Title and description keyword matching

        text_content = f"{item.title} {item.description}".lower()

        

        # Mood keyword matching

        mood_keywords = self._get_mood_keywords(user_intent.primary_mood)

        mood_matches = sum(1 for keyword in mood_keywords if keyword.lower() in text_content)

        score += (mood_matches / len(mood_keywords)) * 0.3

        

        # Goal keyword matching

        for goal in user_intent.therapeutic_goals:

            goal_keywords = self._get_goal_keywords(goal)

            goal_matches = sum(1 for keyword in goal_keywords if keyword.lower() in text_content)

            score += (goal_matches / len(goal_keywords)) * 0.2

        

        # Duration preference matching

        if user_intent.session_duration_preference:

            target_duration = user_intent.session_duration_preference * 60

            duration_diff = abs(item.duration_seconds - target_duration)

            duration_score = max(0, 1 - (duration_diff / target_duration))

            score += duration_score * 0.2

        

        # Quality indicators

        if item.view_count > 1000:

            score += 0.1

        if item.rating > 4.0:

            score += 0.1

        

        # Channel reputation (simplified)

        trusted_channels = ["jason stephenson", "michael sealey", "the honest guys", "michelle's sanctuary"]

        if any(channel in item.channel_name.lower() for channel in trusted_channels):

            score += 0.1

        

        return min(1.0, score)



class RateLimiter:

    def __init__(self, calls_per_minute: int):

        self.calls_per_minute = calls_per_minute

        self.calls = []

    

    async def wait_if_needed(self):

        now = datetime.now()

        # Remove calls older than 1 minute

        self.calls = [call_time for call_time in self.calls if (now - call_time).seconds < 60]

        

        if len(self.calls) >= self.calls_per_minute:

            sleep_time = 60 - (now - self.calls[0]).seconds

            await asyncio.sleep(sleep_time)

        

        self.calls.append(now)


The content discovery system represents a sophisticated search and retrieval mechanism that transforms user intents into targeted content queries across multiple platforms. The system employs a multi-query strategy, generating several complementary search queries to maximize content discovery while avoiding overly narrow results that might miss relevant content.

The YouTube integration component handles the complexities of the YouTube Data API, including rate limiting, pagination, and data parsing. The system constructs search queries by combining mood-specific keywords with therapeutic goal indicators, creating comprehensive search terms that capture the nuanced requirements expressed in user inputs.

Quality assessment plays a crucial role in content filtering, with the system evaluating multiple factors including view counts, engagement metrics, channel reputation, and content freshness. This multi-dimensional scoring approach ensures that recommended content meets both relevance and quality standards.


Recommendation Engine and Content Ranking


The recommendation engine serves as the intelligent core that transforms discovered content into personalized suggestions. This component employs machine learning techniques and heuristic algorithms to score and rank content based on multiple relevance factors, ensuring that the final recommendations closely align with user needs and preferences.


from sklearn.feature_extraction.text import TfidfVectorizer

from sklearn.metrics.pairwise import cosine_similarity

import numpy as np

from typing import Dict, List, Tuple

import pickle

import os


@dataclass

class RecommendationScore:

    content_item: ContentItem

    overall_score: float

    mood_relevance: float

    goal_relevance: float

    quality_score: float

    personalization_score: float

    explanation: str


class ContentRecommendationEngine:

    def __init__(self):

        self.tfidf_vectorizer = TfidfVectorizer(

            max_features=1000,

            stop_words='english',

            ngram_range=(1, 2)

        )

        self.content_embeddings = {}

        self.user_preference_model = None

        self.content_quality_model = None

        self._initialize_models()

    

    def _initialize_models(self):

        """

        Initializes machine learning models for content analysis and user preference modeling.

        """

        # Load pre-trained models if available

        if os.path.exists("models/user_preference_model.pkl"):

            with open("models/user_preference_model.pkl", "rb") as f:

                self.user_preference_model = pickle.load(f)

        

        if os.path.exists("models/content_quality_model.pkl"):

            with open("models/content_quality_model.pkl", "rb") as f:

                self.content_quality_model = pickle.load(f)

    

    async def generate_recommendations(

        self, 

        content_items: List[ContentItem], 

        user_intent: UserIntent,

        user_history: Optional[List[ContentItem]] = None,

        num_recommendations: int = 10

    ) -> List[RecommendationScore]:

        """

        Generates personalized content recommendations based on user intent and content analysis.

        """

        if not content_items:

            return []

        

        # Calculate various scoring components

        mood_scores = await self._calculate_mood_relevance_scores(content_items, user_intent)

        goal_scores = await self._calculate_goal_relevance_scores(content_items, user_intent)

        quality_scores = await self._calculate_quality_scores(content_items)

        personalization_scores = await self._calculate_personalization_scores(

            content_items, user_intent, user_history

        )

        

        # Combine scores with weighted importance

        recommendation_scores = []

        for i, item in enumerate(content_items):

            overall_score = self._combine_scores(

                mood_scores[i],

                goal_scores[i],

                quality_scores[i],

                personalization_scores[i],

                user_intent

            )

            

            explanation = self._generate_explanation(

                item, mood_scores[i], goal_scores[i], 

                quality_scores[i], personalization_scores[i]

            )

            

            recommendation_scores.append(RecommendationScore(

                content_item=item,

                overall_score=overall_score,

                mood_relevance=mood_scores[i],

                goal_relevance=goal_scores[i],

                quality_score=quality_scores[i],

                personalization_score=personalization_scores[i],

                explanation=explanation

            ))

        

        # Sort by overall score and return top recommendations

        recommendation_scores.sort(key=lambda x: x.overall_score, reverse=True)

        return recommendation_scores[:num_recommendations]

    

    async def _calculate_mood_relevance_scores(

        self, 

        content_items: List[ContentItem], 

        user_intent: UserIntent

    ) -> List[float]:

        """

        Calculates mood relevance scores using semantic similarity analysis.

        """

        mood_descriptors = self._get_mood_descriptors(user_intent.primary_mood)

        secondary_mood_descriptors = []

        for mood in user_intent.secondary_moods:

            secondary_mood_descriptors.extend(self._get_mood_descriptors(mood))

        

        all_mood_text = " ".join(mood_descriptors + secondary_mood_descriptors)

        

        scores = []

        for item in content_items:

            content_text = f"{item.title} {item.description}"

            

            # Use TF-IDF similarity for semantic matching

            combined_texts = [all_mood_text, content_text]

            tfidf_matrix = self.tfidf_vectorizer.fit_transform(combined_texts)

            similarity = cosine_similarity(tfidf_matrix[0:1], tfidf_matrix[1:2])[0][0]

            

            # Adjust score based on mood intensity

            intensity_multiplier = user_intent.intensity_level / 10.0

            adjusted_score = similarity * (0.5 + 0.5 * intensity_multiplier)

            

            scores.append(min(1.0, adjusted_score))

        

        return scores

    

    def _get_mood_descriptors(self, mood: MoodCategory) -> List[str]:

        """

        Returns detailed descriptive text for mood categories to improve semantic matching.

        """

        mood_descriptors = {

            MoodCategory.ANXIETY: [

                "anxious worried nervous tense restless fearful panic overwhelmed",

                "stress tension pressure unease discomfort agitation nervousness",

                "calm peaceful relaxed serene tranquil soothing gentle"

            ],

            MoodCategory.DEPRESSION: [

                "depressed sad down hopeless empty numb worthless lonely",

                "melancholy despair grief sorrow dejection despondency",

                "uplifting hopeful encouraging positive healing comfort"

            ],

            MoodCategory.STRESS: [

                "stressed overwhelmed pressure burned exhausted frazzled",

                "tension strain burden overload fatigue weariness",

                "relaxation relief calm peace restoration rejuvenation"

            ],

            MoodCategory.HAPPINESS: [

                "happy joyful content pleased cheerful optimistic grateful",

                "bliss delight satisfaction fulfillment elation euphoria",

                "maintain enhance amplify celebrate appreciate gratitude"

            ],

            MoodCategory.ANGER: [

                "angry frustrated irritated mad furious annoyed resentful",

                "rage hostility aggression indignation wrath fury",

                "calm peaceful forgiveness understanding patience tolerance"

            ]

        }

        return mood_descriptors.get(mood, ["neutral balanced centered"])

    

    async def _calculate_goal_relevance_scores(

        self, 

        content_items: List[ContentItem], 

        user_intent: UserIntent

    ) -> List[float]:

        """

        Calculates therapeutic goal relevance scores using specialized keyword analysis.

        """

        goal_keywords = {}

        for goal in user_intent.therapeutic_goals:

            goal_keywords[goal] = self._get_detailed_goal_keywords(goal)

        

        scores = []

        for item in content_items:

            content_text = f"{item.title} {item.description} {' '.join(item.tags)}".lower()

            

            goal_score = 0.0

            for goal, keywords in goal_keywords.items():

                keyword_matches = sum(1 for keyword in keywords if keyword in content_text)

                goal_relevance = keyword_matches / len(keywords)

                goal_score += goal_relevance

            

            # Normalize by number of goals

            if user_intent.therapeutic_goals:

                goal_score /= len(user_intent.therapeutic_goals)

            

            scores.append(min(1.0, goal_score))

        

        return scores

    

    def _get_detailed_goal_keywords(self, goal: TherapeuticGoal) -> List[str]:

        """

        Returns comprehensive keyword lists for therapeutic goals.

        """

        detailed_keywords = {

            TherapeuticGoal.SLEEP_IMPROVEMENT: [

                "sleep", "insomnia", "bedtime", "falling asleep", "staying asleep",

                "deep sleep", "rem sleep", "sleep cycle", "nighttime", "rest",

                "slumber", "drowsy", "sleepy", "tired", "fatigue", "exhaustion"

            ],

            TherapeuticGoal.STRESS_REDUCTION: [

                "stress", "tension", "pressure", "overwhelm", "anxiety",

                "relaxation", "calm", "peace", "tranquil", "serene",

                "unwind", "decompress", "release", "relief", "soothing"

            ],

            TherapeuticGoal.CONFIDENCE_BUILDING: [

                "confidence", "self-esteem", "self-worth", "courage", "brave",

                "assertiveness", "empowerment", "strength", "bold", "fearless",

                "self-belief", "inner strength", "personal power", "leadership"

            ],

            TherapeuticGoal.SMOKING_CESSATION: [

                "smoking", "cigarettes", "tobacco", "nicotine", "addiction",

                "quit", "stop", "cessation", "freedom", "liberation",

                "healthy lungs", "clean air", "smoke-free", "withdrawal"

            ]

        }

        return detailed_keywords.get(goal, [])

    

    async def _calculate_quality_scores(self, content_items: List[ContentItem]) -> List[float]:

        """

        Calculates content quality scores based on multiple quality indicators.

        """

        scores = []

        

        # Calculate percentiles for normalization

        view_counts = [item.view_count for item in content_items]

        ratings = [item.rating for item in content_items]

        

        view_count_percentiles = np.percentile(view_counts, [25, 50, 75, 90])

        rating_percentiles = np.percentile(ratings, [25, 50, 75, 90])

        

        for item in content_items:

            quality_score = 0.0

            

            # View count score (30% weight)

            view_score = self._calculate_percentile_score(item.view_count, view_count_percentiles)

            quality_score += view_score * 0.3

            

            # Rating score (25% weight)

            rating_score = self._calculate_percentile_score(item.rating, rating_percentiles)

            quality_score += rating_score * 0.25

            

            # Channel reputation score (20% weight)

            channel_score = self._calculate_channel_reputation_score(item.channel_name)

            quality_score += channel_score * 0.2

            

            # Content freshness score (15% weight)

            freshness_score = self._calculate_freshness_score(item.upload_date)

            quality_score += freshness_score * 0.15

            

            # Duration appropriateness score (10% weight)

            duration_score = self._calculate_duration_appropriateness_score(item.duration_seconds)

            quality_score += duration_score * 0.1

            

            scores.append(min(1.0, quality_score))

        

        return scores

    

    def _calculate_percentile_score(self, value: float, percentiles: List[float]) -> float:

        """

        Converts a value to a percentile-based score.

        """

        if value >= percentiles[3]:  # 90th percentile

            return 1.0

        elif value >= percentiles[2]:  # 75th percentile

            return 0.8

        elif value >= percentiles[1]:  # 50th percentile

            return 0.6

        elif value >= percentiles[0]:  # 25th percentile

            return 0.4

        else:

            return 0.2

    

    def _calculate_channel_reputation_score(self, channel_name: str) -> float:

        """

        Calculates channel reputation score based on known quality creators.

        """

        # High-reputation meditation/hypnosis channels

        tier_1_channels = [

            "jason stephenson", "michael sealey", "the honest guys",

            "michelle's sanctuary", "lauren ostrowski fenton", "thomas hall"

        ]

        

        tier_2_channels = [

            "meditation relax music", "yellow brick cinema", "soothing relaxation",

            "new horizon meditation", "great meditation", "meditative mind"

        ]

        

        channel_lower = channel_name.lower()

        

        if any(channel in channel_lower for channel in tier_1_channels):

            return 1.0

        elif any(channel in channel_lower for channel in tier_2_channels):

            return 0.7

        else:

            return 0.5  # Neutral score for unknown channels

    

    def _calculate_freshness_score(self, upload_date: datetime) -> float:

        """

        Calculates content freshness score based on upload date.

        """

        days_old = (datetime.now() - upload_date).days

        

        if days_old <= 30:  # Very fresh

            return 1.0

        elif days_old <= 180:  # Recent

            return 0.8

        elif days_old <= 365:  # Moderately old

            return 0.6

        elif days_old <= 730:  # Old but acceptable

            return 0.4

        else:  # Very old

            return 0.2

    

    def _calculate_duration_appropriateness_score(self, duration_seconds: int) -> float:

        """

        Calculates score based on duration appropriateness for meditation/hypnosis.

        """

        duration_minutes = duration_seconds / 60

        

        # Optimal ranges for different types of sessions

        if 5 <= duration_minutes <= 60:  # Ideal range

            return 1.0

        elif 3 <= duration_minutes < 5 or 60 < duration_minutes <= 90:  # Acceptable

            return 0.7

        elif 1 <= duration_minutes < 3 or 90 < duration_minutes <= 120:  # Marginal

            return 0.4

        else:  # Too short or too long

            return 0.2

    

    async def _calculate_personalization_scores(

        self, 

        content_items: List[ContentItem], 

        user_intent: UserIntent,

        user_history: Optional[List[ContentItem]] = None

    ) -> List[float]:

        """

        Calculates personalization scores based on user preferences and history.

        """

        if not user_history:

            # Return neutral scores if no history available

            return [0.5] * len(content_items)

        

        scores = []

        

        # Analyze user preferences from history

        preferred_channels = self._extract_preferred_channels(user_history)

        preferred_duration_range = self._extract_preferred_duration(user_history)

        preferred_content_types = self._extract_preferred_content_types(user_history)

        

        for item in content_items:

            personalization_score = 0.0

            

            # Channel preference matching

            if item.channel_name in preferred_channels:

                personalization_score += 0.4

            

            # Duration preference matching

            if self._duration_matches_preference(item.duration_seconds, preferred_duration_range):

                personalization_score += 0.3

            

            # Content type preference matching

            content_type = self._classify_content_type(item)

            if content_type in preferred_content_types:

                personalization_score += 0.3

            

            scores.append(min(1.0, personalization_score))

        

        return scores

    

    def _extract_preferred_channels(self, user_history: List[ContentItem]) -> Dict[str, float]:

        """

        Extracts user's preferred channels from viewing history.

        """

        channel_counts = {}

        for item in user_history:

            channel_counts[item.channel_name] = channel_counts.get(item.channel_name, 0) + 1

        

        total_views = len(user_history)

        return {channel: count/total_views for channel, count in channel_counts.items()}

    

    def _extract_preferred_duration(self, user_history: List[ContentItem]) -> Tuple[int, int]:

        """

        Extracts user's preferred session duration range from history.

        """

        durations = [item.duration_seconds for item in user_history]

        avg_duration = sum(durations) / len(durations)

        std_duration = np.std(durations)

        

        return (int(avg_duration - std_duration), int(avg_duration + std_duration))

    

    def _extract_preferred_content_types(self, user_history: List[ContentItem]) -> List[str]:

        """

        Extracts user's preferred content types from history.

        """

        content_types = [self._classify_content_type(item) for item in user_history]

        type_counts = {}

        for content_type in content_types:

            type_counts[content_type] = type_counts.get(content_type, 0) + 1

        

        # Return types that appear in more than 20% of history

        threshold = len(user_history) * 0.2

        return [content_type for content_type, count in type_counts.items() if count >= threshold]

    

    def _classify_content_type(self, item: ContentItem) -> str:

        """

        Classifies content into categories based on title and description.

        """

        text = f"{item.title} {item.description}".lower()

        

        if any(keyword in text for keyword in ["hypnosis", "hypnotic", "trance"]):

            return "hypnosis"

        elif any(keyword in text for keyword in ["guided meditation", "guided"]):

            return "guided_meditation"

        elif any(keyword in text for keyword in ["music", "sounds", "ambient"]):

            return "meditation_music"

        elif any(keyword in text for keyword in ["mindfulness", "awareness"]):

            return "mindfulness"

        elif any(keyword in text for keyword in ["visualization", "imagery"]):

            return "visualization"

        else:

            return "general_meditation"

    

    def _duration_matches_preference(self, duration: int, preferred_range: Tuple[int, int]) -> bool:

        """

        Checks if content duration matches user's preferred range.

        """

        min_duration, max_duration = preferred_range

        return min_duration <= duration <= max_duration

    

    def _combine_scores(

        self, 

        mood_score: float, 

        goal_score: float, 

        quality_score: float, 

        personalization_score: float,

        user_intent: UserIntent

    ) -> float:

        """

        Combines individual scores into overall recommendation score with dynamic weighting.

        """

        # Base weights

        mood_weight = 0.35

        goal_weight = 0.25

        quality_weight = 0.25

        personalization_weight = 0.15

        

        # Adjust weights based on user intent intensity

        if user_intent.intensity_level >= 8:  # High intensity - prioritize mood matching

            mood_weight = 0.45

            goal_weight = 0.30

            quality_weight = 0.15

            personalization_weight = 0.10

        elif user_intent.intensity_level <= 3:  # Low intensity - prioritize quality and personalization

            mood_weight = 0.25

            goal_weight = 0.20

            quality_weight = 0.35

            personalization_weight = 0.20

        

        overall_score = (

            mood_score * mood_weight +

            goal_score * goal_weight +

            quality_score * quality_weight +

            personalization_score * personalization_weight

        )

        

        return min(1.0, overall_score)

    

    def _generate_explanation(

        self, 

        item: ContentItem, 

        mood_score: float, 

        goal_score: float, 

        quality_score: float, 

        personalization_score: float

    ) -> str:

        """

        Generates human-readable explanation for why content was recommended.

        """

        explanations = []

        

        if mood_score >= 0.7:

            explanations.append("excellent match for your current mood")

        elif mood_score >= 0.5:

            explanations.append("good alignment with your emotional state")

        

        if goal_score >= 0.7:

            explanations.append("highly relevant to your therapeutic goals")

        elif goal_score >= 0.5:

            explanations.append("addresses your stated objectives")

        

        if quality_score >= 0.8:

            explanations.append("high-quality content from reputable source")

        elif quality_score >= 0.6:

            explanations.append("well-rated content with good engagement")

        

        if personalization_score >= 0.7:

            explanations.append("matches your personal preferences")

        

        if not explanations:

            explanations.append("balanced recommendation across multiple factors")

        

        return f"Recommended because it offers {', '.join(explanations)}."


The recommendation engine employs a sophisticated multi-factor scoring system that considers mood relevance, therapeutic goal alignment, content quality, and personalization factors. The system uses machine learning techniques including TF-IDF vectorization and cosine similarity to perform semantic matching between user intents and content descriptions.

The engine dynamically adjusts scoring weights based on user intent intensity, prioritizing mood matching for high-intensity emotional states while emphasizing quality and personalization for more general requests. This adaptive approach ensures that recommendations remain relevant across different user contexts and emotional urgency levels.


Conversational Interface and User Experience


The conversational interface serves as the primary touchpoint between users and the system, managing the flow of interaction from initial intent capture through final recommendation presentation. This component handles natural language understanding, context maintenance, and response generation to create a seamless and intuitive user experience.


from enum import Enum

from dataclasses import dataclass

from typing import Dict, List, Optional, Any

import json

import asyncio


class ConversationState(Enum):

    INITIAL_GREETING = "initial_greeting"

    INTENT_GATHERING = "intent_gathering"

    INTENT_CLARIFICATION = "intent_clarification"

    CONTENT_SEARCHING = "content_searching"

    RECOMMENDATION_PRESENTATION = "recommendation_presentation"

    FOLLOW_UP = "follow_up"

    FEEDBACK_COLLECTION = "feedback_collection"


@dataclass

class ConversationContext:

    user_id: str

    session_id: str

    current_state: ConversationState

    user_intent: Optional[UserIntent]

    conversation_history: List[Dict[str, str]]

    recommendations: List[RecommendationScore]

    user_preferences: Dict[str, Any]

    clarification_needed: List[str]

    last_interaction_time: datetime


class TherapeuticContentChatbot:

    def __init__(

        self, 

        openai_api_key: str, 

        youtube_api_key: str,

        database_connection: Optional[Any] = None

    ):

        self.intent_analyzer = IntentAnalysisEngine(openai_api_key)

        self.content_discovery = YouTubeContentDiscovery(youtube_api_key)

        self.recommendation_engine = ContentRecommendationEngine()

        self.conversation_contexts = {}

        self.database = database_connection

        self.response_templates = self._initialize_response_templates()

    

    def _initialize_response_templates(self) -> Dict[str, List[str]]:

        """

        Initializes response templates for different conversation states and scenarios.

        """

        return {

            "greeting": [

                "Hello! I'm here to help you find the perfect meditation or hypnosis session for your current needs. How are you feeling today, and what would you like to work on?",

                "Welcome! I specialize in finding personalized meditation and hypnosis content. Could you tell me about your current mood and what you're hoping to achieve?",

                "Hi there! I'm your therapeutic content assistant. What's on your mind today, and how can I help you find the right session for your needs?"

            ],

            "intent_clarification": [

                "I want to make sure I find the best content for you. Could you tell me a bit more about {clarification_topic}?",

                "To give you the most relevant recommendations, I'd like to understand {clarification_topic} better. Can you elaborate?",

                "Help me personalize your recommendations by sharing more details about {clarification_topic}."

            ],

            "searching": [

                "I'm searching for content that matches your needs. This may take a moment while I explore multiple platforms...",

                "Let me find the perfect sessions for you. I'm checking YouTube and other platforms for relevant content...",

                "Searching for personalized recommendations based on your mood and goals. Please wait a moment..."

            ],

            "no_results": [

                "I couldn't find content that perfectly matches your specific needs. Would you like me to search with broader criteria or try different keywords?",

                "It seems there's limited content for your exact requirements. Shall I expand the search or suggest alternative approaches?",

                "I'm having trouble finding exact matches. Would you like me to recommend similar content or adjust the search parameters?"

            ],

            "recommendations_intro": [

                "I found {count} excellent sessions that match your needs. Here are my top recommendations:",

                "Based on your mood and goals, I've curated {count} personalized recommendations for you:",

                "I've discovered {count} sessions that should be perfect for your current situation:"

            ]

        }

    

    async def process_user_message(self, user_id: str, message: str) -> str:

        """

        Main entry point for processing user messages and generating appropriate responses.

        """

        # Get or create conversation context

        context = self._get_or_create_context(user_id, message)

        

        try:

            # Update conversation history

            context.conversation_history.append({

                "timestamp": datetime.now().isoformat(),

                "role": "user",

                "content": message

            })

            

            # Process message based on current conversation state

            response = await self._process_message_by_state(context, message)

            

            # Update conversation history with bot response

            context.conversation_history.append({

                "timestamp": datetime.now().isoformat(),

                "role": "assistant",

                "content": response

            })

            

            # Save context to database if available

            if self.database:

                await self._save_conversation_context(context)

            

            return response

            

        except Exception as e:

            error_response = "I apologize, but I encountered an issue while processing your request. Could you please try rephrasing your message or let me know if you need assistance with something specific?"

            context.conversation_history.append({

                "timestamp": datetime.now().isoformat(),

                "role": "assistant",

                "content": error_response

            })

            return error_response

    

    def _get_or_create_context(self, user_id: str, message: str) -> ConversationContext:

        """

        Retrieves existing conversation context or creates a new one for the user.

        """

        if user_id not in self.conversation_contexts:

            self.conversation_contexts[user_id] = ConversationContext(

                user_id=user_id,

                session_id=f"{user_id}_{datetime.now().timestamp()}",

                current_state=ConversationState.INITIAL_GREETING,

                user_intent=None,

                conversation_history=[],

                recommendations=[],

                user_preferences={},

                clarification_needed=[],

                last_interaction_time=datetime.now()

            )

        else:

            # Check if session has expired (more than 30 minutes of inactivity)

            context = self.conversation_contexts[user_id]

            time_since_last_interaction = datetime.now() - context.last_interaction_time

            if time_since_last_interaction.total_seconds() > 1800:  # 30 minutes

                # Reset to initial state for new session

                context.current_state = ConversationState.INITIAL_GREETING

                context.user_intent = None

                context.recommendations = []

                context.clarification_needed = []

        

        self.conversation_contexts[user_id].last_interaction_time = datetime.now()

        return self.conversation_contexts[user_id]

    

    async def _process_message_by_state(self, context: ConversationContext, message: str) -> str:

        """

        Routes message processing based on current conversation state.

        """

        if context.current_state == ConversationState.INITIAL_GREETING:

            return await self._handle_initial_greeting(context, message)

        elif context.current_state == ConversationState.INTENT_GATHERING:

            return await self._handle_intent_gathering(context, message)

        elif context.current_state == ConversationState.INTENT_CLARIFICATION:

            return await self._handle_intent_clarification(context, message)

        elif context.current_state == ConversationState.RECOMMENDATION_PRESENTATION:

            return await self._handle_recommendation_interaction(context, message)

        elif context.current_state == ConversationState.FOLLOW_UP:

            return await self._handle_follow_up(context, message)

        elif context.current_state == ConversationState.FEEDBACK_COLLECTION:

            return await self._handle_feedback_collection(context, message)

        else:

            return await self._handle_intent_gathering(context, message)

    

    async def _handle_initial_greeting(self, context: ConversationContext, message: str) -> str:

        """

        Handles initial user interaction and greeting.

        """

        # Check if user provided intent information in first message

        if len(message.split()) > 3:  # Substantial message likely contains intent

            context.current_state = ConversationState.INTENT_GATHERING

            return await self._handle_intent_gathering(context, message)

        else:

            # Simple greeting or short message

            context.current_state = ConversationState.INTENT_GATHERING

            return self._get_random_template("greeting")

    

    async def _handle_intent_gathering(self, context: ConversationContext, message: str) -> str:

        """

        Processes user input to extract therapeutic intent and determines if clarification is needed.

        """

        # Analyze user intent

        user_intent = await self.intent_analyzer.analyze_user_intent(message)

        context.user_intent = user_intent

        

        # Determine if clarification is needed

        clarification_needed = self._assess_clarification_needs(user_intent)

        

        if clarification_needed:

            context.clarification_needed = clarification_needed

            context.current_state = ConversationState.INTENT_CLARIFICATION

            return self._generate_clarification_request(clarification_needed[0])

        else:

            # Proceed with content search

            context.current_state = ConversationState.CONTENT_SEARCHING

            return await self._perform_content_search_and_recommend(context)

    

    async def _handle_intent_clarification(self, context: ConversationContext, message: str) -> str:

        """

        Handles clarification responses and updates user intent accordingly.

        """

        # Process clarification response

        await self._process_clarification_response(context, message)

        

        # Remove processed clarification from queue

        if context.clarification_needed:

            context.clarification_needed.pop(0)

        

        # Check if more clarification is needed

        if context.clarification_needed:

            return self._generate_clarification_request(context.clarification_needed[0])

        else:

            # Proceed with content search

            context.current_state = ConversationState.CONTENT_SEARCHING

            return await self._perform_content_search_and_recommend(context)

    

    async def _handle_recommendation_interaction(self, context: ConversationContext, message: str) -> str:

        """

        Handles user interactions with presented recommendations.

        """

        message_lower = message.lower()

        

        # Check for specific requests

        if any(word in message_lower for word in ["more", "other", "different", "additional"]):

            return await self._provide_additional_recommendations(context)

        elif any(word in message_lower for word in ["explain", "why", "reason"]):

            return self._provide_recommendation_explanations(context)

        elif any(word in message_lower for word in ["shorter", "longer", "duration"]):

            return await self._filter_by_duration(context, message)

        elif any(word in message_lower for word in ["thank", "thanks", "perfect", "great"]):

            context.current_state = ConversationState.FEEDBACK_COLLECTION

            return "I'm glad I could help! Would you mind sharing which recommendation you found most helpful? Your feedback helps me improve future suggestions."

        else:

            # Try to extract new intent for additional search

            new_intent = await self.intent_analyzer.analyze_user_intent(message)

            if self._is_new_search_request(new_intent, context.user_intent):

                context.user_intent = new_intent

                context.current_state = ConversationState.CONTENT_SEARCHING

                return await self._perform_content_search_and_recommend(context)

            else:

                return "I'm here to help with your meditation and hypnosis needs. You can ask for more recommendations, request different durations, or start a new search by telling me about your current mood and goals."

    

    async def _handle_follow_up(self, context: ConversationContext, message: str) -> str:

        """

        Handles follow-up interactions and additional requests.

        """

        # Similar to recommendation interaction but with different context

        return await self._handle_recommendation_interaction(context, message)

    

    async def _handle_feedback_collection(self, context: ConversationContext, message: str) -> str:

        """

        Collects and processes user feedback on recommendations.

        """

        # Store feedback for future improvement

        await self._store_user_feedback(context, message)

        

        context.current_state = ConversationState.FOLLOW_UP

        return "Thank you for your feedback! Is there anything else I can help you with today? I can search for different types of content or assist with other therapeutic goals."

    

    def _assess_clarification_needs(self, user_intent: UserIntent) -> List[str]:

        """

        Determines what clarification is needed based on extracted user intent.

        """

        clarifications = []

        

        # Check if mood is too vague or neutral

        if user_intent.primary_mood == MoodCategory.NEUTRAL and not user_intent.secondary_moods:

            clarifications.append("your current emotional state or mood")

        

        # Check if no therapeutic goals were identified

        if not user_intent.therapeutic_goals:

            clarifications.append("what you're hoping to achieve or work on")

        

        # Check if intensity is unclear for strong emotional states

        if user_intent.primary_mood in [MoodCategory.ANXIETY, MoodCategory.DEPRESSION] and user_intent.intensity_level == 5:

            clarifications.append("how intense these feelings are for you right now")

        

        # Check if experience level might affect recommendations

        if user_intent.experience_level == "beginner" and not any(word in user_intent.raw_input.lower() for word in ["new", "beginner", "first time", "never"]):

            clarifications.append("your experience level with meditation or hypnosis")

        

        return clarifications

    

    def _generate_clarification_request(self, clarification_topic: str) -> str:

        """

        Generates a natural clarification request for the specified topic.

        """

        template = self._get_random_template("intent_clarification")

        return template.format(clarification_topic=clarification_topic)

    

    async def _process_clarification_response(self, context: ConversationContext, message: str) -> None:

        """

        Processes clarification response and updates user intent.

        """

        # Re-analyze the clarification message and merge with existing intent

        clarification_intent = await self.intent_analyzer.analyze_user_intent(message)

        

        # Merge new information with existing intent

        if context.user_intent:

            # Update primary mood if new one is more specific

            if clarification_intent.primary_mood != MoodCategory.NEUTRAL:

                context.user_intent.primary_mood = clarification_intent.primary_mood

            

            # Add new therapeutic goals

            for goal in clarification_intent.therapeutic_goals:

                if goal not in context.user_intent.therapeutic_goals:

                    context.user_intent.therapeutic_goals.append(goal)

            

            # Update intensity if provided

            if clarification_intent.intensity_level != 5:

                context.user_intent.intensity_level = clarification_intent.intensity_level

            

            # Update experience level if provided

            if clarification_intent.experience_level != "beginner":

                context.user_intent.experience_level = clarification_intent.experience_level

            

            # Update duration preference if provided

            if clarification_intent.session_duration_preference:

                context.user_intent.session_duration_preference = clarification_intent.session_duration_preference

    

    async def _perform_content_search_and_recommend(self, context: ConversationContext) -> str:

        """

        Performs content search and generates recommendations.

        """

        # Show searching message

        searching_message = self._get_random_template("searching")

        

        try:

            # Search for content

            async with self.content_discovery as discovery:

                content_items = await discovery.search_content(context.user_intent, max_results=50)

            

            if not content_items:

                context.current_state = ConversationState.FOLLOW_UP

                return self._get_random_template("no_results")

            

            # Generate recommendations

            user_history = await self._get_user_history(context.user_id) if self.database else None

            recommendations = await self.recommendation_engine.generate_recommendations(

                content_items, context.user_intent, user_history, num_recommendations=10

            )

            

            context.recommendations = recommendations

            context.current_state = ConversationState.RECOMMENDATION_PRESENTATION

            

            return self._format_recommendations_response(recommendations)

            

        except Exception as e:

            context.current_state = ConversationState.FOLLOW_UP

            return "I encountered an issue while searching for content. Please try again or let me know if you'd like to adjust your search criteria."

    

    def _format_recommendations_response(self, recommendations: List[RecommendationScore]) -> str:

        """

        Formats recommendations into a user-friendly response.

        """

        if not recommendations:

            return self._get_random_template("no_results")

        

        intro = self._get_random_template("recommendations_intro").format(count=len(recommendations))

        

        response_parts = [intro, ""]

        

        for i, rec in enumerate(recommendations, 1):

            item = rec.content_item

            duration_minutes = item.duration_seconds // 60

            

            recommendation_text = f"{i}. **{item.title}**\n"

            recommendation_text += f"   Channel: {item.channel_name}\n"

            recommendation_text += f"   Duration: {duration_minutes} minutes\n"

            recommendation_text += f"   URL: {item.url}\n"

            recommendation_text += f"   Why recommended: {rec.explanation}\n"

            

            response_parts.append(recommendation_text)

        

        response_parts.append("\nWould you like more recommendations, different durations, or have any other questions?")

        

        return "\n".join(response_parts)

    

    async def _provide_additional_recommendations(self, context: ConversationContext) -> str:

        """

        Provides additional recommendations beyond the initial set.

        """

        # Search for more content with slightly relaxed criteria

        try:

            async with self.content_discovery as discovery:

                additional_content = await discovery.search_content(context.user_intent, max_results=30)

            

            # Filter out already recommended content

            recommended_urls = {rec.content_item.url for rec in context.recommendations}

            new_content = [item for item in additional_content if item.url not in recommended_urls]

            

            if new_content:

                user_history = await self._get_user_history(context.user_id) if self.database else None

                additional_recommendations = await self.recommendation_engine.generate_recommendations(

                    new_content, context.user_intent, user_history, num_recommendations=5

                )

                

                context.recommendations.extend(additional_recommendations)

                return self._format_recommendations_response(additional_recommendations)

            else:

                return "I've already shown you the best available content for your needs. Would you like me to search with different criteria or help you with something else?"

                

        except Exception as e:

            return "I'm having trouble finding additional content right now. Would you like to try a new search or adjust your preferences?"

    

    def _provide_recommendation_explanations(self, context: ConversationContext) -> str:

        """

        Provides detailed explanations for why specific content was recommended.

        """

        if not context.recommendations:

            return "I don't have any current recommendations to explain. Would you like me to search for content first?"

        

        explanations = ["Here's why I recommended each session:\n"]

        

        for i, rec in enumerate(context.recommendations[:5], 1):  # Limit to top 5 for readability

            explanation = f"{i}. **{rec.content_item.title}**\n"

            explanation += f"   Mood relevance: {rec.mood_relevance:.1%}\n"

            explanation += f"   Goal alignment: {rec.goal_relevance:.1%}\n"

            explanation += f"   Quality score: {rec.quality_score:.1%}\n"

            explanation += f"   {rec.explanation}\n"

            explanations.append(explanation)

        

        return "\n".join(explanations)

    

    async def _filter_by_duration(self, context: ConversationContext, message: str) -> str:

        """

        Filters current recommendations by duration preferences mentioned in message.

        """

        # Extract duration preference from message

        duration_keywords = {

            "short": (0, 900),      # 0-15 minutes

            "medium": (900, 1800),  # 15-30 minutes

            "long": (1800, 3600),   # 30-60 minutes

            "quick": (0, 600),      # 0-10 minutes

            "brief": (0, 600),      # 0-10 minutes

            "extended": (1800, 3600) # 30-60 minutes

        }

        

        message_lower = message.lower()

        duration_range = None

        

        for keyword, range_tuple in duration_keywords.items():

            if keyword in message_lower:

                duration_range = range_tuple

                break

        

        # Look for specific minute mentions

        import re

        minute_match = re.search(r'(\d+)\s*(?:minute|min)', message_lower)

        if minute_match:

            target_minutes = int(minute_match.group(1))

            duration_range = (target_minutes * 60 - 300, target_minutes * 60 + 300)  # ±5 minutes

        

        if duration_range:

            min_duration, max_duration = duration_range

            filtered_recommendations = [

                rec for rec in context.recommendations

                if min_duration <= rec.content_item.duration_seconds <= max_duration

            ]

            

            if filtered_recommendations:

                return self._format_recommendations_response(filtered_recommendations)

            else:

                return f"I don't have recommendations in that duration range. Would you like me to search for content with your preferred duration?"

        else:

            return "I didn't understand the duration preference. Could you specify if you want short (under 15 min), medium (15-30 min), or long (30+ min) sessions?"

    

    def _is_new_search_request(self, new_intent: UserIntent, current_intent: Optional[UserIntent]) -> bool:

        """

        Determines if the user is requesting a new search based on intent comparison.

        """

        if not current_intent:

            return True

        

        # Check for significant changes in mood or goals

        mood_changed = new_intent.primary_mood != current_intent.primary_mood

        goals_changed = set(new_intent.therapeutic_goals) != set(current_intent.therapeutic_goals)

        intensity_changed = abs(new_intent.intensity_level - current_intent.intensity_level) > 2

        

        return mood_changed or goals_changed or intensity_changed

    

    def _get_random_template(self, template_type: str) -> str:

        """

        Returns a random template from the specified type.

        """

        import random

        templates = self.response_templates.get(template_type, ["I'm here to help you find therapeutic content."])

        return random.choice(templates)

    

    async def _get_user_history(self, user_id: str) -> Optional[List[ContentItem]]:

        """

        Retrieves user's content viewing history from database.

        """

        if not self.database:

            return None

        

        # Database query implementation would go here

        # This is a placeholder for the actual database integration

        return None

    

    async def _save_conversation_context(self, context: ConversationContext) -> None:

        """

        Saves conversation context to database for persistence.

        """

        if not self.database:

            return

        

        # Database save implementation would go here

        # This is a placeholder for the actual database integration

        pass

    

    async def _store_user_feedback(self, context: ConversationContext, feedback: str) -> None:

        """

        Stores user feedback for system improvement.

        """

        feedback_data = {

            "user_id": context.user_id,

            "session_id": context.session_id,

            "feedback": feedback,

            "recommendations": [rec.content_item.url for rec in context.recommendations],

            "user_intent": context.user_intent.__dict__ if context.user_intent else None,

            "timestamp": datetime.now().isoformat()

        }

        

        if self.database:

            # Store in database

            pass

        else:

            # Log to file or other storage mechanism

            print(f"User feedback collected: {json.dumps(feedback_data, indent=2)}")


The conversational interface manages complex dialogue flows through a state machine architecture that tracks conversation progress and maintains context across multiple interactions. The system employs sophisticated natural language understanding to interpret user responses and determine appropriate follow-up actions.

The interface supports various interaction patterns including clarification requests, recommendation refinement, and iterative search improvement. Users can request additional recommendations, filter by specific criteria, or start entirely new searches without losing conversational context.


Complete System Integration and Running Example


The following complete example demonstrates the integration of all system components into a functional therapeutic content discovery chatbot. This implementation showcases the end-to-end workflow from user input processing through content discovery and recommendation presentation.


import asyncio

import os

from typing import Optional

import logging


# Configure logging

logging.basicConfig(level=logging.INFO)

logger = logging.getLogger(__name__)


class TherapeuticContentDiscoverySystem:

    """

    Complete system integration for therapeutic content discovery chatbot.

    """

    

    def __init__(self, openai_api_key: str, youtube_api_key: str):

        self.chatbot = TherapeuticContentChatbot(openai_api_key, youtube_api_key)

        self.active_sessions = {}

        

    async def start_conversation(self, user_id: str) -> str:

        """

        Initiates a new conversation with the user.

        """

        welcome_message = await self.chatbot.process_user_message(

            user_id, 

            "Hello"  # Initial greeting trigger

        )

        return welcome_message

    

    async def process_user_input(self, user_id: str, user_message: str) -> str:

        """

        Processes user input and returns appropriate response.

        """

        try:

            response = await self.chatbot.process_user_message(user_id, user_message)

            return response

        except Exception as e:

            logger.error(f"Error processing user input: {e}")

            return "I apologize for the technical difficulty. Please try again or rephrase your request."

    

    async def get_conversation_history(self, user_id: str) -> List[Dict[str, str]]:

        """

        Retrieves conversation history for the specified user.

        """

        context = self.chatbot.conversation_contexts.get(user_id)

        if context:

            return context.conversation_history

        return []

    

    def get_active_sessions_count(self) -> int:

        """

        Returns the number of currently active conversation sessions.

        """

        return len(self.chatbot.conversation_contexts)



async def demonstrate_system_functionality():

    """

    Comprehensive demonstration of the therapeutic content discovery system.

    """

    # Initialize system with API keys (in production, these would come from environment variables)

    openai_key = os.getenv("OPENAI_API_KEY", "your_openai_api_key_here")

    youtube_key = os.getenv("YOUTUBE_API_KEY", "your_youtube_api_key_here")

    

    system = TherapeuticContentDiscoverySystem(openai_key, youtube_key)

    

    # Simulate user interactions

    user_id = "demo_user_001"

    

    print("=== Therapeutic Content Discovery System Demo ===\n")

    

    # Start conversation

    print("System: Starting conversation...")

    welcome_response = await system.start_conversation(user_id)

    print(f"Bot: {welcome_response}\n")

    

    # Simulate user expressing anxiety and sleep issues

    user_input_1 = "I'm feeling really anxious about work lately and I'm having trouble sleeping. I need something to help me relax and get better rest."

    print(f"User: {user_input_1}")

    response_1 = await system.process_user_input(user_id, user_input_1)

    print(f"Bot: {response_1}\n")

    

    # Simulate user providing clarification if needed

    user_input_2 = "I'd say my anxiety level is about 7 out of 10, and I'm a beginner with meditation. I prefer sessions around 20 minutes."

    print(f"User: {user_input_2}")

    response_2 = await system.process_user_input(user_id, user_input_2)

    print(f"Bot: {response_2}\n")

    

    # Simulate user requesting more recommendations

    user_input_3 = "These look good, but could you show me some shorter options? Maybe around 10-15 minutes?"

    print(f"User: {user_input_3}")

    response_3 = await system.process_user_input(user_id, user_input_3)

    print(f"Bot: {response_3}\n")

    

    # Simulate user expressing satisfaction

    user_input_4 = "Perfect! Thank you so much. The second recommendation looks exactly what I need."

    print(f"User: {user_input_4}")

    response_4 = await system.process_user_input(user_id, user_input_4)

    print(f"Bot: {response_4}\n")

    

    # Display conversation statistics

    history = await system.get_conversation_history(user_id)

    print(f"Conversation completed with {len(history)} total exchanges.")

    print(f"Active sessions: {system.get_active_sessions_count()}")



class SystemConfiguration:

    """

    Configuration management for the therapeutic content discovery system.

    """

    

    def __init__(self):

        self.openai_api_key = os.getenv("OPENAI_API_KEY")

        self.youtube_api_key = os.getenv("YOUTUBE_API_KEY")

        self.database_url = os.getenv("DATABASE_URL")

        self.redis_url = os.getenv("REDIS_URL")

        self.log_level = os.getenv("LOG_LEVEL", "INFO")

        

        # Validate required configuration

        self._validate_configuration()

    

    def _validate_configuration(self):

        """

        Validates that all required configuration parameters are present.

        """

        required_keys = ["openai_api_key", "youtube_api_key"]

        missing_keys = [key for key in required_keys if not getattr(self, key)]

        

        if missing_keys:

            raise ValueError(f"Missing required configuration: {', '.join(missing_keys)}")

    

    def get_database_config(self) -> Optional[Dict[str, str]]:

        """

        Returns database configuration if available.

        """

        if self.database_url:

            return {"url": self.database_url}

        return None

    

    def get_cache_config(self) -> Optional[Dict[str, str]]:

        """

        Returns cache configuration if available.

        """

        if self.redis_url:

            return {"url": self.redis_url}

        return None



async def production_system_startup():

    """

    Production-ready system startup with proper configuration and error handling.

    """

    try:

        # Load configuration

        config = SystemConfiguration()

        

        # Initialize system

        system = TherapeuticContentDiscoverySystem(

            config.openai_api_key,

            config.youtube_api_key

        )

        

        logger.info("Therapeutic Content Discovery System started successfully")

        

        # In a production environment, this would integrate with a web framework

        # such as FastAPI, Flask, or Django to handle HTTP requests

        

        return system

        

    except Exception as e:

        logger.error(f"Failed to start system: {e}")

        raise



# Health check and monitoring functions

async def system_health_check(system: TherapeuticContentDiscoverySystem) -> Dict[str, Any]:

    """

    Performs system health check and returns status information.

    """

    health_status = {

        "status": "healthy",

        "timestamp": datetime.now().isoformat(),

        "active_sessions": system.get_active_sessions_count(),

        "components": {}

    }

    

    # Check OpenAI API connectivity

    try:

        test_response = await system.chatbot.intent_analyzer._call_llm(

            "You are a test assistant.", 

            "Respond with 'OK' if you can process this message."

        )

        health_status["components"]["openai"] = "healthy" if "OK" in test_response else "degraded"

    except Exception as e:

        health_status["components"]["openai"] = "unhealthy"

        health_status["status"] = "degraded"

    

    # Check YouTube API connectivity

    try:

        async with system.chatbot.content_discovery as discovery:

            # Perform a minimal test search

            test_results = await discovery._execute_search(

                SearchQuery(

                    keywords=["meditation"],

                    duration_range=(300, 1800),

                    upload_date_range=(datetime.now() - timedelta(days=30), datetime.now()),

                    minimum_rating=3.0,

                    content_type="meditation",

                    experience_level="beginner"

                ),

                max_results=1

            )

        health_status["components"]["youtube"] = "healthy"

    except Exception as e:

        health_status["components"]["youtube"] = "unhealthy"

        health_status["status"] = "degraded"

    

    return health_status



if __name__ == "__main__":

    # Run the demonstration

    asyncio.run(demonstrate_system_functionality())


This complete implementation demonstrates the sophisticated integration of natural language processing, content discovery, machine learning-based recommendation algorithms, and conversational interface management. The system provides a seamless user experience while maintaining high standards for content quality and relevance.

The architecture supports scalability through asynchronous processing, proper error handling, and modular component design. The system can be easily extended to support additional content platforms, enhanced personalization features, and advanced analytics capabilities.

The therapeutic content discovery chatbot represents a significant advancement in digital wellness technology, bridging the gap between user emotional needs and available therapeutic resources through intelligent automation and personalized recommendation algorithms. The system's ability to understand nuanced emotional states and match them with appropriate content creates substantial value for users seeking mental health and wellness support.

No comments: