Friday, March 06, 2026

BUILDING A RECOMMENDER SYSTEM




INTRODUCTION


Recommender systems have become the backbone of modern digital experiences, powering everything from e-commerce platforms to streaming services. These sophisticated systems analyze user behavior, preferences, and item characteristics to predict what users might find interesting or valuable. The quality of recommendations directly impacts user engagement, satisfaction, and business metrics, making the development of excellent recommender systems a critical competency for technology companies.


An excellent recommender system goes beyond simple popularity-based suggestions. It must understand individual user preferences, adapt to changing tastes, handle new users and items gracefully, and provide diverse yet relevant recommendations. The system must also operate at scale, delivering real-time recommendations to millions of users while maintaining high accuracy and low latency.


The fundamental challenge in building recommender systems lies in the sparse nature of user-item interaction data. Most users interact with only a tiny fraction of available items, creating a sparse matrix where the system must infer preferences from limited information. Additionally, user preferences evolve over time, requiring systems that can adapt and learn continuously.
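To see just how sparse these matrices typically are, a toy example (hypothetical counts) computes the density of an interaction matrix; production systems routinely see densities well below 1%:

```python
from scipy.sparse import csr_matrix

# Hypothetical toy data: 4 users, 5 items, only 6 observed ratings
rows = [0, 0, 1, 2, 3, 3]               # user indices
cols = [0, 2, 1, 4, 0, 3]               # item indices
ratings = [5.0, 3.0, 4.0, 2.0, 1.0, 5.0]

matrix = csr_matrix((ratings, (rows, cols)), shape=(4, 5))

# Density = observed entries / total entries; here 6 / 20 = 0.3
density = matrix.nnz / (matrix.shape[0] * matrix.shape[1])
```

Every algorithm discussed below must infer the missing 70% (or, in practice, 99%+) of this matrix from the observed entries alone.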


CORE ARCHITECTURAL COMPONENTS


Data Collection and Management


The foundation of any excellent recommender system is high-quality, comprehensive data. This data typically falls into three categories: explicit feedback, implicit feedback, and contextual information. Explicit feedback includes direct user ratings, reviews, and preferences explicitly stated by users. Implicit feedback encompasses user behavior such as clicks, views, purchase history, and time spent on items. Contextual information includes temporal data, user demographics, item metadata, and environmental factors.



class UserInteraction:
    def __init__(self, user_id, item_id, interaction_type,
                 rating=None, timestamp=None, context=None):
        self.user_id = user_id
        self.item_id = item_id
        self.interaction_type = interaction_type  # 'rating', 'view', 'purchase'
        self.rating = rating
        self.timestamp = timestamp
        self.context = context or {}

    def to_dict(self):
        return {
            'user_id': self.user_id,
            'item_id': self.item_id,
            'interaction_type': self.interaction_type,
            'rating': self.rating,
            'timestamp': self.timestamp,
            'context': self.context
        }



Data preprocessing involves cleaning, normalizing, and structuring the collected information. This includes handling missing values, removing outliers, normalizing ratings scales, and creating consistent user and item identifiers. The preprocessing pipeline must also address data quality issues such as fake reviews, bot interactions, and inconsistent formatting.
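One such preprocessing step can be sketched as follows: dropping records with missing ratings and min-max normalizing a 1-5 rating scale onto [0, 1]. The function name and the dict-based records are illustrative, not part of the classes above:

```python
def normalize_ratings(interactions, scale_min=1.0, scale_max=5.0):
    """Drop records without a rating and min-max normalize onto [0, 1]."""
    cleaned = [record for record in interactions if record.get('rating') is not None]
    span = scale_max - scale_min
    for record in cleaned:
        record['rating'] = (record['rating'] - scale_min) / span
    return cleaned
```

Real pipelines add further passes for outlier removal and deduplication, but the pattern is the same: each pass takes the interaction stream and emits a cleaned version of it.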


Feature Engineering and Representation


Feature engineering transforms raw data into meaningful representations that algorithms can effectively utilize. For users, features might include demographic information, historical preferences, interaction patterns, and derived characteristics such as preference diversity or rating tendencies. Item features encompass metadata like categories, descriptions, prices, and computed features such as popularity scores or content similarity measures.



class FeatureExtractor:
    def __init__(self):
        self.user_features = {}
        self.item_features = {}

    def extract_user_features(self, user_id, interactions):
        """Extract comprehensive user features from interaction history"""
        features = {
            'avg_rating': self._calculate_average_rating(interactions),
            'rating_variance': self._calculate_rating_variance(interactions),
            'interaction_count': len(interactions),
            'preferred_categories': self._get_preferred_categories(interactions),
            'activity_pattern': self._analyze_activity_pattern(interactions),
            'recency_score': self._calculate_recency_score(interactions)
        }
        self.user_features[user_id] = features
        return features

    def _calculate_average_rating(self, interactions):
        ratings = [i.rating for i in interactions if i.rating is not None]
        return sum(ratings) / len(ratings) if ratings else 0.0

    def _calculate_rating_variance(self, interactions):
        ratings = [i.rating for i in interactions if i.rating is not None]
        if len(ratings) < 2:
            return 0.0
        avg = sum(ratings) / len(ratings)
        return sum((r - avg) ** 2 for r in ratings) / len(ratings)



Advanced feature engineering techniques include embedding representations learned through neural networks, which can capture complex, non-linear relationships in the data. These embeddings often provide more nuanced representations than manually crafted features, particularly for high-dimensional categorical data.


Algorithm Selection and Implementation


The choice of recommendation algorithm depends on the specific use case, data characteristics, and system requirements. Collaborative filtering algorithms leverage user-item interactions to identify similar users or items. Memory-based collaborative filtering computes similarities between users or items directly, while model-based approaches learn latent factors that explain observed interactions.



import numpy as np
from scipy.sparse import csr_matrix
from sklearn.metrics.pairwise import cosine_similarity


class CollaborativeFilter:
    def __init__(self, similarity_metric='cosine'):
        self.similarity_metric = similarity_metric
        self.user_item_matrix = None
        self.user_similarity = None
        self.item_similarity = None
        self.user_to_idx = {}
        self.item_to_idx = {}

    def fit(self, interactions):
        """Build user-item matrix and compute similarities"""
        self.user_item_matrix = self._build_user_item_matrix(interactions)
        self.user_similarity = self._compute_user_similarity()
        self.item_similarity = self._compute_item_similarity()

    def _build_user_item_matrix(self, interactions):
        """Create sparse user-item interaction matrix"""
        users = sorted(set(i.user_id for i in interactions))
        items = sorted(set(i.item_id for i in interactions))

        # Persist the index mappings; predict_rating relies on them later
        self.user_to_idx = {user: idx for idx, user in enumerate(users)}
        self.item_to_idx = {item: idx for idx, item in enumerate(items)}

        rows, cols, data = [], [], []
        for interaction in interactions:
            if interaction.rating is not None:
                rows.append(self.user_to_idx[interaction.user_id])
                cols.append(self.item_to_idx[interaction.item_id])
                data.append(interaction.rating)

        return csr_matrix((data, (rows, cols)),
                          shape=(len(users), len(items)))

    def _compute_user_similarity(self):
        """Compute user-user similarity matrix"""
        return cosine_similarity(self.user_item_matrix)

    def _compute_item_similarity(self):
        """Compute item-item similarity matrix from the transposed matrix"""
        return cosine_similarity(self.user_item_matrix.T)

    def _global_average_rating(self):
        """Fallback prediction: mean of all observed ratings"""
        if self.user_item_matrix is None or self.user_item_matrix.nnz == 0:
            return 0.0
        return self.user_item_matrix.sum() / self.user_item_matrix.nnz

    def _get_similar_users(self, user_idx, k):
        """Return (index, similarity) pairs for the k most similar other users"""
        similarities = self.user_similarity[user_idx]
        neighbor_indices = np.argsort(similarities)[::-1]
        neighbors = [(idx, similarities[idx]) for idx in neighbor_indices
                     if idx != user_idx]
        return neighbors[:k]

    def predict_rating(self, user_id, item_id, k=50):
        """Predict rating using k-nearest neighbors"""
        if user_id not in self.user_to_idx or item_id not in self.item_to_idx:
            return self._global_average_rating()

        user_idx = self.user_to_idx[user_id]
        item_idx = self.item_to_idx[item_id]

        # Weighted average over the k most similar users who rated this item
        weighted_sum = 0.0
        similarity_sum = 0.0
        for similar_user_idx, similarity in self._get_similar_users(user_idx, k):
            rating = self.user_item_matrix[similar_user_idx, item_idx]
            if rating > 0:
                weighted_sum += similarity * rating
                similarity_sum += abs(similarity)

        if similarity_sum > 0:
            return weighted_sum / similarity_sum
        return self._global_average_rating()



Content-based filtering analyzes item features to recommend similar items to those a user has previously liked. This approach works well for items with rich metadata and can handle new items effectively, though it may suffer from limited diversity in recommendations.
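A minimal content-based sketch: TF-IDF vectors over item descriptions, ranked by cosine similarity. The toy catalog and the `most_similar` helper are illustrative only:

```python
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

# Hypothetical toy catalog; in practice descriptions come from item metadata
descriptions = {
    'item_a': 'wireless noise cancelling headphones',
    'item_b': 'wired studio headphones',
    'item_c': 'stainless steel kitchen knife',
}
ids = list(descriptions)

# TF-IDF vectors over item text, then pairwise cosine similarity
tfidf = TfidfVectorizer().fit_transform(descriptions.values())
sim = cosine_similarity(tfidf)

def most_similar(item_id):
    """Return the catalog item whose description is closest to item_id's."""
    idx = ids.index(item_id)
    ranked = sorted(((sim[idx, j], ids[j]) for j in range(len(ids)) if j != idx),
                    reverse=True)
    return ranked[0][1]
```

Because similarity is computed purely from item text, a brand-new item can be recommended the moment its description exists, which is exactly why content-based methods complement collaborative filtering on new items.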


Matrix factorization techniques decompose the user-item interaction matrix into lower-dimensional representations, capturing latent factors that explain user preferences and item characteristics. These methods often achieve superior performance compared to neighborhood-based approaches, particularly with sparse data.



import numpy as np


class MatrixFactorization:
    def __init__(self, n_factors=50, learning_rate=0.01,
                 regularization=0.01, n_epochs=100):
        self.n_factors = n_factors
        self.learning_rate = learning_rate
        self.regularization = regularization
        self.n_epochs = n_epochs
        self.user_factors = None
        self.item_factors = None
        self.user_biases = None
        self.item_biases = None
        self.global_bias = None

    def fit(self, interactions):
        """Train matrix factorization model using SGD"""
        self._initialize_parameters(interactions)

        for epoch in range(self.n_epochs):
            epoch_loss = 0.0
            for interaction in interactions:
                if interaction.rating is not None:
                    epoch_loss += self._update_parameters(interaction)

            if epoch % 10 == 0:
                print(f"Epoch {epoch}, Loss: {epoch_loss:.4f}")

    def _initialize_parameters(self, interactions):
        """Initialize user and item factors randomly"""
        users = set(i.user_id for i in interactions)
        items = set(i.item_id for i in interactions)

        self.user_factors = {u: np.random.normal(0, 0.1, self.n_factors)
                             for u in users}
        self.item_factors = {i: np.random.normal(0, 0.1, self.n_factors)
                             for i in items}
        self.user_biases = {u: 0.0 for u in users}
        self.item_biases = {i: 0.0 for i in items}

        # Calculate global bias (overall average rating)
        ratings = [i.rating for i in interactions if i.rating is not None]
        self.global_bias = sum(ratings) / len(ratings)

    def _update_parameters(self, interaction):
        """Single SGD step for one observed rating; returns the squared error"""
        u, v = interaction.user_id, interaction.item_id
        prediction = (self.global_bias + self.user_biases[u] + self.item_biases[v] +
                      np.dot(self.user_factors[u], self.item_factors[v]))
        error = interaction.rating - prediction

        # Gradient updates with L2 regularization
        self.user_biases[u] += self.learning_rate * (
            error - self.regularization * self.user_biases[u])
        self.item_biases[v] += self.learning_rate * (
            error - self.regularization * self.item_biases[v])

        user_factor_old = self.user_factors[u].copy()
        self.user_factors[u] += self.learning_rate * (
            error * self.item_factors[v] - self.regularization * self.user_factors[u])
        self.item_factors[v] += self.learning_rate * (
            error * user_factor_old - self.regularization * self.item_factors[v])

        return error ** 2

    def predict_rating(self, user_id, item_id):
        """Predict rating for user-item pair"""
        if user_id not in self.user_factors or item_id not in self.item_factors:
            return self.global_bias

        user_factor = self.user_factors[user_id]
        item_factor = self.item_factors[item_id]
        user_bias = self.user_biases[user_id]
        item_bias = self.item_biases[item_id]

        prediction = (self.global_bias + user_bias + item_bias +
                      np.dot(user_factor, item_factor))
        return max(1.0, min(5.0, prediction))  # Clamp to 1-5 rating range



Evaluation Metrics and Validation


Evaluating recommender systems requires multiple metrics that capture different aspects of recommendation quality. Accuracy metrics such as Mean Absolute Error (MAE) and Root Mean Square Error (RMSE) measure how close predicted ratings are to actual ratings. Ranking metrics like Precision at K, Recall at K, and Normalized Discounted Cumulative Gain (NDCG) evaluate the quality of top-N recommendations.



import numpy as np


class RecommenderEvaluator:
    def __init__(self):
        self.metrics = {}

    def evaluate_accuracy(self, predictions, ground_truth):
        """Calculate accuracy metrics for rating predictions"""
        errors = []
        squared_errors = []

        for (user_id, item_id), predicted_rating in predictions.items():
            if (user_id, item_id) in ground_truth:
                actual_rating = ground_truth[(user_id, item_id)]
                errors.append(abs(predicted_rating - actual_rating))
                squared_errors.append((predicted_rating - actual_rating) ** 2)

        mae = sum(errors) / len(errors) if errors else 0.0
        rmse = (sum(squared_errors) / len(squared_errors)) ** 0.5 if squared_errors else 0.0
        return {'MAE': mae, 'RMSE': rmse}

    def evaluate_ranking(self, recommendations, ground_truth, k=10):
        """Calculate ranking metrics for top-K recommendations"""
        precision_scores = []
        recall_scores = []
        ndcg_scores = []

        for user_id, recommended_items in recommendations.items():
            if user_id in ground_truth:
                relevant_items = set(ground_truth[user_id])
                recommended_k = recommended_items[:k]

                # Precision@k: fraction of recommended items that are relevant
                relevant_recommended = len(set(recommended_k) & relevant_items)
                precision_scores.append(
                    relevant_recommended / len(recommended_k) if recommended_k else 0.0)

                # Recall@k: fraction of relevant items that were recommended
                recall_scores.append(
                    relevant_recommended / len(relevant_items) if relevant_items else 0.0)

                # NDCG@k
                ndcg_scores.append(self._calculate_ndcg(recommended_k, relevant_items, k))

        # Guard against an empty evaluation set
        def mean(scores):
            return sum(scores) / len(scores) if scores else 0.0

        return {
            'Precision@K': mean(precision_scores),
            'Recall@K': mean(recall_scores),
            'NDCG@K': mean(ndcg_scores)
        }

    def _calculate_ndcg(self, recommended_items, relevant_items, k):
        """Calculate Normalized Discounted Cumulative Gain"""
        dcg = 0.0
        for i, item in enumerate(recommended_items[:k]):
            if item in relevant_items:
                dcg += 1.0 / np.log2(i + 2)  # i + 2 because ranks are 1-based and log2(1) = 0

        # Ideal DCG: all relevant items ranked first
        idcg = sum(1.0 / np.log2(i + 2) for i in range(min(k, len(relevant_items))))
        return dcg / idcg if idcg > 0 else 0.0



Beyond traditional metrics, modern recommender systems must also consider diversity, novelty, and fairness. Diversity metrics measure how different recommended items are from each other, preventing filter bubbles. Novelty metrics assess whether recommendations introduce users to new, previously unknown items. Fairness metrics ensure that recommendations do not discriminate against certain user groups or item categories.
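As one concrete example, intra-list diversity can be measured as the average pairwise dissimilarity (1 - similarity) across a recommendation list. The `similarity` callback here is a caller-supplied stand-in for whatever item-similarity measure the system already computes:

```python
def intra_list_diversity(recommended, similarity):
    """Average pairwise dissimilarity over a recommendation list.

    `similarity(a, b)` should return a value in [0, 1]; higher results
    from this function mean a more diverse list.
    """
    pairs = [(a, b) for i, a in enumerate(recommended) for b in recommended[i + 1:]]
    if not pairs:
        return 0.0
    return sum(1.0 - similarity(a, b) for a, b in pairs) / len(pairs)
```

Novelty and fairness metrics follow the same pattern: a scalar computed over the recommendation list, tracked alongside accuracy so that gains in one dimension do not silently degrade another.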


ADVANCED ALGORITHMIC APPROACHES


Deep Learning Integration


Modern recommender systems increasingly leverage deep learning techniques to capture complex, non-linear patterns in user behavior and item characteristics. Neural collaborative filtering extends traditional matrix factorization by replacing the inner product with neural networks, enabling more sophisticated modeling of user-item interactions.



import torch
import torch.nn as nn


class NeuralCollaborativeFiltering(nn.Module):
    def __init__(self, n_users, n_items, embedding_dim=64,
                 hidden_dims=(128, 64, 32)):
        super(NeuralCollaborativeFiltering, self).__init__()

        # Embedding layers for users and items
        self.user_embedding = nn.Embedding(n_users, embedding_dim)
        self.item_embedding = nn.Embedding(n_items, embedding_dim)

        # Neural MF layers
        layers = []
        input_dim = embedding_dim * 2
        for hidden_dim in hidden_dims:
            layers.append(nn.Linear(input_dim, hidden_dim))
            layers.append(nn.ReLU())
            layers.append(nn.Dropout(0.2))
            input_dim = hidden_dim

        layers.append(nn.Linear(input_dim, 1))
        layers.append(nn.Sigmoid())

        self.neural_layers = nn.Sequential(*layers)

        # Initialize embeddings
        nn.init.normal_(self.user_embedding.weight, std=0.01)
        nn.init.normal_(self.item_embedding.weight, std=0.01)

    def forward(self, user_ids, item_ids):
        """Forward pass through the neural network"""
        user_embed = self.user_embedding(user_ids)
        item_embed = self.item_embedding(item_ids)

        # Concatenate user and item embeddings, then pass through MLP
        x = torch.cat([user_embed, item_embed], dim=1)
        return self.neural_layers(x).squeeze()

    def predict(self, user_id, item_id):
        """Predict rating for a single user-item pair"""
        self.eval()
        with torch.no_grad():
            user_tensor = torch.tensor([user_id])
            item_tensor = torch.tensor([item_id])
            prediction = self.forward(user_tensor, item_tensor)
            return prediction.item() * 4 + 1  # Scale sigmoid output to 1-5 rating range



Autoencoders provide another powerful approach for recommender systems, learning compressed representations of user preferences or item characteristics. Variational autoencoders can generate diverse recommendations by sampling from learned probability distributions.
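A minimal autoencoder sketch in PyTorch, assuming each user is represented as a dense rating vector of length `n_items` (the class name, layer sizes, and bottleneck width are illustrative; a variational variant would replace the bottleneck with mean and variance heads and sample from the resulting distribution):

```python
import torch
import torch.nn as nn

class RatingAutoencoder(nn.Module):
    """Compress a user's rating vector to a bottleneck and reconstruct it.

    Reconstructed scores for unrated items serve as recommendation scores.
    """
    def __init__(self, n_items, bottleneck=32):
        super().__init__()
        self.encoder = nn.Sequential(
            nn.Linear(n_items, 128), nn.ReLU(),
            nn.Linear(128, bottleneck))
        self.decoder = nn.Sequential(
            nn.Linear(bottleneck, 128), nn.ReLU(),
            nn.Linear(128, n_items))

    def forward(self, x):
        # Reconstruction; train with MSE against observed entries only
        return self.decoder(self.encoder(x))
```

Training typically masks the loss to observed ratings so the model is not penalized for "wrong" predictions on items the user simply never saw.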


Hybrid System Architecture


Excellent recommender systems often combine multiple approaches to leverage the strengths of different algorithms while mitigating their individual weaknesses. Hybrid systems can be implemented through weighted combinations, switching mechanisms, or ensemble methods.



class HybridRecommender:
    def __init__(self, algorithms, weights=None):
        self.algorithms = algorithms
        self.weights = weights or [1.0 / len(algorithms)] * len(algorithms)
        self.performance_history = {alg.__class__.__name__: []
                                    for alg in algorithms}

    def fit(self, train_data, validation_data=None):
        """Train all component algorithms"""
        for algorithm in self.algorithms:
            algorithm.fit(train_data)

        # Adjust weights based on validation performance if available
        if validation_data:
            self._optimize_weights(validation_data)

    def predict_rating(self, user_id, item_id):
        """Combine predictions from all algorithms"""
        predictions = []
        valid_weights = []

        for algorithm, weight in zip(self.algorithms, self.weights):
            try:
                prediction = algorithm.predict_rating(user_id, item_id)
                if prediction is not None:
                    predictions.append(prediction)
                    valid_weights.append(weight)
            except Exception as e:
                print(f"Algorithm {algorithm.__class__.__name__} failed: {e}")
                continue

        if not predictions:
            return 3.0  # Default neutral rating

        # Weighted average of predictions
        total_weight = sum(valid_weights)
        weighted_sum = sum(p * w for p, w in zip(predictions, valid_weights))
        return weighted_sum / total_weight

    def recommend_items(self, user_id, n_recommendations=10,
                        candidate_items=None):
        """Generate top-N recommendations using ensemble approach"""
        if candidate_items is None:
            candidate_items = self._get_candidate_items(user_id)

        # Score all candidates with the ensemble
        item_scores = {}
        for item_id in candidate_items:
            item_scores[item_id] = self.predict_rating(user_id, item_id)

        # Sort by predicted score and return top N
        sorted_items = sorted(item_scores.items(),
                              key=lambda x: x[1], reverse=True)
        return [item_id for item_id, score in sorted_items[:n_recommendations]]

    def _optimize_weights(self, validation_data):
        """Optimize algorithm weights based on validation performance"""
        evaluator = RecommenderEvaluator()

        for i, algorithm in enumerate(self.algorithms):
            predictions = {}
            ground_truth = {}

            for interaction in validation_data:
                if interaction.rating is not None:
                    pred = algorithm.predict_rating(interaction.user_id,
                                                    interaction.item_id)
                    predictions[(interaction.user_id, interaction.item_id)] = pred
                    ground_truth[(interaction.user_id, interaction.item_id)] = interaction.rating

            metrics = evaluator.evaluate_accuracy(predictions, ground_truth)
            rmse = metrics['RMSE']

            # Lower RMSE gets higher weight (inverse relationship)
            self.weights[i] = 1.0 / (1.0 + rmse)

        # Normalize weights
        total_weight = sum(self.weights)
        self.weights = [w / total_weight for w in self.weights]



HANDLING SYSTEM CHALLENGES


Cold Start Problem


The cold start problem occurs when the system lacks sufficient information about new users or items to make accurate recommendations. For new users, the system can employ demographic-based recommendations, popularity-based suggestions, or interactive onboarding processes to quickly gather preference information.



class ColdStartHandler:
    def __init__(self, main_recommender, fallback_strategies):
        self.main_recommender = main_recommender
        self.fallback_strategies = fallback_strategies
        self.user_interaction_threshold = 5
        self.item_interaction_threshold = 10

    def recommend_for_user(self, user_id, n_recommendations=10):
        """Handle recommendations for potentially cold users"""
        user_interaction_count = self._get_user_interaction_count(user_id)

        if user_interaction_count >= self.user_interaction_threshold:
            # Sufficient data for main recommender
            return self.main_recommender.recommend_items(user_id, n_recommendations)
        else:
            # Apply cold start strategy
            return self._apply_cold_start_strategy(user_id, user_interaction_count,
                                                   n_recommendations)

    def _apply_cold_start_strategy(self, user_id, interaction_count, n_recommendations):
        """Apply appropriate cold start strategy based on available data"""
        if interaction_count == 0:
            # Completely new user - use popularity-based recommendations
            return self.fallback_strategies['popularity'].recommend_items(
                user_id, n_recommendations)
        else:
            # Some interactions available - use hybrid approach
            main_recs = self.main_recommender.recommend_items(
                user_id, n_recommendations // 2)
            popularity_recs = self.fallback_strategies['popularity'].recommend_items(
                user_id, n_recommendations - len(main_recs))

            # Combine and deduplicate
            combined_recs = main_recs + [item for item in popularity_recs
                                         if item not in main_recs]
            return combined_recs[:n_recommendations]

    def handle_new_item(self, item_id, item_features):
        """Generate initial recommendations for new items"""
        if 'content_based' in self.fallback_strategies:
            # Use content-based similarity to existing items
            similar_items = self.fallback_strategies['content_based'].find_similar_items(
                item_id, item_features)

            # Recommend to users who liked similar items
            target_users = []
            for similar_item in similar_items[:10]:  # Top 10 similar items
                users_who_liked = self._get_users_who_liked_item(similar_item)
                target_users.extend(users_who_liked)

            return list(set(target_users))  # Remove duplicates

        return []  # No strategy available



Scalability and Performance Optimization


Production recommender systems must handle millions of users and items while providing real-time recommendations. This requires careful attention to algorithmic complexity, data structures, and system architecture.



import redis
import pickle
from concurrent.futures import ThreadPoolExecutor


class ScalableRecommender:
    def __init__(self, base_recommender, cache_config=None):
        self.base_recommender = base_recommender
        self.cache = redis.Redis(**cache_config) if cache_config else None
        self.precomputed_similarities = {}
        self.user_embeddings = {}
        self.item_embeddings = {}
        self.executor = ThreadPoolExecutor(max_workers=4)

    def precompute_similarities(self, items, batch_size=1000):
        """Precompute item-item similarities for faster recommendations"""
        print("Precomputing item similarities...")

        for i in range(0, len(items), batch_size):
            batch_items = items[i:i + batch_size]

            # Compute similarities for this batch
            batch_similarities = self._compute_batch_similarities(batch_items, items)

            # Store in memory and cache
            for item_id, similarities in batch_similarities.items():
                self.precomputed_similarities[item_id] = similarities

                if self.cache:
                    cache_key = f"similarities:{item_id}"
                    self.cache.setex(cache_key, 3600, pickle.dumps(similarities))

    def get_recommendations_fast(self, user_id, n_recommendations=10):
        """Fast recommendation generation using precomputed data"""
        # Try cache first
        if self.cache:
            cache_key = f"recommendations:{user_id}:{n_recommendations}"
            cached_result = self.cache.get(cache_key)
            if cached_result:
                return pickle.loads(cached_result)

        # Generate recommendations
        user_history = self._get_user_history(user_id)
        candidate_scores = {}

        # Use precomputed similarities for fast scoring
        for item_id in user_history:
            if item_id in self.precomputed_similarities:
                for candidate_item, similarity in self.precomputed_similarities[item_id].items():
                    if candidate_item not in user_history:
                        candidate_scores[candidate_item] = (
                            candidate_scores.get(candidate_item, 0.0) + similarity)

        # Sort and get top recommendations
        sorted_candidates = sorted(candidate_scores.items(),
                                   key=lambda x: x[1], reverse=True)
        recommendations = [item_id for item_id, score in sorted_candidates[:n_recommendations]]

        # Cache result
        if self.cache:
            cache_key = f"recommendations:{user_id}:{n_recommendations}"
            self.cache.setex(cache_key, 1800, pickle.dumps(recommendations))

        return recommendations

    def batch_recommend(self, user_ids, n_recommendations=10):
        """Generate recommendations for multiple users in parallel"""
        futures = []
        for user_id in user_ids:
            future = self.executor.submit(self.get_recommendations_fast,
                                          user_id, n_recommendations)
            futures.append((user_id, future))

        results = {}
        for user_id, future in futures:
            try:
                results[user_id] = future.result(timeout=5.0)
            except Exception as e:
                print(f"Failed to generate recommendations for user {user_id}: {e}")
                results[user_id] = []
        return results

    def update_user_profile_async(self, user_id, new_interactions):
        """Asynchronously update user profile and invalidate cache"""
        def update_task():
            # Update base recommender
            self.base_recommender.update_user_profile(user_id, new_interactions)

            # Invalidate cached recommendations
            if self.cache:
                pattern = f"recommendations:{user_id}:*"
                for key in self.cache.scan_iter(match=pattern):
                    self.cache.delete(key)

        self.executor.submit(update_task)



PRODUCTION DEPLOYMENT CONSIDERATIONS


Real-time Recommendation Pipeline


Production recommender systems require sophisticated pipelines that can process user interactions in real-time and update recommendations accordingly. This involves stream processing, feature stores, and model serving infrastructure.



import asyncio
from datetime import datetime, timedelta


class RealtimeRecommendationPipeline:
    def __init__(self, recommender, feature_store, model_store):
        self.recommender = recommender
        self.feature_store = feature_store
        self.model_store = model_store
        self.interaction_buffer = []
        self.last_model_update = datetime.now()
        self.model_update_interval = timedelta(hours=1)

    async def process_interaction(self, interaction_data):
        """Process incoming user interaction in real-time"""
        interaction = UserInteraction(**interaction_data)

        # Add to buffer for batch processing
        self.interaction_buffer.append(interaction)

        # Update user features immediately
        await self._update_user_features_async(interaction.user_id, interaction)

        # Invalidate cached recommendations for this user
        await self._invalidate_user_cache(interaction.user_id)

        # Check if model needs updating
        if self._should_update_model():
            await self._trigger_model_update()

    async def get_recommendations(self, user_id, n_recommendations=10,
                                  context=None):
        """Get real-time recommendations for user"""
        # Get current user features
        user_features = await self.feature_store.get_user_features(user_id)

        # Apply contextual filtering if context provided
        if context:
            candidate_items = await self._get_contextual_candidates(user_id, context)
        else:
            candidate_items = await self._get_default_candidates(user_id)

        # Score candidates using current model
        scored_items = []
        for item_id in candidate_items:
            score = await self._score_item_async(user_id, item_id, context)
            scored_items.append((item_id, score))

        # Sort by score and return top N
        scored_items.sort(key=lambda x: x[1], reverse=True)
        recommendations = [item_id for item_id, score in scored_items[:n_recommendations]]

        # Log recommendation event
        await self._log_recommendation_event(user_id, recommendations, context)

        return recommendations

    async def _update_user_features_async(self, user_id, interaction):
        """Asynchronously update user features"""
        current_features = await self.feature_store.get_user_features(user_id)

        # Update interaction count
        current_features['interaction_count'] = current_features.get('interaction_count', 0) + 1

        # Update running average rating if a rating was provided
        if interaction.rating is not None:
            current_avg = current_features.get('avg_rating', 0.0)
            current_count = current_features.get('rating_count', 0)
            new_avg = (current_avg * current_count + interaction.rating) / (current_count + 1)
            current_features['avg_rating'] = new_avg
            current_features['rating_count'] = current_count + 1

        # Update last activity timestamp
        current_features['last_activity'] = interaction.timestamp or datetime.now().isoformat()

        # Store updated features
        await self.feature_store.update_user_features(user_id, current_features)

    async def _score_item_async(self, user_id, item_id, context=None):
        """Score an item for a user with optional context"""
        base_score = self.recommender.predict_rating(user_id, item_id)

        # Apply contextual adjustments
        if context:
            contextual_boost = await self._calculate_contextual_boost(
                user_id, item_id, context)
            base_score *= (1.0 + contextual_boost)

        return base_score

    def _should_update_model(self):
        """Check if model should be updated based on time and interaction volume"""
        time_since_update = datetime.now() - self.last_model_update
        interaction_threshold = 1000  # Update after 1000 new interactions

        return (time_since_update > self.model_update_interval or
                len(self.interaction_buffer) > interaction_threshold)

    async def _trigger_model_update(self):
        """Trigger asynchronous model retraining"""
        print("Triggering model update...")

        # Process buffered interactions
        if self.interaction_buffer:

            await self._process_interaction_batch(self.interaction_buffer)

            self.interaction_buffer.clear()

        

        # Update model timestamp

        self.last_model_update = datetime.now()
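The retraining policy in _should_update_model combines a staleness window with an interaction-volume threshold. Pulled out as a pure function (the names below are illustrative, not part of the engine's API), the policy is easy to unit-test in isolation:

```python
from datetime import datetime, timedelta

def should_update(last_update, now, buffer_size,
                  max_age=timedelta(minutes=30), max_buffer=1000):
    """Retrain when the model is stale OR enough interactions accumulated."""
    return (now - last_update) > max_age or buffer_size > max_buffer

now = datetime(2026, 3, 6, 12, 0, 0)
stale = should_update(now - timedelta(minutes=31), now, 0)    # time trigger
fresh = should_update(now - timedelta(minutes=1), now, 10)    # neither trigger
busy = should_update(now - timedelta(minutes=1), now, 1001)   # volume trigger
```

Either condition alone is usually insufficient: a pure time trigger retrains idle models needlessly, while a pure volume trigger can leave a model stale during quiet periods.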



A/B Testing and Experimentation


Continuous improvement of recommender systems requires systematic experimentation through A/B testing. This allows teams to measure the impact of algorithm changes, feature modifications, and user experience improvements.



import hashlib

from enum import Enum


class ExperimentStatus(Enum):

    ACTIVE = "active"

    PAUSED = "paused"

    COMPLETED = "completed"


class ABTestingFramework:

    def __init__(self):

        self.experiments = {}

        self.user_assignments = {}

        self.metrics_collector = MetricsCollector()

    

    def create_experiment(self, experiment_id, control_algorithm, 

                         test_algorithm, traffic_split=0.5, 

                         success_metrics=None):

        """Create a new A/B test experiment"""

        experiment = {

            'experiment_id': experiment_id,

            'control_algorithm': control_algorithm,

            'test_algorithm': test_algorithm,

            'traffic_split': traffic_split,

            'success_metrics': success_metrics or ['ctr', 'conversion_rate'],

            'status': ExperimentStatus.ACTIVE,

            'start_time': datetime.now(),

            'participants': {'control': set(), 'test': set()}

        }

        

        self.experiments[experiment_id] = experiment

        return experiment

    

    def assign_user_to_experiment(self, user_id, experiment_id):

        """Consistently assign user to control or test group"""

        if experiment_id not in self.experiments:

            return 'control'  # Default to control if experiment doesn't exist

        

        experiment = self.experiments[experiment_id]

        if experiment['status'] != ExperimentStatus.ACTIVE:

            return 'control'

        

        # Use hash-based assignment for consistency

        hash_input = f"{user_id}_{experiment_id}"

        hash_value = int(hashlib.md5(hash_input.encode()).hexdigest(), 16)

        assignment_value = (hash_value % 100) / 100.0

        

        if assignment_value < experiment['traffic_split']:

            assignment = 'test'

            experiment['participants']['test'].add(user_id)

        else:

            assignment = 'control'

            experiment['participants']['control'].add(user_id)

        

        self.user_assignments[(user_id, experiment_id)] = assignment

        return assignment

    

    def get_recommendations_with_experiment(self, user_id, experiment_id, 

                                          n_recommendations=10):

        """Get recommendations based on experiment assignment"""

        assignment = self.assign_user_to_experiment(user_id, experiment_id)

        experiment = self.experiments[experiment_id]

        

        if assignment == 'test':

            algorithm = experiment['test_algorithm']

        else:

            algorithm = experiment['control_algorithm']

        

        recommendations = algorithm.recommend_items(user_id, n_recommendations)

        

        # Log experiment exposure

        self.metrics_collector.log_experiment_exposure(

            user_id, experiment_id, assignment, recommendations)

        

        return recommendations

    

    def record_user_action(self, user_id, action_type, item_id=None, 

                          experiment_id=None):

        """Record user action for experiment analysis"""

        if experiment_id and (user_id, experiment_id) in self.user_assignments:

            assignment = self.user_assignments[(user_id, experiment_id)]

            

            self.metrics_collector.log_user_action(

                user_id, experiment_id, assignment, action_type, item_id)

    

    def analyze_experiment_results(self, experiment_id):

        """Analyze A/B test results and statistical significance"""

        if experiment_id not in self.experiments:

            return None

        

        experiment = self.experiments[experiment_id]

        

        # Get metrics for both groups

        control_metrics = self.metrics_collector.get_group_metrics(

            experiment_id, 'control')

        test_metrics = self.metrics_collector.get_group_metrics(

            experiment_id, 'test')

        

        results = {

            'experiment_id': experiment_id,

            'control_group_size': len(experiment['participants']['control']),

            'test_group_size': len(experiment['participants']['test']),

            'control_metrics': control_metrics,

            'test_metrics': test_metrics,

            'statistical_significance': self._calculate_significance(

                control_metrics, test_metrics)

        }

        

        return results

    

    def _calculate_significance(self, control_metrics, test_metrics):

        """Estimate per-metric lift (placeholder for a proper statistical test)"""

        # Simplified heuristic: flag any relative change above 5% as noteworthy.

        # In practice, use a real hypothesis test (e.g., a two-proportion

        # z-test for rate metrics such as CTR) and account for sample sizes

        # before declaring significance.

        significance_results = {}

        

        for metric in control_metrics:

            if metric in test_metrics:

                control_value = control_metrics[metric]

                test_value = test_metrics[metric]

                

                # Calculate relative improvement

                if control_value > 0:

                    improvement = (test_value - control_value) / control_value

                    significance_results[metric] = {

                        'improvement': improvement,

                        'significant': abs(improvement) > 0.05  # 5% threshold

                    }

        

        return significance_results
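As the comments note, the 5% threshold above is only a heuristic. A proper test for rate metrics such as CTR is the two-proportion z-test; a dependency-free sketch follows (function name and example counts are illustrative):

```python
import math

def two_proportion_z_test(successes_a, n_a, successes_b, n_b):
    """Two-sided z-test for the difference between two proportions."""
    p_a, p_b = successes_a / n_a, successes_b / n_b
    pooled = (successes_a + successes_b) / (n_a + n_b)
    se = math.sqrt(pooled * (1 - pooled) * (1 / n_a + 1 / n_b))
    z = (p_b - p_a) / se
    # Two-sided p-value from the standard normal CDF
    p_value = 2 * (1 - 0.5 * (1 + math.erf(abs(z) / math.sqrt(2))))
    return z, p_value

# Control: 100 clicks in 1000 exposures; test: 150 clicks in 1000 exposures
z, p = two_proportion_z_test(100, 1000, 150, 1000)
```

In production, a library implementation such as statsmodels' proportions_ztest is preferable to hand-rolled statistics.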


class MetricsCollector:

    def __init__(self):

        self.exposure_logs = []

        self.action_logs = []

    

    def log_experiment_exposure(self, user_id, experiment_id, assignment, 

                               recommendations):

        """Log when user is exposed to experiment"""

        log_entry = {

            'user_id': user_id,

            'experiment_id': experiment_id,

            'assignment': assignment,

            'recommendations': recommendations,

            'timestamp': datetime.now()

        }

        self.exposure_logs.append(log_entry)

    

    def log_user_action(self, user_id, experiment_id, assignment, 

                       action_type, item_id=None):

        """Log user actions for experiment analysis"""

        log_entry = {

            'user_id': user_id,

            'experiment_id': experiment_id,

            'assignment': assignment,

            'action_type': action_type,

            'item_id': item_id,

            'timestamp': datetime.now()

        }

        self.action_logs.append(log_entry)

    

    def get_group_metrics(self, experiment_id, assignment):

        """Calculate metrics for specific experiment group"""

        # Filter logs for this experiment and assignment

        exposure_logs = [log for log in self.exposure_logs 

                        if log['experiment_id'] == experiment_id 

                        and log['assignment'] == assignment]

        

        action_logs = [log for log in self.action_logs 

                      if log['experiment_id'] == experiment_id 

                      and log['assignment'] == assignment]

        

        # Calculate key metrics

        total_exposures = len(exposure_logs)

        total_clicks = len([log for log in action_logs 

                           if log['action_type'] == 'click'])

        total_conversions = len([log for log in action_logs 

                               if log['action_type'] == 'purchase'])

        

        metrics = {

            'exposures': total_exposures,

            'clicks': total_clicks,

            'conversions': total_conversions,

            'ctr': total_clicks / total_exposures if total_exposures > 0 else 0.0,

            'conversion_rate': total_conversions / total_exposures if total_exposures > 0 else 0.0

        }

        

        return metrics
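The hash-based assignment in assign_user_to_experiment has two properties worth verifying: the same user always receives the same group, and the realized split approximates traffic_split. A stripped-down version of the bucketing logic (an illustrative helper mirroring the framework's scheme) checks both:

```python
import hashlib

def assign_group(user_id, experiment_id, traffic_split=0.5):
    """Deterministically map a (user, experiment) pair to 'test' or 'control'."""
    digest = hashlib.md5(f"{user_id}_{experiment_id}".encode()).hexdigest()
    bucket = (int(digest, 16) % 100) / 100.0
    return 'test' if bucket < traffic_split else 'control'

# Same inputs always produce the same assignment
consistent = all(assign_group(42, "exp1") == assign_group(42, "exp1")
                 for _ in range(100))

# Over many users, the split approximates the configured ratio
test_share = sum(assign_group(u, "exp1") == 'test'
                 for u in range(10_000)) / 10_000
```

Because assignment depends only on the hash of user and experiment IDs, no assignment table is strictly required to reproduce a user's group.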



COMPLETE RUNNING EXAMPLE: MOVIE RECOMMENDATION SYSTEM


The following complete implementation demonstrates all the concepts discussed in this article through a comprehensive movie recommendation system. This system combines collaborative filtering, content-based filtering, and matrix factorization in a hybrid approach, with proper evaluation and cold start handling.



import numpy as np

import pandas as pd

from datetime import datetime, timedelta

import random

import math

from collections import defaultdict

from sklearn.feature_extraction.text import TfidfVectorizer

from sklearn.metrics.pairwise import cosine_similarity

from sklearn.model_selection import train_test_split


class Movie:

    def __init__(self, movie_id, title, genres, year, description=""):

        self.movie_id = movie_id

        self.title = title

        self.genres = genres.split('|') if isinstance(genres, str) else genres

        self.year = year

        self.description = description

        self.avg_rating = 0.0

        self.rating_count = 0


class User:

    def __init__(self, user_id, age=None, gender=None, occupation=None):

        self.user_id = user_id

        self.age = age

        self.gender = gender

        self.occupation = occupation

        self.ratings = {}

        self.avg_rating = 0.0


class Rating:

    def __init__(self, user_id, movie_id, rating, timestamp=None):

        self.user_id = user_id

        self.movie_id = movie_id

        self.rating = rating

        self.timestamp = timestamp or datetime.now()


class MovieRecommendationSystem:

    def __init__(self):

        self.users = {}

        self.movies = {}

        self.ratings = []

        self.user_movie_matrix = None

        self.movie_features = None

        self.tfidf_vectorizer = None

        self.content_similarity_matrix = None

        

        # Algorithm components

        self.collaborative_filter = CollaborativeFilteringRecommender()

        self.content_filter = ContentBasedRecommender()

        self.matrix_factorization = MatrixFactorizationRecommender()

        self.hybrid_recommender = None

        

        # Evaluation

        self.evaluator = MovieRecommenderEvaluator()

        

        # Cold start handler

        self.cold_start_handler = ColdStartManager()

    

    def add_user(self, user):

        """Add user to the system"""

        self.users[user.user_id] = user

    

    def add_movie(self, movie):

        """Add movie to the system"""

        self.movies[movie.movie_id] = movie

    

    def add_rating(self, rating):

        """Add rating to the system"""

        self.ratings.append(rating)

        

        # Update user's rating history

        if rating.user_id in self.users:

            self.users[rating.user_id].ratings[rating.movie_id] = rating.rating

        

        # Update movie's rating statistics

        if rating.movie_id in self.movies:

            movie = self.movies[rating.movie_id]

            total_rating = movie.avg_rating * movie.rating_count + rating.rating

            movie.rating_count += 1

            movie.avg_rating = total_rating / movie.rating_count

    

    def load_sample_data(self):

        """Load sample movie and rating data for demonstration"""

        # Sample movies

        movies_data = [

            (1, "Toy Story", "Animation|Children's|Comedy", 1995, 

             "A cowboy doll is profoundly threatened when a new spaceman figure supplants him as top toy."),

            (2, "Jumanji", "Adventure|Children's|Fantasy", 1995,

             "When two kids find and play a magical board game, they release a man trapped for decades."),

            (3, "Grumpier Old Men", "Comedy|Romance", 1995,

             "John and Max resolve to save their beloved bait shop from turning into an Italian restaurant."),

            (4, "Waiting to Exhale", "Comedy|Drama", 1995,

             "Based on Terry McMillan's novel, this film follows four very different African-American women."),

            (5, "Father of the Bride Part II", "Comedy", 1995,

             "George Banks must deal not only with his daughter's pregnancy, but also with his wife's."),

            (6, "Heat", "Action|Crime|Thriller", 1995,

             "A group of professional bank robbers start to feel the heat from police when they unknowingly leave a clue."),

            (7, "Sabrina", "Comedy|Romance", 1995,

             "An ugly duckling having undergone a remarkable change, still fancies her childhood crush."),

            (8, "Tom and Huck", "Adventure|Children's", 1995,

             "Two best friends witness a murder and go on the run from the killer."),

            (9, "Sudden Death", "Action", 1995,

             "A former fireman takes on a group of terrorists holding the Vice President and others hostage."),

            (10, "GoldenEye", "Action|Adventure|Thriller", 1995,

             "James Bond teams up with the lone survivor of a destroyed Russian research center.")

        ]

        

        for movie_data in movies_data:

            movie = Movie(*movie_data)

            self.add_movie(movie)

        

        # Sample users

        users_data = [

            (1, 25, 'M', 'Student'),

            (2, 35, 'F', 'Engineer'),

            (3, 45, 'M', 'Teacher'),

            (4, 28, 'F', 'Designer'),

            (5, 52, 'M', 'Manager')

        ]

        

        for user_data in users_data:

            user = User(*user_data)

            self.add_user(user)

        

        # Sample ratings

        ratings_data = [

            (1, 1, 5), (1, 2, 3), (1, 3, 4), (1, 6, 5), (1, 7, 3),

            (2, 1, 4), (2, 2, 2), (2, 4, 5), (2, 5, 3), (2, 8, 4),

            (3, 1, 3), (3, 3, 5), (3, 6, 4), (3, 9, 2), (3, 10, 4),

            (4, 2, 4), (4, 4, 5), (4, 5, 4), (4, 7, 5), (4, 8, 3),

            (5, 1, 2), (5, 3, 3), (5, 6, 5), (5, 9, 4), (5, 10, 5)

        ]

        

        for rating_data in ratings_data:

            rating = Rating(*rating_data)

            self.add_rating(rating)

    

    def build_user_movie_matrix(self):

        """Build user-movie rating matrix"""

        user_ids = sorted(self.users.keys())

        movie_ids = sorted(self.movies.keys())

        

        matrix = np.zeros((len(user_ids), len(movie_ids)))

        

        user_to_idx = {user_id: idx for idx, user_id in enumerate(user_ids)}

        movie_to_idx = {movie_id: idx for idx, movie_id in enumerate(movie_ids)}

        

        for rating in self.ratings:

            user_idx = user_to_idx[rating.user_id]

            movie_idx = movie_to_idx[rating.movie_id]

            matrix[user_idx, movie_idx] = rating.rating

        

        self.user_movie_matrix = matrix

        self.user_to_idx = user_to_idx

        self.movie_to_idx = movie_to_idx

        self.idx_to_user = {idx: user_id for user_id, idx in user_to_idx.items()}

        self.idx_to_movie = {idx: movie_id for movie_id, idx in movie_to_idx.items()}

    

    def build_content_features(self):

        """Build content-based features for movies"""

        movie_descriptions = []

        movie_ids = []

        

        for movie_id, movie in self.movies.items():

            # Combine genres and description for content features

            content = ' '.join(movie.genres) + ' ' + movie.description

            movie_descriptions.append(content)

            movie_ids.append(movie_id)

        

        # Create TF-IDF features

        self.tfidf_vectorizer = TfidfVectorizer(stop_words='english', max_features=1000)

        tfidf_matrix = self.tfidf_vectorizer.fit_transform(movie_descriptions)

        

        # Calculate content similarity matrix

        self.content_similarity_matrix = cosine_similarity(tfidf_matrix)

        self.content_movie_ids = movie_ids

    

    def train_system(self):

        """Train all recommendation algorithms"""

        print("Building user-movie matrix...")

        self.build_user_movie_matrix()

        

        print("Building content features...")

        self.build_content_features()

        

        print("Training collaborative filtering...")

        self.collaborative_filter.fit(self.user_movie_matrix, self.user_to_idx, self.movie_to_idx)

        

        print("Training content-based filtering...")

        self.content_filter.fit(self.content_similarity_matrix, self.content_movie_ids, self.movies)

        self.content_filter.users = self.users  # give the content filter access to user rating histories

        

        print("Training matrix factorization...")

        self.matrix_factorization.fit(self.ratings, self.users, self.movies)

        

        print("Setting up hybrid recommender...")

        algorithms = [self.collaborative_filter, self.content_filter, self.matrix_factorization]

        weights = [0.4, 0.3, 0.3]  # Balanced weighting

        self.hybrid_recommender = HybridMovieRecommender(algorithms, weights)

        

        print("Training complete!")

    

    def get_recommendations(self, user_id, n_recommendations=5):

        """Get movie recommendations for a user"""

        if user_id not in self.users:

            return self.cold_start_handler.get_popular_movies(self.movies, n_recommendations)

        

        user_rating_count = len(self.users[user_id].ratings)

        

        if user_rating_count < 3:  # Cold start threshold

            return self.cold_start_handler.get_recommendations_for_new_user(

                user_id, self.users[user_id], self.movies, n_recommendations)

        

        return self.hybrid_recommender.recommend(user_id, self.movies, n_recommendations)

    

    def evaluate_system(self, test_size=0.2):

        """Evaluate the recommendation system"""

        if not self.ratings:

            print("No ratings available for evaluation")

            return

        

        # Split data into train and test sets

        train_ratings, test_ratings = train_test_split(self.ratings, test_size=test_size, random_state=42)

        

        # Create a temporary system trained only on the training split.

        # Users and movies are rebuilt with fresh rating state so that

        # test ratings cannot leak into the training-side statistics.

        temp_system = MovieRecommendationSystem()

        for user in self.users.values():

            temp_system.add_user(User(user.user_id, user.age, user.gender, user.occupation))

        for movie in self.movies.values():

            temp_system.add_movie(Movie(movie.movie_id, movie.title,

                                        list(movie.genres), movie.year, movie.description))

        for rating in train_ratings:

            temp_system.add_rating(rating)

        # Train on training data

        temp_system.train_system()

        

        # Evaluate on test data

        results = self.evaluator.evaluate(temp_system, test_ratings)

        

        print("\n=== EVALUATION RESULTS ===")

        print(f"RMSE: {results['rmse']:.4f}")

        print(f"MAE: {results['mae']:.4f}")

        print(f"Precision@5: {results['precision_at_5']:.4f}")

        print(f"Recall@5: {results['recall_at_5']:.4f}")

        print(f"Coverage: {results['coverage']:.4f}")

        

        return results


class CollaborativeFilteringRecommender:

    def __init__(self, similarity_threshold=0.1, k=5):

        self.similarity_threshold = similarity_threshold

        self.k = k

        self.user_similarity = None

        self.user_movie_matrix = None

        self.user_to_idx = None

        self.movie_to_idx = None

    

    def fit(self, user_movie_matrix, user_to_idx, movie_to_idx):

        """Train collaborative filtering model"""

        self.user_movie_matrix = user_movie_matrix

        self.user_to_idx = user_to_idx

        self.movie_to_idx = movie_to_idx

        

        # Calculate user similarity matrix

        self.user_similarity = cosine_similarity(user_movie_matrix)

        

        # Zero out the diagonal (each user's similarity with themselves)

        np.fill_diagonal(self.user_similarity, 0)

    

    def predict_rating(self, user_id, movie_id):

        """Predict rating for user-movie pair"""

        if user_id not in self.user_to_idx or movie_id not in self.movie_to_idx:

            return 3.0  # Default rating

        

        user_idx = self.user_to_idx[user_id]

        movie_idx = self.movie_to_idx[movie_id]

        

        # If user already rated this movie, return existing rating

        if self.user_movie_matrix[user_idx, movie_idx] > 0:

            return self.user_movie_matrix[user_idx, movie_idx]

        

        # Find similar users who rated this movie

        user_similarities = self.user_similarity[user_idx]

        similar_users = []

        

        for other_user_idx, similarity in enumerate(user_similarities):

            if (similarity > self.similarity_threshold and 

                self.user_movie_matrix[other_user_idx, movie_idx] > 0):

                similar_users.append((other_user_idx, similarity))

        

        # Sort by similarity and take top k

        similar_users.sort(key=lambda x: x[1], reverse=True)

        similar_users = similar_users[:self.k]

        

        if not similar_users:

            return 3.0  # Default rating

        

        # Calculate weighted average

        weighted_sum = 0.0

        similarity_sum = 0.0

        

        for other_user_idx, similarity in similar_users:

            rating = self.user_movie_matrix[other_user_idx, movie_idx]

            weighted_sum += similarity * rating

            similarity_sum += abs(similarity)

        

        if similarity_sum > 0:

            return weighted_sum / similarity_sum

        else:

            return 3.0

    

    def recommend(self, user_id, movies, n_recommendations=5):

        """Generate movie recommendations for user"""

        if user_id not in self.user_to_idx:

            return []

        

        user_idx = self.user_to_idx[user_id]

        user_ratings = self.user_movie_matrix[user_idx]

        

        # Get movies user hasn't rated

        unrated_movies = []

        for movie_id in movies.keys():

            if movie_id in self.movie_to_idx:

                movie_idx = self.movie_to_idx[movie_id]

                if user_ratings[movie_idx] == 0:  # Not rated

                    predicted_rating = self.predict_rating(user_id, movie_id)

                    unrated_movies.append((movie_id, predicted_rating))

        

        # Sort by predicted rating and return top N

        unrated_movies.sort(key=lambda x: x[1], reverse=True)

        return [movie_id for movie_id, rating in unrated_movies[:n_recommendations]]
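The neighborhood prediction above can be traced end-to-end on a toy matrix. This standalone sketch (independent of the classes in this article) predicts one missing rating as a similarity-weighted average over users who rated the movie:

```python
import numpy as np

# Rows = users, columns = movies; 0 means "not rated"
R = np.array([
    [5.0, 3.0, 0.0, 1.0],
    [4.0, 0.0, 0.0, 1.0],
    [1.0, 1.0, 0.0, 5.0],
    [1.0, 0.0, 4.0, 4.0],
])

def predict(R, user, movie):
    """Similarity-weighted average over users who rated the movie."""
    norms = np.linalg.norm(R, axis=1)
    sims = R @ R[user] / (norms * norms[user])   # cosine similarity to `user`
    sims[user] = 0.0                             # exclude the user themselves
    raters = (R[:, movie] > 0) & (sims > 0)
    if not raters.any():
        return 3.0                               # fall back to a neutral rating
    return float(sims[raters] @ R[raters, movie] / sims[raters].sum())

pred = predict(R, user=1, movie=1)   # user 1's missing rating for movie 1
```

Note that filling unrated cells with 0 biases raw cosine similarity; mean-centering each user's ratings first (Pearson-style adjusted cosine) is a common refinement.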


class ContentBasedRecommender:

    def __init__(self, similarity_threshold=0.1):

        self.similarity_threshold = similarity_threshold

        self.content_similarity_matrix = None

        self.content_movie_ids = None

        self.movies = None

        self.users = None  # registry of User objects, set by the host system

    def fit(self, content_similarity_matrix, content_movie_ids, movies):

        """Train content-based model"""

        self.content_similarity_matrix = content_similarity_matrix

        self.content_movie_ids = content_movie_ids

        self.movies = movies

        self.movie_to_idx = {movie_id: idx for idx, movie_id in enumerate(content_movie_ids)}

    def recommend(self, user_id, movies, n_recommendations=5):

        """Generate content-based recommendations"""

        # Look up the user's rating history from the users registry

        if not self.users or user_id not in self.users:

            return []

        user = self.users[user_id]

        if not user.ratings:

            return []

        # Calculate content-based scores for unrated movies

        movie_scores = {}

        for movie_id in self.content_movie_ids:

            if movie_id not in user.ratings:  # User hasn't rated this movie

                score = self._calculate_content_score(user.ratings, movie_id)

                if score > 0:

                    movie_scores[movie_id] = score

        # Sort by score and return top N

        sorted_movies = sorted(movie_scores.items(), key=lambda x: x[1], reverse=True)

        return [movie_id for movie_id, score in sorted_movies[:n_recommendations]]

    

    def _calculate_content_score(self, user_ratings, target_movie_id):

        """Calculate content-based score for target movie"""

        if target_movie_id not in self.movie_to_idx:

            return 0.0

        

        target_idx = self.movie_to_idx[target_movie_id]

        weighted_similarity = 0.0

        total_weight = 0.0

        

        for rated_movie_id, rating in user_ratings.items():

            if rated_movie_id in self.movie_to_idx:

                rated_idx = self.movie_to_idx[rated_movie_id]

                similarity = self.content_similarity_matrix[target_idx, rated_idx]

                

                if similarity > self.similarity_threshold:

                    # Weight similarity by the user's rating (higher-rated movies have more influence)

                    weight = rating / 5.0  # Normalize rating to 0-1

                    weighted_similarity += similarity * weight

                    total_weight += weight

        

        if total_weight > 0:

            return weighted_similarity / total_weight

        else:

            return 0.0
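TF-IDF over descriptions is one choice of content feature; when only genre tags are available, a simple Jaccard overlap already gives a usable similarity. A standalone sketch, using movies from the sample data above (function name is illustrative):

```python
def jaccard(genres_a, genres_b):
    """Set-overlap similarity between two genre lists."""
    a, b = set(genres_a), set(genres_b)
    return len(a & b) / len(a | b) if a | b else 0.0

toy_story = ["Animation", "Children's", "Comedy"]
jumanji = ["Adventure", "Children's", "Fantasy"]
heat = ["Action", "Crime", "Thriller"]

s_similar = jaccard(toy_story, jumanji)   # share "Children's"
s_disjoint = jaccard(toy_story, heat)     # no overlap
```

Such a genre-only similarity matrix can stand in for the TF-IDF one when item descriptions are sparse or missing.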


class MatrixFactorizationRecommender:

    def __init__(self, n_factors=10, learning_rate=0.01, regularization=0.01, n_epochs=50):

        self.n_factors = n_factors

        self.learning_rate = learning_rate

        self.regularization = regularization

        self.n_epochs = n_epochs

        self.user_factors = {}

        self.movie_factors = {}

        self.user_biases = {}

        self.movie_biases = {}

        self.global_bias = 0.0

    

    def fit(self, ratings, users, movies):

        """Train matrix factorization model"""

        self.users = users

        self.movies = movies

        

        # Initialize parameters

        self._initialize_parameters(ratings)

        

        # Training loop

        for epoch in range(self.n_epochs):

            epoch_loss = 0.0

            

            for rating in ratings:

                loss = self._update_parameters(rating)

                epoch_loss += loss

            

            if epoch % 10 == 0:

                print(f"Epoch {epoch}, Loss: {epoch_loss:.4f}")

    

    def _initialize_parameters(self, ratings):

        """Initialize user and movie factors"""

        user_ids = set(rating.user_id for rating in ratings)

        movie_ids = set(rating.movie_id for rating in ratings)

        

        # Initialize factors with small random values

        for user_id in user_ids:

            self.user_factors[user_id] = np.random.normal(0, 0.1, self.n_factors)

            self.user_biases[user_id] = 0.0

        

        for movie_id in movie_ids:

            self.movie_factors[movie_id] = np.random.normal(0, 0.1, self.n_factors)

            self.movie_biases[movie_id] = 0.0

        

        # Calculate global bias

        total_rating = sum(rating.rating for rating in ratings)

        self.global_bias = total_rating / len(ratings)

    

    def _update_parameters(self, rating):

        """Update parameters using gradient descent"""

        user_id = rating.user_id

        movie_id = rating.movie_id

        actual_rating = rating.rating

        

        # Predict rating

        predicted_rating = self.predict_rating(user_id, movie_id)

        error = actual_rating - predicted_rating

        

        # Get current parameters

        user_factor = self.user_factors[user_id]

        movie_factor = self.movie_factors[movie_id]

        user_bias = self.user_biases[user_id]

        movie_bias = self.movie_biases[movie_id]

        

        # Update biases

        self.user_biases[user_id] += self.learning_rate * (error - self.regularization * user_bias)

        self.movie_biases[movie_id] += self.learning_rate * (error - self.regularization * movie_bias)

        

        # Update factors

        user_factor_update = self.learning_rate * (error * movie_factor - self.regularization * user_factor)

        movie_factor_update = self.learning_rate * (error * user_factor - self.regularization * movie_factor)

        

        self.user_factors[user_id] += user_factor_update

        self.movie_factors[movie_id] += movie_factor_update

        

        return error ** 2

    

    def predict_rating(self, user_id, movie_id):

        """Predict rating for user-movie pair"""

        if user_id not in self.user_factors or movie_id not in self.movie_factors:

            return self.global_bias

        

        user_factor = self.user_factors[user_id]

        movie_factor = self.movie_factors[movie_id]

        user_bias = self.user_biases[user_id]

        movie_bias = self.movie_biases[movie_id]

        

        prediction = (self.global_bias + user_bias + movie_bias + 

                     np.dot(user_factor, movie_factor))

        

        # Clamp to valid rating range

        return max(1.0, min(5.0, prediction))

    

    def recommend(self, user_id, movies, n_recommendations=5):

        """Generate recommendations using matrix factorization"""

        if user_id not in self.user_factors:

            return []

        

        # Get user's current ratings

        user_ratings = set()

        if user_id in self.users:

            user_ratings = set(self.users[user_id].ratings.keys())

        

        # Score all unrated movies

        movie_scores = []

        for movie_id in movies.keys():

            if movie_id not in user_ratings:

                score = self.predict_rating(user_id, movie_id)

                movie_scores.append((movie_id, score))

        

        # Sort by score and return top N

        movie_scores.sort(key=lambda x: x[1], reverse=True)

        return [movie_id for movie_id, score in movie_scores[:n_recommendations]]
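The clamped prediction in `predict_rating` is just the global mean plus both bias terms plus a latent dot product. A quick standalone check of that formula, using hypothetical toy values rather than a trained model:

```python
def dot(u, v):
    """Plain-Python dot product (stand-in for np.dot in the class above)."""
    return sum(a * b for a, b in zip(u, v))

# Toy values -- hypothetical, not taken from any trained model
global_bias, user_bias, movie_bias = 3.5, 0.2, -0.1
user_factor = [0.5, -0.3]
movie_factor = [0.4, 0.1]

raw = global_bias + user_bias + movie_bias + dot(user_factor, movie_factor)
prediction = max(1.0, min(5.0, raw))  # clamp to the 1-5 star range
print(round(prediction, 2))  # 3.77
```

The clamp matters in practice: with large factor magnitudes the raw score can drift outside the 1-5 scale, and an unclamped prediction would distort RMSE during evaluation.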


class HybridMovieRecommender:

    def __init__(self, algorithms, weights):

        self.algorithms = algorithms

        self.weights = weights

    

    def recommend(self, user_id, movies, n_recommendations=5):

        """Generate hybrid recommendations"""

        # Get recommendations from each algorithm

        all_recommendations = []

        

        for algorithm, weight in zip(self.algorithms, self.weights):

            try:

                recs = algorithm.recommend(user_id, movies, n_recommendations * 2)

                weighted_recs = [(movie_id, weight) for movie_id in recs]

                all_recommendations.extend(weighted_recs)

            except Exception as e:

                print(f"Algorithm {algorithm.__class__.__name__} failed: {e}")

                continue

        

        # Aggregate scores

        movie_scores = defaultdict(float)

        for movie_id, weight in all_recommendations:

            movie_scores[movie_id] += weight

        

        # Sort by aggregated score

        sorted_movies = sorted(movie_scores.items(), key=lambda x: x[1], reverse=True)

        

        return [movie_id for movie_id, score in sorted_movies[:n_recommendations]]
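The flat per-appearance weighting above is deliberately simple: every movie an algorithm returns receives that algorithm's full weight regardless of where it ranked. A hypothetical rank-aware variant gives items near the top of each list a larger share of the weight:

```python
from collections import defaultdict

def blend_rankings(ranked_lists, weights, n=5):
    """Hypothetical reciprocal-rank blend: each algorithm contributes
    weight / (rank + 1) for every item it recommends, so a first-place
    item earns the full weight and lower ranks earn fractions of it."""
    scores = defaultdict(float)
    for recs, weight in zip(ranked_lists, weights):
        for rank, item in enumerate(recs):
            scores[item] += weight / (rank + 1)
    ranked = sorted(scores.items(), key=lambda x: x[1], reverse=True)
    return [item for item, _ in ranked[:n]]

# Three toy ranked lists (e.g. collaborative, content-based, matrix factorization)
blended = blend_rankings(
    [["m1", "m2", "m3"], ["m2", "m4", "m1"], ["m2", "m1", "m5"]],
    [0.4, 0.3, 0.3], n=3)
print(blended)  # ['m2', 'm1', 'm4']
```

Here "m2" wins because all three algorithms rank it highly, even though only one put it first; the flat scheme in `HybridMovieRecommender` would have scored it identically to "m1".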


class ColdStartManager:

    def get_popular_movies(self, movies, n_recommendations=5):

        """Get most popular movies based on rating count and average rating"""

        movie_scores = []

        

        for movie_id, movie in movies.items():

            if movie.rating_count > 0:

                # Score based on average rating weighted by number of ratings

                score = movie.avg_rating * math.log(1 + movie.rating_count)

                movie_scores.append((movie_id, score))

        

        movie_scores.sort(key=lambda x: x[1], reverse=True)

        return [movie_id for movie_id, score in movie_scores[:n_recommendations]]

    

    def get_recommendations_for_new_user(self, user_id, user, movies, n_recommendations=5):

        """Get recommendations for users with few ratings"""

        # For users with some ratings, use content-based approach

        if hasattr(user, 'ratings') and user.ratings:

            # Find movies similar to those the user has rated highly.
            # Keep one score per candidate so the final list has no duplicates
            # (a movie similar to several liked titles should appear once).

            similar_scores = {}

            

            for rated_movie_id, rating in user.ratings.items():

                if rating >= 4:  # User liked this movie

                    # Find movies with similar genres

                    rated_movie = movies[rated_movie_id]

                    for movie_id, movie in movies.items():

                        if movie_id != rated_movie_id and movie_id not in user.ratings:

                            # Check genre overlap (Jaccard similarity)

                            common_genres = set(rated_movie.genres) & set(movie.genres)

                            if common_genres:

                                similarity_score = len(common_genres) / len(set(rated_movie.genres) | set(movie.genres))

                                similar_scores[movie_id] = max(similar_scores.get(movie_id, 0.0), similarity_score)

            

            if similar_scores:

                ranked_similar = sorted(similar_scores.items(), key=lambda x: x[1], reverse=True)

                return [movie_id for movie_id, score in ranked_similar[:n_recommendations]]

        

        # Fallback to popular movies

        return self.get_popular_movies(movies, n_recommendations)
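The popularity score in `get_popular_movies` damps a high average by how many ratings actually support it, which keeps a perfect score from two ratings from outranking a solid score from hundreds. A standalone sanity check of that formula:

```python
import math

def popularity_score(avg_rating, rating_count):
    """Same formula as ColdStartManager.get_popular_movies: average
    rating weighted by the log of how many ratings back it up."""
    return avg_rating * math.log(1 + rating_count)

# A 4.0-average movie with 100 ratings outranks a 5.0-average
# movie with only 2 ratings
print(popularity_score(4.0, 100) > popularity_score(5.0, 2))  # True
```

The `log(1 + count)` term grows slowly, so beyond a few hundred ratings the average rating dominates again, which is usually the desired behavior for a cold-start fallback.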


class MovieRecommenderEvaluator:

    def evaluate(self, recommender_system, test_ratings):

        """Evaluate recommender system performance"""

        predictions = []

        actuals = []

        

        # Collect predictions for test ratings

        for rating in test_ratings:

            if hasattr(recommender_system.hybrid_recommender, 'algorithms'):

                # Use matrix factorization for rating prediction

                mf_algorithm = recommender_system.hybrid_recommender.algorithms[2]  # Matrix factorization

                predicted = mf_algorithm.predict_rating(rating.user_id, rating.movie_id)

                predictions.append(predicted)

                actuals.append(rating.rating)

        

        # Calculate RMSE and MAE

        rmse = self._calculate_rmse(predictions, actuals)

        mae = self._calculate_mae(predictions, actuals)

        

        # Calculate ranking metrics

        precision_at_5, recall_at_5 = self._calculate_ranking_metrics(

            recommender_system, test_ratings, k=5)

        

        # Calculate coverage

        coverage = self._calculate_coverage(recommender_system)

        

        return {

            'rmse': rmse,

            'mae': mae,

            'precision_at_5': precision_at_5,

            'recall_at_5': recall_at_5,

            'coverage': coverage

        }

    

    def _calculate_rmse(self, predictions, actuals):

        """Calculate Root Mean Square Error"""

        if not predictions:

            return 0.0

        

        squared_errors = [(p - a) ** 2 for p, a in zip(predictions, actuals)]

        return math.sqrt(sum(squared_errors) / len(squared_errors))

    

    def _calculate_mae(self, predictions, actuals):

        """Calculate Mean Absolute Error"""

        if not predictions:

            return 0.0

        

        absolute_errors = [abs(p - a) for p, a in zip(predictions, actuals)]

        return sum(absolute_errors) / len(absolute_errors)

    

    def _calculate_ranking_metrics(self, recommender_system, test_ratings, k=5):

        """Calculate precision and recall at k"""

        user_test_items = defaultdict(set)

        

        # Group test items by user

        for rating in test_ratings:

            if rating.rating >= 4:  # Consider rating >= 4 as relevant

                user_test_items[rating.user_id].add(rating.movie_id)

        

        precision_scores = []

        recall_scores = []

        

        for user_id, relevant_items in user_test_items.items():

            if len(relevant_items) == 0:

                continue

            

            # Get recommendations for this user

            recommendations = recommender_system.get_recommendations(user_id, k)

            

            if not recommendations:

                precision_scores.append(0.0)

                recall_scores.append(0.0)

                continue

            

            # Calculate precision and recall

            recommended_set = set(recommendations)

            relevant_recommended = recommended_set & relevant_items

            

            precision = len(relevant_recommended) / len(recommended_set) if recommended_set else 0.0

            recall = len(relevant_recommended) / len(relevant_items) if relevant_items else 0.0

            

            precision_scores.append(precision)

            recall_scores.append(recall)

        

        avg_precision = sum(precision_scores) / len(precision_scores) if precision_scores else 0.0

        avg_recall = sum(recall_scores) / len(recall_scores) if recall_scores else 0.0

        

        return avg_precision, avg_recall

    

    def _calculate_coverage(self, recommender_system):

        """Calculate catalog coverage"""

        total_movies = len(recommender_system.movies)

        recommended_movies = set()

        

        # Get recommendations for all users

        for user_id in recommender_system.users.keys():

            recommendations = recommender_system.get_recommendations(user_id, 10)

            recommended_movies.update(recommendations)

        

        coverage = len(recommended_movies) / total_movies if total_movies > 0 else 0.0

        return coverage
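The per-user step inside `_calculate_ranking_metrics` reduces to set intersections. A standalone re-implementation with toy IDs makes the arithmetic concrete:

```python
def precision_recall_at_k(recommended, relevant, k=5):
    """Same per-user logic as _calculate_ranking_metrics: precision is
    hits over recommendations shown, recall is hits over relevant items."""
    top_k = set(recommended[:k])
    hits = top_k & relevant
    precision = len(hits) / len(top_k) if top_k else 0.0
    recall = len(hits) / len(relevant) if relevant else 0.0
    return precision, recall

# Two of the five recommendations are relevant; the user has three
# relevant (rating >= 4) items in the test set
p, r = precision_recall_at_k(["m1", "m2", "m3", "m4", "m5"], {"m2", "m5", "m9"})
print(p, round(r, 3))  # 0.4 0.667
```

Note the asymmetry: recommending more items can only raise recall but typically lowers precision, which is why both are reported at a fixed k.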


def demonstrate_movie_recommendation_system():

    """Demonstrate the complete movie recommendation system"""

    print("=== MOVIE RECOMMENDATION SYSTEM DEMO ===\n")

    

    # Create and initialize the system

    system = MovieRecommendationSystem()

    

    # Load sample data

    print("Loading sample data...")

    system.load_sample_data()

    

    # Train the system

    print("Training recommendation algorithms...")

    system.train_system()

    

    # Evaluate the system

    print("\nEvaluating system performance...")

    evaluation_results = system.evaluate_system()

    

    # Generate recommendations for each user

    print("\n=== GENERATING RECOMMENDATIONS ===")

    for user_id in system.users.keys():

        user = system.users[user_id]

        print(f"\nUser {user_id} (Age: {user.age}, Gender: {user.gender}):")

        print(f"Previous ratings: {len(user.ratings)} movies")

        

        # Show user's top rated movies

        if user.ratings:

            top_rated = sorted(user.ratings.items(), key=lambda x: x[1], reverse=True)[:3]

            print("Top rated movies:")

            for movie_id, rating in top_rated:

                movie_title = system.movies[movie_id].title

                print(f"  - {movie_title} (Rating: {rating})")

        

        # Get recommendations

        recommendations = system.get_recommendations(user_id, 5)

        print("Recommended movies:")

        for i, movie_id in enumerate(recommendations, 1):

            movie = system.movies[movie_id]

            print(f"  {i}. {movie.title} ({', '.join(movie.genres)})")

    

    # Demonstrate cold start handling

    print("\n=== COLD START DEMONSTRATION ===")

    print("Recommendations for new user (no rating history):")

    new_user = User(999, age=30, gender='M', occupation='Developer')

    system.add_user(new_user)

    

    cold_start_recs = system.get_recommendations(999, 5)

    for i, movie_id in enumerate(cold_start_recs, 1):

        movie = system.movies[movie_id]

        print(f"  {i}. {movie.title} ({', '.join(movie.genres)})")

    

    print("\n=== DEMO COMPLETE ===")


if __name__ == "__main__":

    demonstrate_movie_recommendation_system()



CONCLUSION


Building an excellent recommender system requires careful consideration of multiple algorithmic approaches, comprehensive evaluation methodologies, and robust production infrastructure. The key to success lies in understanding the specific requirements of your use case, the characteristics of your data, and the needs of your users.


The hybrid approach demonstrated in this article combines the strengths of collaborative filtering, content-based filtering, and matrix factorization to provide accurate and diverse recommendations. The system addresses critical challenges such as the cold start problem, scalability requirements, and the need for continuous evaluation and improvement.


Modern recommender systems must also consider ethical implications, including fairness, transparency, and user privacy. As these systems become more sophisticated and influential in shaping user behavior, responsible development practices become increasingly important.


The complete movie recommendation system provided serves as a practical foundation that can be extended and adapted for various domains. Key areas for further enhancement include deep learning integration, real-time processing capabilities, and advanced evaluation metrics that capture user satisfaction beyond traditional accuracy measures.


Success in building recommender systems ultimately depends on iterative improvement through careful experimentation, user feedback integration, and continuous monitoring of system performance in production environments. The frameworks and principles outlined in this article provide a solid foundation for developing recommender systems that deliver genuine value to users while meeting business objectives.
