Skip to content

[datafusion-spark] Implement ceil&floor function for spark #15958

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from

Conversation

irenjj
Copy link
Contributor

@irenjj irenjj commented May 6, 2025

Which issue does this PR close?

Rationale for this change

Add Spark-style ceil&floor function, the differences between DataFusion and Spark's implementations of mathematical functions ceil and floor:

  1. Function Signatures:

    • Spark provides a more flexible implementation with ceil(x, scale) accepting two parameters
    • DataFusion only implements the single parameter version ceil(x)
  2. Type Support:

    • Spark has broader type support, handling INTEGER, DECIMAL, DOUBLE, and FLOAT types
    • DataFusion currently only supports Float32 and Float64 types

What changes are included in this PR?

Add new ceil&floor function.

Are these changes tested?

Yes.

Are there any user-facing changes?

No.

@irenjj irenjj force-pushed the spart_ceil_floor branch from 58a56c9 to fd203b9 Compare May 6, 2025 08:41
@irenjj irenjj force-pushed the spart_ceil_floor branch from fd203b9 to d13258e Compare May 6, 2025 08:41
@irenjj irenjj force-pushed the spart_ceil_floor branch from bf4dcbc to c209438 Compare May 6, 2025 08:49
Comment on lines +240 to +245
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
Err(generic_internal_err(
"ceil",
"`return_type` should not be called, call `return_type_from_args` instead",
))
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @shehabgamin, we encountered an issue here: most of the code was copied from sail. In sail, the return value is constructed using return_type_from_args, which requires access to both args.arg_types and args.scalar_arguments to determine the final value. However, in the latest version of DF, return_type_from_args has been removed. It seems that relying solely on datatype is no longer sufficient to construct the return value within return_type().

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@irenjj Would return_field_from_args work here?

/// What type will be returned by this function, given the arguments?
///
/// By default, this function calls [`Self::return_type`] with the
/// types of each argument.
///
/// # Notes
///
/// Most UDFs should implement [`Self::return_type`] and not this
/// function as the output type for most functions only depends on the types
/// of their inputs (e.g. `sqrt(f32)` is always `f32`).
///
/// This function can be used for more advanced cases such as:
///
/// 1. specifying nullability
/// 2. return types based on the **values** of the arguments (rather than
/// their **types**.
///
/// # Output Type based on Values
///
/// For example, the following two function calls get the same argument
/// types (something and a `Utf8` string) but return different types based
/// on the value of the second argument:
///
/// * `arrow_cast(x, 'Int16')` --> `Int16`
/// * `arrow_cast(x, 'Float32')` --> `Float32`
///
/// # Requirements
///
/// This function **must** consistently return the same type for the same
/// logical input even if the input is simplified (e.g. it must return the same
/// value for `('foo' | 'bar')` as it does for ('foobar').
fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<Field> {
let data_types = args
.arg_fields
.iter()
.map(|f| f.data_type())
.cloned()
.collect::<Vec<_>>();
let return_type = self.return_type(&data_types)?;
Ok(Field::new(self.name(), return_type, true))
}

@shehabgamin
Copy link
Contributor

shehabgamin commented May 6, 2025

@irenjj Here are some tests for ceil and floor that you can re-use. Sail adheres to Spark SQL syntax, so you may need to adjust as needed for this pr:

@github-actions github-actions bot added the sqllogictest SQL Logic Tests (.slt) label May 7, 2025
@irenjj irenjj marked this pull request as ready for review May 7, 2025 06:04
@shehabgamin
Copy link
Contributor

@irenjj I'll review this by tomorrow!

I'm not a committer though, so we'll still need a review from someone else as well. cc @alamb @andygrove

@andygrove
Copy link
Member

Hi @irenjj. Could you explain in the PR description how these functions differ from the standard DataFusion implementation (i.e., what is Spark-specific about them)? That will help with reviews.

@andygrove
Copy link
Member

@parthchandra @huaxingao @comphead @mbutrovich @kazuyukitanimura, FYI, in case you want to review. In Comet, we are currently using DataFusion's ceil and floor functions and have not specialized for Spark.

@andygrove andygrove changed the title Implement ceil&floor function for spark [datafusion-spark] Implement ceil&floor function for spark May 7, 2025
let arg_len = args.args.len();
let target_scale = if arg_len == 1 {
Ok(&None)
} else if arg_len == 2 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Floor and Ceil expressions in Spark are unary expressions and only take a single argument:

case class Ceil(child: _root_.org.apache.spark.sql.catalyst.expressions.Expression) extends _root_.org.apache.spark.sql.catalyst.expressions.UnaryMathExpression {

When calling ceil in Spark SQL with two parameters, it looks like a cast is added to the input expression. For example, ceil(_1, 2) translates to plan: *(1) Project [ceil(cast(_1#6 as decimal(30,15)), 2) AS ceil(_1, 2)#10.

Perhaps you implemented the two-argument version to work around the fact that DataFusion's logical optimizer doesn't implement the same logic as Spark for adding this cast? I wonder if it is worth looking into adding a Spark-compatible optimizer rule to add this cast in the future.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I have confused myself now. The plan does still show two arguments, but the physical plan only has a single argument 😕

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, there is a separate RoundCeil in Spark for the two-argument case:

case class RoundCeil(child: Expression, scale: Expression)

query error
SELECT ceil(3.1411, -3);
----
DataFusion error: Error during planning: Failed to coerce arguments to satisfy a call to 'ceil' function: coercion from [Float64, Int64] to the signature Uniform(1, [Float64, Float32]) failed
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume that these errors need to be fixed still

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
sqllogictest SQL Logic Tests (.slt)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[datafusion-spark] Implement ceil function
3 participants