Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 69 additions & 1 deletion recommenders/evaluation/spark_evaluation.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Licensed under the MIT License.

import numpy as np
import warnings # Added for R-Precision warning

try:
from pyspark.mllib.evaluation import RegressionMetrics, RankingMetrics
Expand Down Expand Up @@ -213,6 +214,7 @@ def __init__(
self.col_rating = col_rating
self.col_prediction = col_prediction
self.threshold = threshold
self.rating_pred_raw = rating_pred # Store raw predictions before processing
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self.rating_pred_raw is only used in the R-Precision. Either use directly self.rating_pred or do the data treatment internally in the function


# Check if inputs are Spark DataFrames.
if not isinstance(self.rating_true, DataFrame):
Expand Down Expand Up @@ -356,6 +358,72 @@ def map_at_k(self):
"""
return self._metrics.meanAveragePrecisionAt(self.k)

def r_precision(self):
"""Calculate R-Precision.

R-Precision is the fraction of the top R recommended items that are relevant,
where R is the total number of relevant items for the user.

Returns:
float: Mean R-Precision across all users.
"""
# Assume rating_true contains only relevant items (e.g., positive interactions)
ground_truth_items = self.rating_true.select(self.col_user, self.col_item)

# Calculate R: number of relevant items per user
ground_truth_with_R = ground_truth_items.groupBy(self.col_user).agg(
F.collect_list(self.col_item).alias("ground_truth"),
F.count(self.col_item).alias("R")
)

# Filter out users with no relevant items (R=0)
ground_truth_with_R = ground_truth_with_R.filter(F.col("R") > 0)
if ground_truth_with_R.count() == 0:
warnings.warn("No users with relevant items found (R > 0). R-Precision is 0.")
return 0.0


# Rank predictions per user
window_spec = Window.partitionBy(self.col_user).orderBy(F.col(self.col_prediction).desc())
# Use rating_pred_raw which has user, item, prediction score
ranked_preds = self.rating_pred_raw.select(
self.col_user, self.col_item, self.col_prediction
).withColumn("rank", F.row_number().over(window_spec))

# Join ranked predictions with R
preds_with_r = ranked_preds.join(
ground_truth_with_R.select(self.col_user, "R"), on=self.col_user
)

# Filter for top R predictions
top_r_preds = preds_with_r.filter(F.col("rank") <= F.col("R"))

# Check which top R predictions are in the ground truth relevant items
# Create a dataframe of relevant items for easy joining
relevant_items_flagged = ground_truth_items.withColumn("is_relevant", F.lit(1))

relevant_in_top_r = top_r_preds.join(
relevant_items_flagged.select(self.col_user, self.col_item, "is_relevant"),
[self.col_user, self.col_item],
"left"
).fillna(0, subset=["is_relevant"]) # Predictions not in ground truth get is_relevant = 0

# Calculate number of relevant items found in top R for each user
user_metrics = relevant_in_top_r.groupBy(self.col_user, "R").agg(
F.sum("is_relevant").alias("num_relevant_in_top_r")
)

# Calculate R-Precision per user
user_r_precision = user_metrics.withColumn(
"r_precision_user", F.col("num_relevant_in_top_r") / F.col("R")
)

# Calculate the average R-Precision across all users
# Ensure we only average over users who were in ground_truth_with_R (R>0)
avg_r_precision = user_r_precision.agg(F.mean("r_precision_user")).first()[0]

return avg_r_precision if avg_r_precision is not None else 0.0


def _get_top_k_items(
dataframe,
Expand Down Expand Up @@ -831,7 +899,7 @@ def user_item_serendipity(self):
Y.C. Zhang, D.Ó. Séaghdha, D. Quercia and T. Jambor, Auralist:
introducing serendipity into music recommendation, WSDM 2012

Eugene Yan, Serendipity: Accuracy’s unpopular best friend in Recommender Systems,
Eugene Yan, Serendipity's unpopular best friend in Recommender Systems,
eugeneyan.com, April 2020

Returns:
Expand Down
34 changes: 34 additions & 0 deletions tests/unit/recommenders/evaluation/test_spark_evaluation.py
Original file line number Diff line number Diff line change
Expand Up @@ -515,3 +515,37 @@ def test_serendipity_item_feature_vector(spark_diversity_data):
col_relevance="Relevance",
)
assert evaluator.serendipity() == pytest.approx(0.4028, TOL)


@pytest.mark.spark
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same with the tests:

$ pytest tests/unit/recommenders/evaluation/test_spark_evaluation.py 
======================================================================== test session starts =========================================================================
platform linux -- Python 3.11.11, pytest-8.2.2, pluggy-1.5.0
rootdir: /home/miguel/MS/review/demoncoder-recommenders
configfile: pyproject.toml
plugins: anyio-4.4.0, cov-5.0.0, typeguard-4.3.0, hypothesis-6.104.2, mock-3.14.0
collected 30 items                                                                                                                                                   

tests/unit/recommenders/evaluation/test_spark_evaluation.py .............................F                                                                     [100%]

============================================================================== FAILURES ==============================================================================
_______________________________________________________________________ test_spark_r_precision _______________________________________________________________________

spark_data = (DataFrame[userID: bigint, itemID: bigint, rating: bigint], DataFrame[userID: bigint, itemID: bigint, prediction: bigint, rating: bigint])

    @pytest.mark.spark
    def test_spark_r_precision(spark_data):
        df_true, df_pred = spark_data
    
        # Test perfect prediction (R-Precision should be 1.0)
        evaluator_perfect = SparkRankingEvaluation(df_true, df_true, col_prediction="rating")
>       assert evaluator_perfect.r_precision() == pytest.approx(1.0, TOL)

tests/unit/recommenders/evaluation/test_spark_evaluation.py:526: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
recommenders/evaluation/spark_evaluation.py:386: in r_precision
    ranked_preds = self.rating_pred.select(
../../../anaconda/envs/recommenders311/lib/python3.11/site-packages/pyspark/sql/dataframe.py:3227: in select
    jdf = self._jdf.select(self._jcols(*cols))
../../../anaconda/envs/recommenders311/lib/python3.11/site-packages/py4j/java_gateway.py:1322: in __call__
    return_value = get_return_value(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

a = ('xro5784', <py4j.clientserver.JavaClient object at 0x7fb0e286f110>, 'o5697', 'select'), kw = {}, converted = AnalysisException()

    def deco(*a: Any, **kw: Any) -> Any:
        try:
            return f(*a, **kw)
        except Py4JJavaError as e:
            converted = convert_exception(e.java_exception)
            if not isinstance(converted, UnknownException):
                # Hide where the exception came from that shows a non-Pythonic
                # JVM exception message.
>               raise converted from None
E               pyspark.errors.exceptions.captured.AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `itemID` cannot be resolved. Did you mean one of the following? [`userID`, `prediction`].;
E               'Project [userID#4984L, 'itemID, 'rating]
E               +- Aggregate [userID#4984L], [userID#4984L, collect_list(itemID#4985L, 0, 0) AS prediction#5009]
E                  +- Filter (rank#4999 <= 10)
E                     +- Project [userID#4984L, itemID#4985L, rating#4986L, rank#4999]
E                        +- Project [userID#4984L, itemID#4985L, rating#4986L, rank#4999, rank#4999]
E                           +- Window [row_number() windowspecdefinition(userID#4984L, rating#4986L DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#4999], [userID#4984L], [rating#4986L DESC NULLS LAST]
E                              +- Project [userID#4984L, itemID#4985L, rating#4986L]
E                                 +- LogicalRDD [userID#4984L, itemID#4985L, rating#4986L], false

../../../anaconda/envs/recommenders311/lib/python3.11/site-packages/pyspark/errors/exceptions/captured.py:185: AnalysisException
------------------------------------------------------------------------ Captured stderr call ------------------------------------------------------------------------
                                                                                
========================================================================== warnings summary ==========================================================================
tests/unit/recommenders/evaluation/test_spark_evaluation.py: 164 warnings
  /home/miguel/anaconda/envs/recommenders311/lib/python3.11/site-packages/pyspark/sql/pandas/utils.py:37: DeprecationWarning: distutils Version classes are deprecated. Use packaging.version instead.
    if LooseVersion(pandas.__version__) < LooseVersion(minimum_pandas_version):

tests/unit/recommenders/evaluation/test_spark_evaluation.py: 181 warnings
  /home/miguel/anaconda/envs/recommenders311/lib/python3.11/site-packages/pyspark/sql/pandas/conversion.py:485: DeprecationWarning: is_datetime64tz_dtype is deprecated and will be removed in a future version. Check `isinstance(dtype, pd.DatetimeTZDtype)` instead.
    if should_localize and is_datetime64tz_dtype(s.dtype) and s.dt.tz is not None:

tests/unit/recommenders/evaluation/test_spark_evaluation.py: 15 warnings
  /home/miguel/anaconda/envs/recommenders311/lib/python3.11/site-packages/pyspark/sql/context.py:158: FutureWarning: Deprecated in 3.0.0. Use SparkSession.builder.getOrCreate() instead.
    warnings.warn(

-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html
====================================================================== short test summary info =======================================================================
FAILED tests/unit/recommenders/evaluation/test_spark_evaluation.py::test_spark_r_precision - pyspark.errors.exceptions.captured.AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `itemID` cannot be resolved. D...
======================================================= 1 failed, 29 passed, 360 warnings in 247.48s (0:04:07) =======================================================

def test_spark_r_precision(spark_data):
df_true, df_pred = spark_data

# Test perfect prediction (R-Precision should be 1.0)
evaluator_perfect = SparkRankingEvaluation(df_true, df_true, col_prediction="rating")
assert evaluator_perfect.r_precision() == pytest.approx(1.0, TOL)

# Test with sample prediction data
evaluator = SparkRankingEvaluation(df_true, df_pred)
# Expected value calculation:
# User 1: R=3 relevant items (1, 2, 3). Top 3 predictions: (1, 0.8), (5, 0.6), (2, 0.4). Relevant in top 3: (1, 2). R-Prec = 2/3
# User 2: R=2 relevant items (1, 4). Top 2 predictions: (1, 0.9), (4, 0.7). Relevant in top 2: (1, 4). R-Prec = 2/2 = 1.0
# User 3: R=1 relevant item (2). Top 1 prediction: (2, 0.7). Relevant in top 1: (2). R-Prec = 1/1 = 1.0
# Mean R-Precision = (2/3 + 1.0 + 1.0) / 3 = (0.6666... + 1 + 1) / 3 = 2.6666... / 3 = 0.8888...
expected_r_precision = (2/3 + 1.0 + 1.0) / 3
assert evaluator.r_precision() == pytest.approx(expected_r_precision, TOL)

# Test case where a user has no relevant items (ensure they are ignored)
# Add a user 4 with only predictions, no ground truth
spark = df_pred.sql_ctx.sparkSession
new_pred_row = spark.createDataFrame([(4, 1, 0.9), (4, 2, 0.8)], df_pred.columns)
df_pred_extra_user = df_pred.union(new_pred_row)
evaluator_extra = SparkRankingEvaluation(df_true, df_pred_extra_user)
# Result should be the same as before, ignoring user 4
assert evaluator_extra.r_precision() == pytest.approx(expected_r_precision, TOL)

# Test case where NO users have relevant items (R=0 for all)
empty_true = df_true.filter("userID > 10") # Create empty ground truth
with pytest.warns(UserWarning, match="No users with relevant items found"): # Check for warning
evaluator_no_relevant = SparkRankingEvaluation(empty_true, df_pred)
assert evaluator_no_relevant.r_precision() == 0.0