35 changes: 31 additions & 4 deletions examples/03_evaluate/evaluation.ipynb
@@ -328,7 +328,7 @@
"name": "stderr",
"output_type": "stream",
"text": [
"\r",
"\r\n",
Collaborator

When you submit the fixed version, could you please run the whole notebook so people can see the results?

Please use Python 3.11, which is the latest version we support.

"[Stage 0:> (0 + 1) / 1]\r"
]
},
@@ -350,7 +350,7 @@
"name": "stderr",
"output_type": "stream",
"text": [
"\r",
"\r\n",
" \r"
]
}
@@ -773,7 +773,34 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"#### 2.2.8 ROC and AUC\n",
"#### 2.2.8 R-Precision\n",
"\n",
"R-Precision evaluates the fraction of relevant items among the top R recommended items, where R is the total number of *truly* relevant items for a specific user. It's equivalent to Recall@R.\n",
"\n",
"**Difference from Precision@k:** Precision@k measures relevance within a fixed top *k* items, regardless of the total number of relevant items (R). R-Precision adapts the evaluation depth (*R*) based on the user's specific ground truth, making it potentially more user-centric when the number of relevant items varies significantly across users.\n",
Comment on lines +776 to +780
Collaborator

I get an error when trying to run the notebook:

---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
Cell In[19], line 6
      1 # Note: The spark_rank_eval object was initialized with k=3. 
      2 # R-Precision intrinsically uses R (number of relevant items for the user) as the cutoff.
      3 # The 'k' parameter passed during initialization doesn't directly affect R-Precision calculation itself,
      4 # but it might affect how the rating_pred dataframe is pre-processed if relevancy_method relies on k.
      5 # For a direct comparison with other metrics at a fixed k, ensure the underlying data processing is consistent.
----> 6 print(f"The R-Precision is {spark_rank_eval.r_precision()}")

File ~/MS/review/demoncoder-recommenders/recommenders/evaluation/spark_evaluation.py:386, in SparkRankingEvaluation.r_precision(self)
    384 # Rank predictions per user
    385 window_spec = Window.partitionBy(self.col_user).orderBy(F.col(self.col_prediction).desc())
--> 386 ranked_preds = self.rating_pred.select(
    387     self.col_user, self.col_item, self.col_prediction
    388 ).withColumn("rank", F.row_number().over(window_spec))
    390 # Join ranked predictions with R
    391 preds_with_r = ranked_preds.join(
    392     ground_truth_with_R.select(self.col_user, "R"), on=self.col_user
    393 )

File ~/anaconda/envs/recommenders311/lib/python3.11/site-packages/pyspark/sql/dataframe.py:3227, in DataFrame.select(self, *cols)
   3182 def select(self, *cols: "ColumnOrName") -> "DataFrame":  # type: ignore[misc]
   3183     """Projects a set of expressions and returns a new :class:`DataFrame`.
   3184 
   3185     .. versionadded:: 1.3.0
   (...)
   3225     +-----+---+
   3226     """
-> 3227     jdf = self._jdf.select(self._jcols(*cols))
   3228     return DataFrame(jdf, self.sparkSession)

File ~/anaconda/envs/recommenders311/lib/python3.11/site-packages/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
   1316 command = proto.CALL_COMMAND_NAME +\
   1317     self.command_header +\
   1318     args_command +\
   1319     proto.END_COMMAND_PART
   1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:
   1326     if hasattr(temp_arg, "_detach"):

File ~/anaconda/envs/recommenders311/lib/python3.11/site-packages/pyspark/errors/exceptions/captured.py:185, in capture_sql_exception.<locals>.deco(*a, **kw)
    181 converted = convert_exception(e.java_exception)
    182 if not isinstance(converted, UnknownException):
    183     # Hide where the exception came from that shows a non-Pythonic
    184     # JVM exception message.
--> 185     raise converted from None
    186 else:
    187     raise

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `MovieId` cannot be resolved. Did you mean one of the following? [`UserId`, `prediction`].;
'Project [UserId#6L, 'MovieId, 'Rating]
+- Aggregate [UserId#6L], [UserId#6L, collect_list(MovieId#7L, 0, 0) AS prediction#237]
   +- Filter (rank#227 <= 3)
      +- Project [UserId#6L, MovieId#7L, Rating#8L, rank#227]
         +- Project [UserId#6L, MovieId#7L, Rating#8L, rank#227, rank#227]
            +- Window [row_number() windowspecdefinition(UserId#6L, Rating#8L DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#227], [UserId#6L], [Rating#8L DESC NULLS LAST]
               +- Project [UserId#6L, MovieId#7L, Rating#8L]
                  +- LogicalRDD [UserId#6L, MovieId#7L, Rating#8L], false
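
The query plan above points at the likely cause: by the time r_precision() runs, self.rating_pred has already been aggregated per user into a prediction list (note the collect_list(MovieId) AS prediction step), so the raw MovieId column no longer exists on that dataframe and the select() cannot resolve it. A quick, non-authoritative way to confirm this from the notebook, using only standard PySpark DataFrame calls (the spark_rank_eval name and the rating_pred attribute are taken from the traceback above):

# Inspect what the evaluator actually holds when r_precision() is called;
# if the item column is gone, the select() inside r_precision() has to fail.
print(spark_rank_eval.rating_pred.columns)   # per the plan above: ['UserId', 'prediction']
spark_rank_eval.rating_pred.printSchema()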

"\n",
"**Difference from Recall@k:** Recall@k measures how many of the *total* relevant items (R) are found within the top *k* recommendations. R-Precision focuses specifically on the precision within the top *R* items."
]
},
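
As a quick sanity check of the definition in the markdown cell above, here is a tiny self-contained illustration with made-up toy data (independent of the MovieLens dataframes used elsewhere in the notebook):

# One user with R = 3 truly relevant items; two of the top-3 ranked items are relevant.
relevant = {"A", "B", "C"}            # ground truth for the user (R = 3)
ranked = ["A", "D", "B", "C", "E"]    # predicted ranking, best item first
R = len(relevant)
r_precision = sum(item in relevant for item in ranked[:R]) / R
print(r_precision)                    # 0.666... = 2/3, which is also Recall@3 for this user
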
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Note: The spark_rank_eval object was initialized with k=3. \n",
"# R-Precision intrinsically uses R (number of relevant items for the user) as the cutoff.\n",
"# The 'k' parameter passed during initialization doesn't directly affect R-Precision calculation itself,\n",
"# but it might affect how the rating_pred dataframe is pre-processed if relevancy_method relies on k.\n",
"# For a direct comparison with other metrics at a fixed k, ensure the underlying data processing is consistent.\n",
"print(f\"The R-Precision is {spark_rank_eval.r_precision()}\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### 2.2.9 ROC and AUC\n",
"\n",
"ROC, as well as AUC, is a well known metric that is used for evaluating binary classification problem. It is similar in the case of binary rating typed recommendation algorithm where the \"hit\" accuracy on the relevant items is used for measuring the recommender's performance. \n",
"\n",
@@ -1972,4 +1999,4 @@
},
"nbformat": 4,
"nbformat_minor": 2
}
}
67 changes: 66 additions & 1 deletion recommenders/evaluation/spark_evaluation.py
@@ -2,6 +2,7 @@
# Licensed under the MIT License.

import numpy as np
import warnings # Added for R-Precision warning

try:
from pyspark.mllib.evaluation import RegressionMetrics, RankingMetrics
@@ -356,6 +357,70 @@ def map_at_k(self):
"""
return self._metrics.meanAveragePrecisionAt(self.k)

    def r_precision(self):
        """Calculate R-Precision.

        R-Precision is the fraction of the top R recommended items that are relevant,
        where R is the total number of relevant items for the user.

        Returns:
            float: Mean R-Precision across all users.
        """
        # Assume rating_true contains only relevant items (e.g., positive interactions)
        ground_truth_items = self.rating_true.select(self.col_user, self.col_item)

        # Calculate R: number of relevant items per user
        ground_truth_with_R = ground_truth_items.groupBy(self.col_user).agg(
            F.collect_list(self.col_item).alias("ground_truth"),
            F.count(self.col_item).alias("R")
        )

        # Filter out users with no relevant items (R=0)
        ground_truth_with_R = ground_truth_with_R.filter(F.col("R") > 0)
        if ground_truth_with_R.count() == 0:
            warnings.warn("No users with relevant items found (R > 0). R-Precision is 0.")
            return 0.0

        # Rank predictions per user
        window_spec = Window.partitionBy(self.col_user).orderBy(F.col(self.col_prediction).desc())
        ranked_preds = self.rating_pred.select(
            self.col_user, self.col_item, self.col_prediction
        ).withColumn("rank", F.row_number().over(window_spec))

        # Join ranked predictions with R
        preds_with_r = ranked_preds.join(
            ground_truth_with_R.select(self.col_user, "R"), on=self.col_user
        )

        # Filter for top R predictions
        top_r_preds = preds_with_r.filter(F.col("rank") <= F.col("R"))

        # Check which top R predictions are in the ground truth relevant items
        # Create a dataframe of relevant items for easy joining
        relevant_items_flagged = ground_truth_items.withColumn("is_relevant", F.lit(1))

        relevant_in_top_r = top_r_preds.join(
            relevant_items_flagged.select(self.col_user, self.col_item, "is_relevant"),
            [self.col_user, self.col_item],
            "left"
        ).fillna(0, subset=["is_relevant"]) # Predictions not in ground truth get is_relevant = 0

        # Calculate number of relevant items found in top R for each user
        user_metrics = relevant_in_top_r.groupBy(self.col_user, "R").agg(
            F.sum("is_relevant").alias("num_relevant_in_top_r")
        )

        # Calculate R-Precision per user
        user_r_precision = user_metrics.withColumn(
            "r_precision_user", F.col("num_relevant_in_top_r") / F.col("R")
        )

        # Calculate the average R-Precision across all users
        # Ensure we only average over users who were in ground_truth_with_R (R>0)
        avg_r_precision = user_r_precision.agg(F.mean("r_precision_user")).first()[0]

        return avg_r_precision if avg_r_precision is not None else 0.0


def _get_top_k_items(
    dataframe,
@@ -831,7 +896,7 @@ def user_item_serendipity(self):
Y.C. Zhang, D.Ó. Séaghdha, D. Quercia and T. Jambor, Auralist:
introducing serendipity into music recommendation, WSDM 2012

Eugene Yan, Serendipity: Accuracy’s unpopular best friend in Recommender Systems,
Eugene Yan, Serendipity's unpopular best friend in Recommender Systems,
eugeneyan.com, April 2020

Returns:
55 changes: 55 additions & 0 deletions tests/unit/recommenders/evaluation/test_spark_evaluation.py
@@ -515,3 +515,58 @@ def test_serendipity_item_feature_vector(spark_diversity_data):
        col_relevance="Relevance",
    )
    assert evaluator.serendipity() == pytest.approx(0.4028, TOL)


@pytest.mark.spark
Collaborator

same with the tests:

$ pytest tests/unit/recommenders/evaluation/test_spark_evaluation.py 
======================================================================== test session starts =========================================================================
platform linux -- Python 3.11.11, pytest-8.2.2, pluggy-1.5.0
rootdir: /home/miguel/MS/review/demoncoder-recommenders
configfile: pyproject.toml
plugins: anyio-4.4.0, cov-5.0.0, typeguard-4.3.0, hypothesis-6.104.2, mock-3.14.0
collected 30 items                                                                                                                                                   

tests/unit/recommenders/evaluation/test_spark_evaluation.py .............................F                                                                     [100%]

============================================================================== FAILURES ==============================================================================
_______________________________________________________________________ test_spark_r_precision _______________________________________________________________________

spark_data = (DataFrame[userID: bigint, itemID: bigint, rating: bigint], DataFrame[userID: bigint, itemID: bigint, prediction: bigint, rating: bigint])

    @pytest.mark.spark
    def test_spark_r_precision(spark_data):
        df_true, df_pred = spark_data
    
        # Test perfect prediction (R-Precision should be 1.0)
        evaluator_perfect = SparkRankingEvaluation(df_true, df_true, col_prediction="rating")
>       assert evaluator_perfect.r_precision() == pytest.approx(1.0, TOL)

tests/unit/recommenders/evaluation/test_spark_evaluation.py:526: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
recommenders/evaluation/spark_evaluation.py:386: in r_precision
    ranked_preds = self.rating_pred.select(
../../../anaconda/envs/recommenders311/lib/python3.11/site-packages/pyspark/sql/dataframe.py:3227: in select
    jdf = self._jdf.select(self._jcols(*cols))
../../../anaconda/envs/recommenders311/lib/python3.11/site-packages/py4j/java_gateway.py:1322: in __call__
    return_value = get_return_value(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

a = ('xro5784', <py4j.clientserver.JavaClient object at 0x7fb0e286f110>, 'o5697', 'select'), kw = {}, converted = AnalysisException()

    def deco(*a: Any, **kw: Any) -> Any:
        try:
            return f(*a, **kw)
        except Py4JJavaError as e:
            converted = convert_exception(e.java_exception)
            if not isinstance(converted, UnknownException):
                # Hide where the exception came from that shows a non-Pythonic
                # JVM exception message.
>               raise converted from None
E               pyspark.errors.exceptions.captured.AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `itemID` cannot be resolved. Did you mean one of the following? [`userID`, `prediction`].;
E               'Project [userID#4984L, 'itemID, 'rating]
E               +- Aggregate [userID#4984L], [userID#4984L, collect_list(itemID#4985L, 0, 0) AS prediction#5009]
E                  +- Filter (rank#4999 <= 10)
E                     +- Project [userID#4984L, itemID#4985L, rating#4986L, rank#4999]
E                        +- Project [userID#4984L, itemID#4985L, rating#4986L, rank#4999, rank#4999]
E                           +- Window [row_number() windowspecdefinition(userID#4984L, rating#4986L DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#4999], [userID#4984L], [rating#4986L DESC NULLS LAST]
E                              +- Project [userID#4984L, itemID#4985L, rating#4986L]
E                                 +- LogicalRDD [userID#4984L, itemID#4985L, rating#4986L], false

../../../anaconda/envs/recommenders311/lib/python3.11/site-packages/pyspark/errors/exceptions/captured.py:185: AnalysisException
------------------------------------------------------------------------ Captured stderr call ------------------------------------------------------------------------
                                                                                
========================================================================== warnings summary ==========================================================================
tests/unit/recommenders/evaluation/test_spark_evaluation.py: 164 warnings
  /home/miguel/anaconda/envs/recommenders311/lib/python3.11/site-packages/pyspark/sql/pandas/utils.py:37: DeprecationWarning: distutils Version classes are deprecated. Use packaging.version instead.
    if LooseVersion(pandas.__version__) < LooseVersion(minimum_pandas_version):

tests/unit/recommenders/evaluation/test_spark_evaluation.py: 181 warnings
  /home/miguel/anaconda/envs/recommenders311/lib/python3.11/site-packages/pyspark/sql/pandas/conversion.py:485: DeprecationWarning: is_datetime64tz_dtype is deprecated and will be removed in a future version. Check `isinstance(dtype, pd.DatetimeTZDtype)` instead.
    if should_localize and is_datetime64tz_dtype(s.dtype) and s.dt.tz is not None:

tests/unit/recommenders/evaluation/test_spark_evaluation.py: 15 warnings
  /home/miguel/anaconda/envs/recommenders311/lib/python3.11/site-packages/pyspark/sql/context.py:158: FutureWarning: Deprecated in 3.0.0. Use SparkSession.builder.getOrCreate() instead.
    warnings.warn(

-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html
====================================================================== short test summary info =======================================================================
FAILED tests/unit/recommenders/evaluation/test_spark_evaluation.py::test_spark_r_precision - pyspark.errors.exceptions.captured.AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `itemID` cannot be resolved. D...
======================================================= 1 failed, 29 passed, 360 warnings in 247.48s (0:04:07) =======================================================
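
Both failures trace back to the same line: r_precision() selects self.col_item from self.rating_pred, but the plans show that dataframe has already been collapsed into a per-user prediction list, so the item column is gone. One possible direction, sketched here purely as an illustration rather than a fix for this PR: compute the metric from the raw, untransformed ground-truth and prediction dataframes (the function name, default column names, and the assumption that those raw dataframes are still available are all placeholders):

from pyspark.sql import Window
import pyspark.sql.functions as F


def r_precision_from_raw(raw_true, raw_pred, col_user="userID",
                         col_item="itemID", col_prediction="prediction"):
    # R per user = number of truly relevant items in the ground truth
    r_per_user = raw_true.groupBy(col_user).agg(F.count(col_item).alias("R"))

    # Rank the raw predictions per user by descending score
    win = Window.partitionBy(col_user).orderBy(F.col(col_prediction).desc())
    ranked = raw_pred.withColumn("rank", F.row_number().over(win))

    # Keep only the top R predictions per user and flag those that are relevant
    hits = (
        ranked.join(r_per_user, on=col_user)
        .filter(F.col("rank") <= F.col("R"))
        .join(
            raw_true.select(col_user, col_item).withColumn("hit", F.lit(1)),
            on=[col_user, col_item],
            how="left",
        )
        .fillna(0, subset=["hit"])
    )

    # Mean over users of (relevant items found in the top R) / R
    per_user = hits.groupBy(col_user, "R").agg(F.sum("hit").alias("n_hits"))
    return per_user.select(F.mean(F.col("n_hits") / F.col("R"))).first()[0]

Ranking on the raw prediction dataframe sidesteps the unresolved-column error, but wiring this into SparkRankingEvaluation would mean deciding where to keep an untransformed copy of rating_pred, which is exactly the question this review raises.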

def test_spark_r_precision(spark_data):
    df_true, df_pred = spark_data

    # Test perfect prediction (R-Precision should be 1.0)
    evaluator_perfect = SparkRankingEvaluation(df_true, df_true, col_prediction="rating")
    assert evaluator_perfect.r_precision() == pytest.approx(1.0, TOL)

    # Test with sample prediction data
    evaluator = SparkRankingEvaluation(df_true, df_pred)
    # Expected value calculation:
    # User 1: R=3 relevant items (1, 2, 3). Top 3 predictions: (1, 0.8), (5, 0.6), (2, 0.4). Relevant in top 3: (1, 2). R-Prec = 2/3
    # User 2: R=2 relevant items (1, 4). Top 2 predictions: (1, 0.9), (4, 0.7). Relevant in top 2: (1, 4). R-Prec = 2/2 = 1.0
    # User 3: R=1 relevant item (2). Top 1 prediction: (2, 0.7). Relevant in top 1: (2). R-Prec = 1/1 = 1.0
    # Mean R-Precision = (2/3 + 1.0 + 1.0) / 3 = (0.6666... + 1 + 1) / 3 = 2.6666... / 3 = 0.8888...
    expected_r_precision = (2/3 + 1.0 + 1.0) / 3
    assert evaluator.r_precision() == pytest.approx(expected_r_precision, TOL)

    # Test with different k values
    # Using k=3
    evaluator_k3 = SparkRankingEvaluation(df_true, df_pred, k=3)
    # When k=3, we're still getting the same top R predictions for each user since all users have R ≤ 3
    assert evaluator_k3.r_precision() == pytest.approx(expected_r_precision, TOL)

    # Using k=5
    evaluator_k5 = SparkRankingEvaluation(df_true, df_pred, k=5)
    # When k=5, we're still getting the same top R predictions for each user
    assert evaluator_k5.r_precision() == pytest.approx(expected_r_precision, TOL)

    # Test that r_precision is equivalent to precision when R is the same for all users
    # and equal to k (comparing to precision_at_k test)
    # We limit the data to users with the same number of relevant items
    # and set k to that number
    same_r_df_true = df_true.filter("userID = 1") # User 1 has R=3
    k_value = 3 # Same as R for user 1
    r_precision_evaluator = SparkRankingEvaluation(same_r_df_true, df_pred, k=k_value)
    precision_evaluator = SparkRankingEvaluation(same_r_df_true, df_pred, k=k_value)
    assert r_precision_evaluator.r_precision() == pytest.approx(precision_evaluator.precision_at_k(), TOL)

    # Test case where a user has no relevant items (ensure they are ignored)
    # Add a user 4 with only predictions, no ground truth
    spark = df_pred.sql_ctx.sparkSession
    new_pred_row = spark.createDataFrame([(4, 1, 0.9), (4, 2, 0.8)], df_pred.columns)
    df_pred_extra_user = df_pred.union(new_pred_row)
    evaluator_extra = SparkRankingEvaluation(df_true, df_pred_extra_user)
    # Result should be the same as before, ignoring user 4
    assert evaluator_extra.r_precision() == pytest.approx(expected_r_precision, TOL)

    # Test case where NO users have relevant items (R=0 for all)
    empty_true = df_true.filter("userID > 10") # Create empty ground truth
    with pytest.warns(UserWarning, match="No users with relevant items found"): # Check for warning
        evaluator_no_relevant = SparkRankingEvaluation(empty_true, df_pred)
        assert evaluator_no_relevant.r_precision() == 0.0