Feature/pytorch embedding ranker #2220

Closed
@@ -0,0 +1 @@

Collaborator

It would be great to have the notebook accompanying this code.

70 changes: 69 additions & 1 deletion recommenders/evaluation/spark_evaluation.py
@@ -2,6 +2,7 @@
# Licensed under the MIT License.

import numpy as np
import warnings # Added for R-Precision warning

try:
from pyspark.mllib.evaluation import RegressionMetrics, RankingMetrics
@@ -213,6 +214,7 @@ def __init__(
self.col_rating = col_rating
self.col_prediction = col_prediction
self.threshold = threshold
self.rating_pred_raw = rating_pred # Store raw predictions before processing

# Check if inputs are Spark DataFrames.
if not isinstance(self.rating_true, DataFrame):
@@ -356,6 +358,72 @@ def map_at_k(self):
"""
return self._metrics.meanAveragePrecisionAt(self.k)

def r_precision(self):
Collaborator

Could you please separate this into the other PR, #2219?

"""Calculate R-Precision.

R-Precision is the fraction of the top R recommended items that are relevant,
where R is the total number of relevant items for the user.

Returns:
float: Mean R-Precision across all users.
"""
# Assume rating_true contains only relevant items (e.g., positive interactions)
ground_truth_items = self.rating_true.select(self.col_user, self.col_item)

# Calculate R: number of relevant items per user
ground_truth_with_R = ground_truth_items.groupBy(self.col_user).agg(
F.collect_list(self.col_item).alias("ground_truth"),
F.count(self.col_item).alias("R")
)

# Filter out users with no relevant items (R=0)
ground_truth_with_R = ground_truth_with_R.filter(F.col("R") > 0)
if ground_truth_with_R.count() == 0:
warnings.warn("No users with relevant items found (R > 0). R-Precision is 0.")
return 0.0


# Rank predictions per user
window_spec = Window.partitionBy(self.col_user).orderBy(F.col(self.col_prediction).desc())
# Use rating_pred_raw which has user, item, prediction score
ranked_preds = self.rating_pred_raw.select(
self.col_user, self.col_item, self.col_prediction
).withColumn("rank", F.row_number().over(window_spec))

# Join ranked predictions with R
preds_with_r = ranked_preds.join(
ground_truth_with_R.select(self.col_user, "R"), on=self.col_user
)

# Filter for top R predictions
top_r_preds = preds_with_r.filter(F.col("rank") <= F.col("R"))

# Check which top R predictions are in the ground truth relevant items
# Create a dataframe of relevant items for easy joining
relevant_items_flagged = ground_truth_items.withColumn("is_relevant", F.lit(1))

relevant_in_top_r = top_r_preds.join(
relevant_items_flagged.select(self.col_user, self.col_item, "is_relevant"),
[self.col_user, self.col_item],
"left"
).fillna(0, subset=["is_relevant"]) # Predictions not in ground truth get is_relevant = 0

# Calculate number of relevant items found in top R for each user
user_metrics = relevant_in_top_r.groupBy(self.col_user, "R").agg(
F.sum("is_relevant").alias("num_relevant_in_top_r")
)

# Calculate R-Precision per user
user_r_precision = user_metrics.withColumn(
"r_precision_user", F.col("num_relevant_in_top_r") / F.col("R")
)

# Calculate the average R-Precision across all users
# Ensure we only average over users who were in ground_truth_with_R (R>0)
avg_r_precision = user_r_precision.agg(F.mean("r_precision_user")).first()[0]

return avg_r_precision if avg_r_precision is not None else 0.0
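
For a concrete feel of the metric: if a user has R = 4 relevant items and 3 of them appear among that user's top-4 scored items, the user's R-Precision is 0.75, and the method returns the mean of that value over users. Below is a minimal usage sketch, assuming the SparkRankingEvaluation constructor keeps its existing signature; rating_true_df and rating_pred_df are illustrative Spark DataFrames, not objects defined in this PR.

from recommenders.evaluation.spark_evaluation import SparkRankingEvaluation

# Sketch only: rating_true_df holds observed relevant interactions,
# rating_pred_df holds model scores for candidate user-item pairs.
evaluator = SparkRankingEvaluation(
    rating_true=rating_true_df,
    rating_pred=rating_pred_df,
    col_user="userID",
    col_item="itemID",
    col_rating="rating",
    col_prediction="prediction",
    k=10,
)
print("R-Precision:", evaluator.r_precision())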


def _get_top_k_items(
dataframe,
@@ -831,7 +899,7 @@ def user_item_serendipity(self):
Y.C. Zhang, D.Ó. Séaghdha, D. Quercia and T. Jambor, Auralist:
introducing serendipity into music recommendation, WSDM 2012

Eugene Yan, Serendipity: Accuracy’s unpopular best friend in Recommender Systems,
Eugene Yan, Serendipity's unpopular best friend in Recommender Systems,
eugeneyan.com, April 2020

Returns:
2 changes: 2 additions & 0 deletions recommenders/models/embedding_ranker/__init__.py
@@ -0,0 +1,2 @@
# Copyright (c) Recommenders contributors.
# Licensed under the MIT License.
208 changes: 208 additions & 0 deletions recommenders/models/embedding_ranker/embedding_ranker_utils.py
@@ -0,0 +1,208 @@
# Copyright (c) Recommenders contributors.
# Licensed under the MIT License.

import os
import numpy as np
import pandas as pd
import torch

from recommenders.utils.constants import (
DEFAULT_USER_COL,
DEFAULT_ITEM_COL,
DEFAULT_RATING_COL,
DEFAULT_PREDICTION_COL,
DEFAULT_K,
)

def predict_rating(
model,
test_df,
col_user=DEFAULT_USER_COL,
col_item=DEFAULT_ITEM_COL,
col_rating=DEFAULT_RATING_COL,
col_prediction=DEFAULT_PREDICTION_COL,
batch_size=1024,
):
"""Predict ratings for user-item pairs in test data.

Args:
model (NNEmbeddingRanker): Trained embedding ranker model.
test_df (pandas.DataFrame): Test dataframe containing user-item pairs.
col_user (str): User column name.
col_item (str): Item column name.
col_rating (str): Rating column name.
col_prediction (str): Prediction column name.
batch_size (int): Batch size for predictions to avoid memory issues.

Returns:
pandas.DataFrame: Dataframe with user, item, prediction columns.
"""
# Create a copy of the test data with only the needed columns
test_copy = test_df[[col_user, col_item]].copy()
Collaborator

Every copy adds a lot of overhead; let's try to avoid it.


# Process in batches to avoid memory issues
predictions = []
for i in range(0, len(test_copy), batch_size):
batch = test_copy.iloc[i:i+batch_size]
users = batch[col_user].values
items = batch[col_item].values
batch_predictions = model.predict(users, items, is_list=True)
predictions.extend(batch_predictions)

# Add predictions to the dataframe
test_copy[col_prediction] = predictions

return test_copy
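
A hedged usage sketch of predict_rating: here ranker stands for an already trained NNEmbeddingRanker and test for a pandas DataFrame holding the held-out user-item pairs; both are illustrative names rather than objects defined in this file.

# Sketch only: score every user-item pair in the held-out set in batches.
preds = predict_rating(
    model=ranker,
    test_df=test,
    col_user="userID",
    col_item="itemID",
    col_prediction="prediction",
    batch_size=2048,  # larger batches mean fewer model calls but more memory per call
)
print(preds.head())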

def generate_recommendations(
model,
train_df,
test_df=None,
col_user=DEFAULT_USER_COL,
col_item=DEFAULT_ITEM_COL,
col_prediction=DEFAULT_PREDICTION_COL,
top_k=DEFAULT_K,
remove_seen=True,
batch_size=1024,
):
"""Generate top-k recommendations for all users or users in test data.

Args:
model (NNEmbeddingRanker): Trained embedding ranker model.
train_df (pandas.DataFrame): Training dataframe used to identify seen items if remove_seen=True.
test_df (pandas.DataFrame, optional): Test dataframe to identify users to generate recommendations for.
If None, recommendations are generated for all users in training.
col_user (str): User column name.
col_item (str): Item column name.
col_prediction (str): Prediction column name.
top_k (int): Number of top items to recommend.
remove_seen (bool): Whether to remove items that appear in the training data.
batch_size (int): Batch size for predictions to avoid memory issues.

Returns:
pandas.DataFrame: Dataframe with user, item, prediction columns for top-k items per user.
"""
# If test_df is provided, use users from test set
# Otherwise, use all users from training
if test_df is not None:
users = test_df[col_user].unique()
else:
users = train_df[col_user].unique()

# Get all items
all_items = list(model.item_id_map.keys())

# Filter users that are not in the training set
valid_users = [user for user in users if user in model.user_id_map]

if not valid_users:
raise ValueError("No valid users found in the dataset")

# Create combinations of users and items to predict
all_pairs = []
for user in valid_users:
for item in all_items:
all_pairs.append((user, item))

# Process in batches to avoid memory issues
all_predictions = []
for i in range(0, len(all_pairs), batch_size):
batch_pairs = all_pairs[i:i+batch_size]
batch_users = [pair[0] for pair in batch_pairs]
batch_items = [pair[1] for pair in batch_pairs]
batch_predictions = model.predict(batch_users, batch_items, is_list=True)

# Create batch results
for j, (user, item) in enumerate(batch_pairs):
all_predictions.append({
col_user: user,
col_item: item,
col_prediction: batch_predictions[j]
})

# Convert to dataframe
result_df = pd.DataFrame(all_predictions)

# If remove_seen is True, remove items that appear in the training data
if remove_seen:
# Create a set of user-item pairs from the training data
seen_pairs = set(zip(train_df[col_user], train_df[col_item]))

# Filter out seen pairs
result_df = result_df[~result_df.apply(lambda row: (row[col_user], row[col_item]) in seen_pairs, axis=1)]

# Get top-k recommendations for each user
top_k_df = (
result_df
.sort_values([col_user, col_prediction], ascending=[True, False])
.groupby(col_user).head(top_k)
.reset_index(drop=True)
)

return top_k_df
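
A similar sketch for top-k retrieval, with ranker, train, and test again assumed to exist. Note that the function scores every (user, item) combination, so the total work grows with |users| x |items|; batch_size only bounds the memory of each forward pass.

# Sketch only: top-10 unseen items for each user that appears in `test`.
top10 = generate_recommendations(
    model=ranker,
    train_df=train,
    test_df=test,
    top_k=10,
    remove_seen=True,  # drop user-item pairs already present in `train`
    batch_size=4096,
)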

def evaluate_model(
model,
test_df,
metrics,
col_user=DEFAULT_USER_COL,
col_item=DEFAULT_ITEM_COL,
col_rating=DEFAULT_RATING_COL,
col_prediction=DEFAULT_PREDICTION_COL,
k=DEFAULT_K,
batch_size=1024,
):
"""Evaluate model using rating and ranking metrics.

Args:
model (NNEmbeddingRanker): Trained embedding ranker model.
test_df (pandas.DataFrame): Test dataframe.
metrics (dict): Dictionary of metric functions to use for evaluation.
Keys should be metric names and values should be functions that take
test_df, predictions_df, and other parameters.
col_user (str): User column name.
col_item (str): Item column name.
col_rating (str): Rating column name.
col_prediction (str): Prediction column name.
k (int): K value for ranking metrics.
batch_size (int): Batch size for predictions to avoid memory issues.

Returns:
dict: Dictionary with metric name as key and metric value as value.
"""
# Generate predictions for test data
predictions_df = predict_rating(
model,
test_df,
col_user=col_user,
col_item=col_item,
col_rating=col_rating,
col_prediction=col_prediction,
batch_size=batch_size,
)

# Calculate metrics
results = {}
for metric_name, metric_func in metrics.items():
# Different metrics may have different required parameters
if 'k' in metric_func.__code__.co_varnames:
results[metric_name] = metric_func(
test_df,
predictions_df,
col_user=col_user,
col_item=col_item,
col_rating=col_rating,
col_prediction=col_prediction,
k=k
)
else:
results[metric_name] = metric_func(
test_df,
predictions_df,
col_user=col_user,
col_item=col_item,
col_rating=col_rating,
col_prediction=col_prediction
)

return results
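
Finally, a sketch of how the metric dispatch above could be driven with the existing pandas evaluators; k is forwarded only when it appears in the metric's code object (as it does for ndcg_at_k). Whether every metric in recommenders.evaluation.python_evaluation accepts all of the keyword arguments forwarded by evaluate_model has not been verified here, so treat this pairing as an assumption; ranker and test are again illustrative names.

from recommenders.evaluation.python_evaluation import rmse, ndcg_at_k

# Sketch only: evaluate a trained ranker (assumed) on the held-out set.
results = evaluate_model(
    model=ranker,
    test_df=test,
    metrics={"RMSE": rmse, "NDCG@10": ndcg_at_k},
    k=10,
)
print(results)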