Skip to content

Commit e3dc79d

Browse files
committed
chore(cubestore): Upgrade DF: Rewrite InList expression type conversion when list is literals
1 parent b94db00 commit e3dc79d

File tree

2 files changed

+88
-0
lines changed

2 files changed

+88
-0
lines changed

rust/cubestore/cubestore/src/queryplanner/mod.rs

+3
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ pub mod serialized_plan;
1313
mod tail_limit;
1414
mod topk;
1515
pub mod trace_data_loaded;
16+
use rewrite_inlist_literals::RewriteInListLiterals;
1617
use serialized_plan::PreSerializedPlan;
1718
pub use topk::MIN_TOPK_STREAM_ROWS;
1819
use udfs::{aggregate_udf_by_kind, registerable_aggregate_udfs, registerable_scalar_udfs};
@@ -22,6 +23,7 @@ pub mod info_schema;
2223
pub mod merge_sort;
2324
pub mod metadata_cache;
2425
pub mod providers;
26+
mod rewrite_inlist_literals;
2527
#[cfg(test)]
2628
mod test_utils;
2729
pub mod udfs;
@@ -247,6 +249,7 @@ impl QueryPlannerImpl {
247249
for udf in registerable_scalar_udfs() {
248250
context.register_udf(udf);
249251
}
252+
context.add_analyzer_rule(Arc::new(RewriteInListLiterals {}));
250253

251254
// TODO upgrade DF
252255
// context
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
use datafusion::arrow::datatypes::DataType;
2+
use datafusion::common::tree_node::Transformed;
3+
use datafusion::common::DFSchema;
4+
use datafusion::config::ConfigOptions;
5+
use datafusion::error::DataFusionError;
6+
use datafusion::logical_expr::expr::InList;
7+
use datafusion::logical_expr::utils::merge_schema;
8+
use datafusion::logical_expr::{Cast, ExprSchemable, LogicalPlan};
9+
use datafusion::optimizer::AnalyzerRule;
10+
use datafusion::prelude::Expr;
11+
use datafusion::scalar::ScalarValue;
12+
use itertools::Itertools;
13+
use std::fmt::Debug;
14+
15+
#[derive(Debug)]
16+
/// Analyzer rule that rewrites `CAST(expr AS Utf8) IN ('a', 'b', ...)`, where every list item
/// is a non-null `Utf8` literal, into `expr IN (CAST('a' AS T), CAST('b' AS T), ...)`, with `T`
/// being the type of `expr` resolved against the plan node's input schema.
pub struct RewriteInListLiterals;
17+
18+
impl AnalyzerRule for RewriteInListLiterals {
19+
fn analyze(
20+
&self,
21+
plan: LogicalPlan,
22+
_config: &ConfigOptions,
23+
) -> Result<LogicalPlan, DataFusionError> {
24+
plan.transform_with_subqueries(|plan| {
25+
let schema: DFSchema = if let LogicalPlan::TableScan(ts) = &plan {
26+
let source_schema = DFSchema::try_from_qualified_schema(
27+
ts.table_name.clone(),
28+
&ts.source.schema(),
29+
)?;
30+
source_schema
31+
} else {
32+
merge_schema(&plan.inputs())
33+
};
34+
35+
plan.map_expressions(|expr| {
36+
// TODO upgrade DF: We clone inner and castee -- for performance, avoid that.
37+
38+
// TODO upgrade DF: The problem is, this assumes that the Cast we see was added by
39+
// type conversion -- what if the query actually has CAST(1 AS Utf8) IN ('1', '2')?
40+
// Can we put this rewrite ahead of type conversion?
41+
match &expr {
42+
Expr::InList(InList {
43+
expr: inner,
44+
list,
45+
negated,
46+
}) => match inner.as_ref() {
47+
Expr::Cast(Cast {
48+
expr: castee,
49+
data_type,
50+
}) => {
51+
if data_type == &DataType::Utf8 {
52+
if list.iter().all(|item| {
53+
matches!(item, Expr::Literal(ScalarValue::Utf8(Some(_))))
54+
}) {
55+
let castee_type: DataType = castee.get_type(&schema)?;
56+
return Ok(Transformed::yes(Expr::InList(InList {
57+
expr: castee.clone(),
58+
list: list
59+
.iter()
60+
.map(|ex| {
61+
Expr::Cast(Cast {
62+
expr: Box::new(ex.clone()),
63+
data_type: castee_type.clone(),
64+
})
65+
})
66+
.collect_vec(),
67+
negated: *negated,
68+
})));
69+
}
70+
}
71+
}
72+
_ => {}
73+
},
74+
_ => {}
75+
};
76+
return Ok(Transformed::no(expr));
77+
})
78+
})
79+
.map(|t| t.data)
80+
}
81+
82+
fn name(&self) -> &str {
83+
"rewrite_inlist_literals"
84+
}
85+
}

0 commit comments

Comments
 (0)