Filtering joins with filters on both x,y tables causing error on Spark 3.4 #1606

Description

@blckassassin

Problem: Filtering joins (semi_join()/anti_join()) where both tables are filtered within the join raise an error on Spark. This does not happen with mutating joins, and it does not happen locally, so I would not expect an error from a filtering join on Spark either.

The reprex below shows that the same logic runs without error locally, but errors once the tables are on Spark. The error also occurs with semi-joins, but not with mutating joins such as a left join. It appears to be triggered when an aggregation occurs on both the x and y tables at the same time.

Brief description of the problem

library(rmarkdown)
library(reprex)
library(sparklyr)
#> Warning: package 'sparklyr' was built under R version 4.4.3
#> 
#> Attaching package: 'sparklyr'
#> The following object is masked from 'package:stats':
#> 
#>     filter
library(dplyr)
#> 
#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:stats':
#> 
#>     filter, lag
#> The following objects are masked from 'package:base':
#> 
#>     intersect, setdiff, setequal, union

sc <- spark_connect(master = "local", version = "3.4")

df1 <- data.frame(ID = c(1, 2, 3, 4, 5, 6, 7, 8, 9))
df2 <- data.frame(ID = c(1, 3, 4, 9))

print(df1 %>% filter(ID == max(ID)) %>%
  anti_join(df2 %>% filter(ID != max(ID)), by = c("ID")))
#>   ID
#> 1  9

sparklyr::copy_to(sc, df1)
#> # Source:   table<`df1`> [?? x 1]
#> # Database: spark_connection
#>      ID
#>   <dbl>
#> 1     1
#> 2     2
#> 3     3
#> 4     4
#> 5     5
#> 6     6
#> 7     7
#> 8     8
#> 9     9
sparklyr::copy_to(sc, df2)
#> # Source:   table<`df2`> [?? x 1]
#> # Database: spark_connection
#>      ID
#>   <dbl>
#> 1     1
#> 2     3
#> 3     4
#> 4     9

spark_df1 <- spark_read_table(sc, "df1")
spark_df2 <- spark_read_table(sc, "df2")

print(spark_df1 %>% filter(ID == max(ID)) %>% 
  anti_join(spark_df2 %>% filter(ID != max(ID)), by = c("ID")))
#> Warning: Missing values are always removed in SQL aggregation functions.
#> Use `na.rm = TRUE` to silence this warning
#> This warning is displayed once every 8 hours.
#> Error in `get_env()`:
#> ! Can't extract an environment from a call.

spark_disconnect_all()
#> [1] 1
Created on 2025-04-09 with [reprex v2.1.1](https://reprex.tidyverse.org/)
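
A possible workaround while this is open (a sketch, not a confirmed fix): since the failure seems to come from dbplyr translating the aggregate filters inside the join's SQL, you can materialize each filtered table with compute() before the join, or pull each aggregate into R as a scalar first. This assumes the same sc, spark_df1, and spark_df2 as in the reprex; the table names "df1_filtered"/"df2_filtered" are illustrative.

``` r
library(sparklyr)
library(dplyr)

# Workaround 1: materialize each filtered table as a temporary table with
# compute(), so the anti_join runs against plain tables rather than lazy
# queries that embed the aggregate filters.
x <- spark_df1 %>%
  filter(ID == max(ID, na.rm = TRUE)) %>%
  compute("df1_filtered")
y <- spark_df2 %>%
  filter(ID != max(ID, na.rm = TRUE)) %>%
  compute("df2_filtered")
x %>% anti_join(y, by = "ID")

# Workaround 2: collect each aggregate into R as a scalar with pull(), then
# inject it with !!, so no aggregate appears inside the join's SQL at all.
max1 <- spark_df1 %>% summarise(m = max(ID, na.rm = TRUE)) %>% pull(m)
max2 <- spark_df2 %>% summarise(m = max(ID, na.rm = TRUE)) %>% pull(m)
spark_df1 %>%
  filter(ID == !!max1) %>%
  anti_join(spark_df2 %>% filter(ID != !!max2), by = "ID")
```

Both should return the single row ID = 9, matching the local result.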
