feat(datafusion): Treat timestamp conversion functions like a cast. #945

104 changes: 102 additions & 2 deletions crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs
@@ -17,7 +17,10 @@

 use std::vec;

-use datafusion::logical_expr::{Expr, Operator};
+use datafusion::arrow::datatypes::{DataType, TimeUnit};
+use datafusion::functions::datetime::to_date::ToDateFunc;
+use datafusion::functions::datetime::to_timestamp::ToTimestampFunc;
+use datafusion::logical_expr::{Cast, Expr, Operator};
 use datafusion::scalar::ScalarValue;
 use iceberg::expr::{BinaryExpression, Predicate, PredicateOperator, Reference, UnaryExpression};
 use iceberg::spec::Datum;
@@ -119,7 +122,53 @@ fn to_iceberg_predicate(expr: &Expr) -> TransformedResult {
                 _ => TransformedResult::NotTransformed,
             }
         }
-        Expr::Cast(c) => to_iceberg_predicate(&c.expr),
+        Expr::Cast(c) => {
+            if DataType::Date32 == c.data_type || DataType::Date64 == c.data_type {
Contributor

I have concerns about handling this case here in this way. This amounts to simplifying a DataFusion expression, so it should call DataFusion's expression simplification API.

Contributor

This is less about simplification and more about extracting the right information for Iceberg to optimize.

Contributor

> This is less about simplification and more about extracting the right information for Iceberg to optimize.

Maybe the word "simplification" is a little confusing; I think "constant folding" is a better term here. The expressions in the tests should be simplified by constant folding and replaced with a literal, which Iceberg can then process.

Contributor Author

Constant folding is already being done, I think (e.g. TO_TIMESTAMP(2+2) => TO_TIMESTAMP(4)), but I am not sure it makes sense to turn TO_TIMESTAMP(<date_str>) into CAST(date_str AS TIMESTAMP) as a simplification in DataFusion. Currently, CAST and TO_TIMESTAMP share the same implementation when no format parameters are given, but I don't think turning TO_TIMESTAMP into CAST is necessarily a "simplification". It may even be counterintuitive and affect errors in some weird ways, with no gains in readability or performance.

Contributor

By simplification I don't mean converting TO_TIMESTAMP(<date_str>) to CAST(date_str AS TIMESTAMP); I mean converting TO_TIMESTAMP(<date_str>) to <timestamp literal>, which should be done by constant folding.
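
For readers following the thread, the distinction can be illustrated with a minimal, self-contained sketch of constant folding on a toy expression tree. Everything here is hypothetical: the `Expr` enum, `fold`, and `parse_timestamp_literal` are illustration-only names and are not DataFusion's API, and the "parsing" only checks the shape of the string rather than validating a real timestamp.

```rust
// Toy expression tree for illustration only; this is NOT DataFusion's `Expr`.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    StringLit(String),
    TimestampLit(String),
    // Function call: name plus arguments.
    Call(String, Vec<Expr>),
    GtEq(Box<Expr>, Box<Expr>),
}

// Stand-in for real timestamp parsing (assumption): accept only the
// fixed shape `YYYY-MM-DDTHH:MM:SS`, without calendar validation.
fn parse_timestamp_literal(s: &str) -> Option<String> {
    let b = s.as_bytes();
    let shape_ok = b.len() == 19
        && b[4] == b'-'
        && b[7] == b'-'
        && b[10] == b'T'
        && b[13] == b':'
        && b[16] == b':'
        && b.iter()
            .enumerate()
            .all(|(i, c)| matches!(i, 4 | 7 | 10 | 13 | 16) || c.is_ascii_digit());
    shape_ok.then(|| s.to_string())
}

// Fold `TO_TIMESTAMP('<literal>')` into a timestamp literal.
// Calls with extra arguments (custom formats) or non-literal
// arguments are left untouched.
fn fold(expr: Expr) -> Expr {
    match expr {
        Expr::Call(name, args) => {
            let args: Vec<Expr> = args.into_iter().map(fold).collect();
            if name.eq_ignore_ascii_case("to_timestamp") {
                if let [Expr::StringLit(s)] = args.as_slice() {
                    if let Some(ts) = parse_timestamp_literal(s) {
                        return Expr::TimestampLit(ts);
                    }
                }
            }
            Expr::Call(name, args)
        }
        Expr::GtEq(l, r) => Expr::GtEq(Box::new(fold(*l)), Box::new(fold(*r))),
        other => other,
    }
}
```

In this sketch, folding `ts >= TO_TIMESTAMP('2023-01-05T00:00:00')` yields a comparison against a `TimestampLit`, so a downstream converter only ever sees a literal, whereas a two-argument call is returned unchanged.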

Contributor Author

OK, I don't mind it being in DataFusion. I'll try opening an issue/PR there and see.

Contributor

This doesn't seem to be a DataFusion problem. Maybe we should call the simplification API in the Iceberg planner?

Contributor Author

This would be redundant, I think. We should call it in the tests to mimic production behaviour.

As far as I can tell, the current simplification API in DataFusion simplifies casts, but not these specific function calls (TO_TIMESTAMP, TO_DATE).

Contributor

I'm not sure about the details, but I guess we missed something in the iceberg-datafusion planner. I'm fine with asking the DataFusion community for help.

+                match c.expr.as_ref() {
+                    Expr::Literal(ScalarValue::Utf8(Some(literal))) => {
+                        let date = literal.split('T').next();
Contributor

I would be reluctant to do any string manipulation here.

Contributor

Instead, I would add support for the datetime format in date_from_str:

pub fn date_from_str<S: AsRef<str>>(s: S) -> Result<Self> {
    let t = s.as_ref().parse::<NaiveDate>().map_err(|e| {
        Error::new(
            ErrorKind::DataInvalid,
            format!("Can't parse date from string: {}", s.as_ref()),
        )
        .with_source(e)
    })?;

Contributor Author

@Fokko currently date_from_str supports datetime (i.e. 2025-01-01T11:11:11 will not be truncated).

I think this is a mistake given the name of the function (date_from_str, not datetime_from_str) and its documentation, but I don't think I can change its behaviour so drastically now, right?

Do you suggest I change it, or create another one that is fixed?
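
To illustrate the string-manipulation concern raised in this thread, here is a small standard-library-only sketch. `date_part` mirrors the `literal.split('T').next()` expression from the diff; `looks_like_date` and `checked_date_part` are hypothetical helpers added for comparison and are not part of iceberg-rust. The shape check is an assumption about what "valid" might mean and does no calendar validation.

```rust
// Mirrors the expression used in the diff: everything before the first 'T'.
// `str::split` always yields at least one item, so this is never `None`.
fn date_part(literal: &str) -> Option<&str> {
    literal.split('T').next()
}

// Hypothetical shape check for `YYYY-MM-DD` (digits and dashes only,
// no validation of month or day ranges).
fn looks_like_date(s: &str) -> bool {
    let b = s.as_bytes();
    b.len() == 10
        && b[4] == b'-'
        && b[7] == b'-'
        && b.iter()
            .enumerate()
            .all(|(i, c)| i == 4 || i == 7 || c.is_ascii_digit())
}

// Hypothetical stricter variant: only return the prefix when it has
// the expected date shape.
fn checked_date_part(literal: &str) -> Option<&str> {
    date_part(literal).filter(|d| looks_like_date(d))
}
```

With these definitions, `date_part("2023-01-05T11:11:11")` gives `Some("2023-01-05")`, but `date_part("not a date")` gives `Some("not a date")` and `date_part("Tuesday")` gives `Some("")`, so the `if let Some(date)` branch in the diff is always taken. The checked variant returns `None` for the last two inputs.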

+                        if let Some(date) = date {
+                            return TransformedResult::Literal(Datum::string(date));
+                        }
+                    }
+                    _ => return TransformedResult::NotTransformed,
+                }
+            }
+            to_iceberg_predicate(&c.expr)
+        }
+        Expr::ScalarFunction(func) => {
+            if func
+                .func
+                .inner()
+                .as_any()
+                .downcast_ref::<ToTimestampFunc>()
+                .is_some()
+                // More than 1 argument means it's a custom format - not
+                // supported for now
+                && func.args.len() == 1
+            {
+                return to_iceberg_predicate(&Expr::Cast(Cast::new(
+                    Box::new(func.args[0].clone()),
+                    DataType::Timestamp(TimeUnit::Nanosecond, None),
+                )));
+            }
+            if func
+                .func
+                .inner()
+                .as_any()
+                .downcast_ref::<ToDateFunc>()
+                .is_some()
+                // More than 1 argument means it's a custom format - not
+                // supported for now
+                && func.args.len() == 1
+            {
+                return to_iceberg_predicate(&Expr::Cast(Cast::new(
+                    Box::new(func.args[0].clone()),
+                    DataType::Date32,
+                )));
+            }
+            TransformedResult::NotTransformed
+        }
         _ => TransformedResult::NotTransformed,
     }
 }
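
The `Expr::ScalarFunction` arm above rewrites a single-argument `TO_TIMESTAMP` or `TO_DATE` call into the equivalent cast and rejects calls that carry a format argument. A minimal sketch of that rule on toy types, without DataFusion, might look like the following. The `Expr` and `TargetType` enums and the `call_as_cast` function are hypothetical illustration-only names, and matching on the function name is a simplification of the `downcast_ref` check used in the diff.

```rust
// Toy types for illustration only; these are NOT DataFusion's types.
#[derive(Debug, Clone, PartialEq)]
enum TargetType {
    Timestamp,
    Date,
}

#[derive(Debug, Clone, PartialEq)]
enum Expr {
    StringLit(String),
    Cast(Box<Expr>, TargetType),
    // Function call: name plus arguments.
    Call(String, Vec<Expr>),
}

// Rewrite a conversion call as a cast when it has exactly one argument.
// More than one argument means a custom format, which is not handled.
fn call_as_cast(expr: &Expr) -> Option<Expr> {
    let Expr::Call(name, args) = expr else {
        return None;
    };
    let target = match name.to_ascii_lowercase().as_str() {
        "to_timestamp" => TargetType::Timestamp,
        "to_date" => TargetType::Date,
        _ => return None,
    };
    match args.as_slice() {
        [only] => Some(Expr::Cast(Box::new(only.clone()), target)),
        _ => None,
    }
}
```

Returning `None` for every unsupported shape keeps the rule conservative: an expression that cannot be rewritten is simply not pushed down, rather than being pushed down incorrectly.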
@@ -403,4 +452,55 @@ mod tests {
             Reference::new("ts").greater_than_or_equal_to(Datum::string("2023-01-05T00:00:00"));
         assert_eq!(predicate, expected_predicate);
     }
+
+    #[test]
+    fn test_to_timestamp_comparison_creates_predicate() {
+        let sql = "ts >= timestamp '2023-01-05T00:00:00'";
+        let predicate = convert_to_iceberg_predicate(sql).unwrap();
+        let expected_predicate =
+            Reference::new("ts").greater_than_or_equal_to(Datum::string("2023-01-05T00:00:00"));
+        assert_eq!(predicate, expected_predicate);
+    }
+
+    #[test]
+    fn test_to_timestamp_comparison_to_cast_creates_predicate() {
+        let sql = "ts >= CAST('2023-01-05T00:00:00' AS TIMESTAMP)";
+        let predicate = convert_to_iceberg_predicate(sql).unwrap();
+        let expected_predicate =
+            Reference::new("ts").greater_than_or_equal_to(Datum::string("2023-01-05T00:00:00"));
+        assert_eq!(predicate, expected_predicate);
+    }
+
+    #[test]
+    fn test_to_timestamp_with_custom_format_does_not_create_predicate() {
+        let sql =
+            "TO_TIMESTAMP(ts, 'YYYY-DD-MMTmm:HH:SS') >= CAST('2023-01-05T00:00:00' AS TIMESTAMP)";
+        let predicate = convert_to_iceberg_predicate(sql);
+        assert_eq!(predicate, None);
+    }
+
+    #[test]
+    fn test_to_date_comparison_creates_predicate() {
+        let sql = "ts >= CAST('2023-01-05T11:11:11' AS DATE)";
Contributor

I'm surprised to see that this works:

spark-sql (default)> SELECT CAST('2023-01-05T11:11:11' AS DATE);
2023-01-05

Contributor Author

Personally I would have preferred an error, but nobody is asking me 😆

+        let predicate = convert_to_iceberg_predicate(sql).unwrap();
+        let expected_predicate =
+            Reference::new("ts").greater_than_or_equal_to(Datum::string("2023-01-05"));
+        assert_eq!(predicate, expected_predicate);
+    }
+
+    #[test]
+    /// When casting to DATE, usually the value is converted to datetime or timestamp,
+    /// and then it is truncated. DayTransform is not yet supported fully here.
+    /// It is specifically implemented for Strings because it is the most common use case.
+    /// When actual support is implemented, this test will fail and should be removed.
+    /// For now it is here in order to make sure that the value from within the cast
+    /// is not used as-is when casting to date, because it can create false predicates.
+    ///
+    /// (Consider for example `ts > CAST('2023-01-05T11:11:11' AS DATE)` which should
+    /// create a different predicate than `ts > CAST('2023-01-05T11:11:11' AS TIMESTAMP)`)
+    fn test_to_date_from_non_string_is_ignored() {
+        let sql = "ts >= CAST(123456789 AS DATE)";
+        let predicate = convert_to_iceberg_predicate(sql);
+        assert_eq!(predicate, None);
+    }
 }
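
The doc comment on the last test notes that a DATE cast and a TIMESTAMP cast of the same string must produce different predicates. A small sketch of why, using only the standard library: it assumes that a date compared against a timestamp column is treated as midnight of that day, and it relies on fixed-width `YYYY-MM-DDTHH:MM:SS` strings sorting lexicographically in chronological order. The function names are hypothetical.

```rust
// Assumption: a DATE bound compared with a timestamp means midnight of that day.
fn date_to_midnight(date: &str) -> String {
    format!("{date}T00:00:00")
}

// `ts > CAST('<bound>' AS TIMESTAMP)` on fixed-width ISO-8601 strings.
fn gt_timestamp(row_ts: &str, bound: &str) -> bool {
    row_ts > bound
}

// `ts > CAST('<datetime>' AS DATE)` after the datetime was truncated to `date`.
fn gt_date(row_ts: &str, date: &str) -> bool {
    row_ts > date_to_midnight(date).as_str()
}
```

Under these assumptions, a row with `ts = 2023-01-05T06:00:00` satisfies `gt_date(ts, "2023-01-05")` but not `gt_timestamp(ts, "2023-01-05T11:11:11")`, so passing the untruncated string through a DATE cast would wrongly prune that row.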