-
Notifications
You must be signed in to change notification settings - Fork 1.5k
feat: implement predicate adaptation missing fields of structs #16589
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Marking as draft until I have time do a review myself. @kosiew feel free to take a look if you have time but I expect this needs work before it's ready. |
@@ -135,6 +136,7 @@ datafusion-functions-window-common = { path = "datafusion/functions-window-commo | |||
datafusion-macros = { path = "datafusion/macros", version = "48.0.0" } | |||
datafusion-optimizer = { path = "datafusion/optimizer", version = "48.0.0", default-features = false } | |||
datafusion-physical-expr = { path = "datafusion/physical-expr", version = "48.0.0", default-features = false } | |||
datafusion-physical-expr-adapter = { path = "datafusion/physical-expr-adapter", version = "48.0.0", default-features = false } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had to make a new package because physical-expr
does not have access to functions-nested
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
}; | ||
use datafusion_physical_expr_common::physical_expr::PhysicalExpr; | ||
|
||
/// Build a struct expression by recursively extracting and rewriting fields from a source struct. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add an example of what this is doing?
It seems like if the table schema is like {a: int, b:int}
and the file only has {a: int}
this function will build up an expression like
struct(a, source.a, b, null)
Something about relying on struct and field extraction feels very wrong to me. Among other things now this casting may not be consistent with what happens when someone tries to call CAST(..)
manually
Is it the case that build_struct_expr
would be unecessary if cast
had native support for casting structs to struct
s and filling in nulls 🤔
How about this for an alternate idea:
- Add a
cast
wrapper in datafusion with the same signature as arrow-rs's cast - If the arguments were both Struct types, apply the special DataFusion casting logic(fill in missing fields with nulls)
- otherwise just call into the existing arrow cast kernel
That would
- Avoid the need to call the struct and get_field functions
- Allow other parts of the system, like coercsion, and SQL, to have the same semantics
We have talked about adding cast for evolving structs in arrow-rs for a while but it seems the consensus was to put it in DataFusion
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Something about relying on struct and field extraction feels very wrong to me. Among other things now this casting may not be consistent with what happens when someone tries to call CAST(..) manually
Does it have to match? These seem like two similar but not exactly identical use cases to me.
How about this for an alternate idea:
That seems reasonable to me.
Avoid the need to call the struct and get_field functions
I am probably missing something but thinking about it to me it didn't seem like those calls would be much of a problem. Either way a new struct column is going to have to be built and the existing struct column is going to have to be read from disk in its entirety.
But the pushback made me think that maybe we should be doing something else here: we should be inspecting the expression more deeply and in the case of col.field
if field
is not present in the physical schema we replace the whole get_field(col, 'field')
expression with null
instead of get_col(struct('other', get_field(col, 'other'), 'field', null)))
which is basically what is happening now. Then we avoid reading the column altogether. We can do something similar with casts. I think this is complimentary to your proposal because the fallthrough case for select struct_col
can call cast
if needed like any other column, and our internal cast
implementation can do the "smart things" with structs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Something about relying on struct and field extraction feels very wrong to me. Among other things now this casting may not be consistent with what happens when someone tries to call CAST(..) manually
Does it have to match? These seem like two similar but not exactly identical use cases to me.
I worry that if they don't match we'll have potentially divergent behavior which could be confusing to users.
Also another use case is when doing stuff like coercing structs (so for example you could compare two struct columns for equality or UNION
them together)
I am probably missing something but thinking about it to me it didn't seem like those calls would be much of a problem. Either way a new struct column is going to have to be built and the existing struct column is going to have to be read from disk in its entirety.
Yeah, I don't think it will be that much faster (maybe it would save some virtual function calls or something). What I am really concerned about is having consistent cast behavior
But the pushback made me think that maybe we should be doing something else here: we should be inspecting the expression more deeply and in the case of
col.field
iffield
is not present in the physical schema we replace the wholeget_field(col, 'field')
expression withnull
instead ofget_col(struct('other', get_field(col, 'other'), 'field', null)))
which is basically what is happening now. Then we avoid reading the column altogether. We can do something similar with casts. I think this is complimentary to your proposal because the fallthrough case forselect struct_col
can callcast
if needed like any other column, and our internalcast
implementation can do the "smart things" with structs.
👍
@alamb I've reworked this as per discussion in #16589 (comment). This now leaves the actual casting work up the cast functions, which means that we can do the work there and it will just trickle through to here. I do still recommend moving forward with this PR because if we ever wanted to reference |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added some observations that stood out to me.
Continuing to review...
if let Some(column) = expr.as_any().downcast_ref::<Column>() { | ||
return self.rewrite_column(Arc::clone(&expr), column); | ||
} | ||
|
||
Ok(Transformed::no(expr)) | ||
} | ||
|
||
fn try_rewrite_struct_field_access( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The new try_rewrite_struct_field_access handles missing struct fields by returning Null. I tried adding a test case for nested structs (e.g., a.b.c) to ensure recursive behavior but it failed.
#[test]
fn test_rewrite_nested_struct_missing_field() {
let physical_schema = Schema::new(vec![Field::new(
"nested",
DataType::Struct(
vec![Field::new(
"a",
DataType::Struct(vec![Field::new("b", DataType::Utf8, true)].into()),
true,
)]
.into(),
),
true,
)]);
let logical_schema = Schema::new(vec![Field::new(
"nested",
DataType::Struct(
vec![Field::new(
"a",
DataType::Struct(
vec![
Field::new("b", DataType::Utf8, true),
Field::new("c", DataType::Int32, true),
]
.into(),
),
true,
)]
.into(),
),
true,
)]);
let rewriter = PhysicalExprSchemaRewriter::new(&physical_schema, &logical_schema);
let column_expr = Arc::new(Column::new("nested", 0));
let result = rewriter.rewrite(column_expr).unwrap();
let expected = Arc::new(CastExpr::new(
Arc::new(Column::new("nested", 0)),
DataType::Struct(
vec![Field::new(
"a",
DataType::Struct(
vec![
Field::new("b", DataType::Utf8, true),
Field::new("c", DataType::Int32, true),
]
.into(),
),
true,
)]
.into(),
),
None,
)) as Arc<dyn PhysicalExpr>;
assert_eq!(result.to_string(), expected.to_string());
}
called `Result::unwrap()` on an `Err` value: Execution("Cannot cast column 'nested' from 'Struct(a Struct(b Utf8))' (physical data type) to 'Struct(a Struct(b Utf8, c Int32))' (logical data type)")
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's expected: the point is that we need to implement struct casting as part of the cast operator in general, not as part of this PR. That's the point @alamb made in #16589 (comment). Is there any reason why we haven't done that since you've basically implemented it for SchemaAdapter
? I'd think it's the same code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Basically: I think we need to incorporate your logic from #1637 into CastExpr. Somewhat related to apache/arrow-rs#6735
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Aaa..... 🤔
The PR Title - implement predicate adaptation for nested structs
and
Functionality equivalent to https://github.com/apache/datafusion/pull/16371 but for https://github.com/apache/datafusion/pull/16461.
gave me the expectation that this PR implements nested struct adaption already.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. It used to. I've updated the title and PR description
Co-authored-by: kosiew <[email protected]>
} | ||
|
||
#[test] | ||
fn test_rewrite_mulit_column_expr_with_type_cast() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason for replacing this multi column test with the single column test_rewrite_column_with_type_cast?
} | ||
|
||
#[test] | ||
fn test_rewrite_no_change_needed() -> Result<()> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason for removing this test?
Adds predicate adaptation to fill in missing struct fields with null, i.e.
struct_col.field_missing_in_file
->null
.