Skip to content

Commit b9caf1d

Browse files
committed
Wrap immutable plan parts into Arc
- Closes #19852 Improve performance of query planning and plan state re-set by making node clone cheap. - Store projection as `Option<Arc<[usize]>>` instead of `Option<Vec<usize>>` in `FilterExec`, `HashJoinExec`, `NestedLoopJoinExec`. - Store exprs as `Arc<[ProjectionExpr]>` instead of Vec in `ProjectionExprs`. - Store arced aggregation, filter, group by expressions within `AggregateExec`.
1 parent d90d074 commit b9caf1d

File tree

18 files changed

+259
-138
lines changed

18 files changed

+259
-138
lines changed

datafusion/common/src/stats.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,7 @@ impl Statistics {
391391
/// For example, if we had statistics for columns `{"a", "b", "c"}`,
392392
/// projecting to `vec![2, 1]` would return statistics for columns `{"c",
393393
/// "b"}`.
394-
pub fn project(mut self, projection: Option<&Vec<usize>>) -> Self {
394+
pub fn project(mut self, projection: Option<&[usize]>) -> Self {
395395
let Some(projection) = projection else {
396396
return self;
397397
};
@@ -410,7 +410,7 @@ impl Statistics {
410410
.map(Slot::Present)
411411
.collect();
412412

413-
for idx in projection {
413+
for idx in projection.iter() {
414414
let next_idx = self.column_statistics.len();
415415
let slot = std::mem::replace(
416416
columns.get_mut(*idx).expect("projection out of bounds"),
@@ -1067,28 +1067,28 @@ mod tests {
10671067
#[test]
10681068
fn test_project_none() {
10691069
let projection = None;
1070-
let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref());
1070+
let stats = make_stats(vec![10, 20, 30]).project(projection);
10711071
assert_eq!(stats, make_stats(vec![10, 20, 30]));
10721072
}
10731073

10741074
#[test]
10751075
fn test_project_empty() {
10761076
let projection = Some(vec![]);
1077-
let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref());
1077+
let stats = make_stats(vec![10, 20, 30]).project(projection.as_deref());
10781078
assert_eq!(stats, make_stats(vec![]));
10791079
}
10801080

10811081
#[test]
10821082
fn test_project_swap() {
10831083
let projection = Some(vec![2, 1]);
1084-
let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref());
1084+
let stats = make_stats(vec![10, 20, 30]).project(projection.as_deref());
10851085
assert_eq!(stats, make_stats(vec![30, 20]));
10861086
}
10871087

10881088
#[test]
10891089
fn test_project_repeated() {
10901090
let projection = Some(vec![1, 2, 1, 1, 0, 2]);
1091-
let stats = make_stats(vec![10, 20, 30]).project(projection.as_ref());
1091+
let stats = make_stats(vec![10, 20, 30]).project(projection.as_deref());
10921092
assert_eq!(stats, make_stats(vec![20, 30, 20, 20, 10, 30]));
10931093
}
10941094

datafusion/common/src/utils/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,9 @@ use std::thread::available_parallelism;
7070
/// ```
7171
pub fn project_schema(
7272
schema: &SchemaRef,
73-
projection: Option<&Vec<usize>>,
73+
projection: Option<&impl AsRef<[usize]>>,
7474
) -> Result<SchemaRef> {
75-
let schema = match projection {
75+
let schema = match projection.map(AsRef::as_ref) {
7676
Some(columns) => Arc::new(schema.project(columns)?),
7777
None => Arc::clone(schema),
7878
};

datafusion/core/src/physical_planner.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -960,7 +960,7 @@ impl DefaultPhysicalPlanner {
960960
// project the output columns excluding the async functions
961961
// The async functions are always appended to the end of the schema.
962962
.apply_projection(Some(
963-
(0..input.schema().fields().len()).collect(),
963+
(0..input.schema().fields().len()).collect::<Vec<_>>(),
964964
))?
965965
.with_batch_size(session_state.config().batch_size())
966966
.build()?

datafusion/core/tests/physical_optimizer/join_selection.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -762,7 +762,7 @@ async fn test_hash_join_swap_on_joins_with_projections(
762762
"ProjectionExec won't be added above if HashJoinExec contains embedded projection",
763763
);
764764

765-
assert_eq!(swapped_join.projection, Some(vec![0_usize]));
765+
assert_eq!(swapped_join.projection.as_ref().unwrap(), [0_usize]);
766766
assert_eq!(swapped.schema().fields.len(), 1);
767767
assert_eq!(swapped.schema().fields[0].name(), "small_col");
768768
Ok(())

datafusion/physical-expr/src/projection.rs

Lines changed: 22 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ impl From<ProjectionExpr> for (Arc<dyn PhysicalExpr>, String) {
125125
/// indices.
126126
#[derive(Debug, Clone, PartialEq, Eq)]
127127
pub struct ProjectionExprs {
128-
exprs: Vec<ProjectionExpr>,
128+
exprs: Arc<[ProjectionExpr]>,
129129
}
130130

131131
impl std::fmt::Display for ProjectionExprs {
@@ -137,22 +137,24 @@ impl std::fmt::Display for ProjectionExprs {
137137

138138
impl From<Vec<ProjectionExpr>> for ProjectionExprs {
139139
fn from(value: Vec<ProjectionExpr>) -> Self {
140-
Self { exprs: value }
140+
Self {
141+
exprs: value.into(),
142+
}
141143
}
142144
}
143145

144146
impl From<&[ProjectionExpr]> for ProjectionExprs {
145147
fn from(value: &[ProjectionExpr]) -> Self {
146148
Self {
147-
exprs: value.to_vec(),
149+
exprs: value.iter().cloned().collect(),
148150
}
149151
}
150152
}
151153

152154
impl FromIterator<ProjectionExpr> for ProjectionExprs {
153155
fn from_iter<T: IntoIterator<Item = ProjectionExpr>>(exprs: T) -> Self {
154156
Self {
155-
exprs: exprs.into_iter().collect::<Vec<_>>(),
157+
exprs: exprs.into_iter().collect(),
156158
}
157159
}
158160
}
@@ -164,12 +166,17 @@ impl AsRef<[ProjectionExpr]> for ProjectionExprs {
164166
}
165167

166168
impl ProjectionExprs {
167-
pub fn new<I>(exprs: I) -> Self
168-
where
169-
I: IntoIterator<Item = ProjectionExpr>,
170-
{
169+
/// Make a new [`ProjectionExprs`] from expressions iterator.
170+
pub fn new(exprs: impl IntoIterator<Item = ProjectionExpr>) -> Self {
171+
Self {
172+
exprs: exprs.into_iter().collect(),
173+
}
174+
}
175+
176+
/// Make a new [`ProjectionExprs`] from expressions.
177+
pub fn from_expressions(exprs: impl Into<Arc<[ProjectionExpr]>>) -> Self {
171178
Self {
172-
exprs: exprs.into_iter().collect::<Vec<_>>(),
179+
exprs: exprs.into(),
173180
}
174181
}
175182

@@ -285,13 +292,14 @@ impl ProjectionExprs {
285292
{
286293
let exprs = self
287294
.exprs
288-
.into_iter()
295+
.iter()
296+
.cloned()
289297
.map(|mut proj| {
290298
proj.expr = f(proj.expr)?;
291299
Ok(proj)
292300
})
293-
.collect::<Result<Vec<_>>>()?;
294-
Ok(Self::new(exprs))
301+
.collect::<Result<Arc<_>>>()?;
302+
Ok(Self::from_expressions(exprs))
295303
}
296304

297305
/// Apply another projection on top of this projection, returning the combined projection.
@@ -361,7 +369,7 @@ impl ProjectionExprs {
361369
/// applied on top of this projection.
362370
pub fn try_merge(&self, other: &ProjectionExprs) -> Result<ProjectionExprs> {
363371
let mut new_exprs = Vec::with_capacity(other.exprs.len());
364-
for proj_expr in &other.exprs {
372+
for proj_expr in other.exprs.iter() {
365373
let new_expr = update_expr(&proj_expr.expr, &self.exprs, true)?
366374
.ok_or_else(|| {
367375
internal_datafusion_err!(
@@ -607,7 +615,7 @@ impl ProjectionExprs {
607615
) -> Result<datafusion_common::Statistics> {
608616
let mut column_statistics = vec![];
609617

610-
for proj_expr in &self.exprs {
618+
for proj_expr in self.exprs.iter() {
611619
let expr = &proj_expr.expr;
612620
let col_stats = if let Some(col) = expr.as_any().downcast_ref::<Column>() {
613621
std::mem::take(&mut stats.column_statistics[col.index()])
@@ -754,15 +762,6 @@ impl Projector {
754762
}
755763
}
756764

757-
impl IntoIterator for ProjectionExprs {
758-
type Item = ProjectionExpr;
759-
type IntoIter = std::vec::IntoIter<ProjectionExpr>;
760-
761-
fn into_iter(self) -> Self::IntoIter {
762-
self.exprs.into_iter()
763-
}
764-
}
765-
766765
/// The function operates in two modes:
767766
///
768767
/// 1) When `sync_with_child` is `true`:

datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -723,7 +723,7 @@ fn handle_hash_join(
723723
.collect();
724724

725725
let column_indices = build_join_column_index(plan);
726-
let projected_indices: Vec<_> = if let Some(projection) = &plan.projection {
726+
let projected_indices: Vec<_> = if let Some(projection) = plan.projection.as_ref() {
727727
projection.iter().map(|&i| &column_indices[i]).collect()
728728
} else {
729729
column_indices.iter().collect()

datafusion/physical-optimizer/src/projection_pushdown.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ fn try_push_down_join_filter(
135135
);
136136

137137
let new_lhs_length = lhs_rewrite.data.0.schema().fields.len();
138-
let projections = match projections {
138+
let projections = match projections.as_ref() {
139139
None => match join.join_type() {
140140
JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {
141141
// Build projections that ignore the newly projected columns.

datafusion/physical-plan/src/aggregates/mod.rs

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -544,11 +544,11 @@ pub struct AggregateExec {
544544
/// Aggregation mode (full, partial)
545545
mode: AggregateMode,
546546
/// Group by expressions
547-
group_by: PhysicalGroupBy,
547+
group_by: Arc<PhysicalGroupBy>,
548548
/// Aggregate expressions
549-
aggr_expr: Vec<Arc<AggregateFunctionExpr>>,
549+
aggr_expr: Arc<[Arc<AggregateFunctionExpr>]>,
550550
/// FILTER (WHERE clause) expression for each aggregate expression
551-
filter_expr: Vec<Option<Arc<dyn PhysicalExpr>>>,
551+
filter_expr: Arc<[Option<Arc<dyn PhysicalExpr>>]>,
552552
/// Configuration for limit-based optimizations
553553
limit_options: Option<LimitOptions>,
554554
/// Input plan, could be a partial aggregate or the input to the aggregate
@@ -582,18 +582,18 @@ impl AggregateExec {
582582
/// Rewrites aggregate exec with new aggregate expressions.
583583
pub fn with_new_aggr_exprs(
584584
&self,
585-
aggr_expr: Vec<Arc<AggregateFunctionExpr>>,
585+
aggr_expr: impl Into<Arc<[Arc<AggregateFunctionExpr>]>>,
586586
) -> Self {
587587
Self {
588-
aggr_expr,
588+
aggr_expr: aggr_expr.into(),
589589
// clone the rest of the fields
590590
required_input_ordering: self.required_input_ordering.clone(),
591591
metrics: ExecutionPlanMetricsSet::new(),
592592
input_order_mode: self.input_order_mode.clone(),
593593
cache: self.cache.clone(),
594594
mode: self.mode,
595-
group_by: self.group_by.clone(),
596-
filter_expr: self.filter_expr.clone(),
595+
group_by: Arc::clone(&self.group_by),
596+
filter_expr: Arc::clone(&self.filter_expr),
597597
limit_options: self.limit_options,
598598
input: Arc::clone(&self.input),
599599
schema: Arc::clone(&self.schema),
@@ -612,9 +612,9 @@ impl AggregateExec {
612612
input_order_mode: self.input_order_mode.clone(),
613613
cache: self.cache.clone(),
614614
mode: self.mode,
615-
group_by: self.group_by.clone(),
616-
aggr_expr: self.aggr_expr.clone(),
617-
filter_expr: self.filter_expr.clone(),
615+
group_by: Arc::clone(&self.group_by),
616+
aggr_expr: Arc::clone(&self.aggr_expr),
617+
filter_expr: Arc::clone(&self.filter_expr),
618618
input: Arc::clone(&self.input),
619619
schema: Arc::clone(&self.schema),
620620
input_schema: Arc::clone(&self.input_schema),
@@ -629,12 +629,13 @@ impl AggregateExec {
629629
/// Create a new hash aggregate execution plan
630630
pub fn try_new(
631631
mode: AggregateMode,
632-
group_by: PhysicalGroupBy,
632+
group_by: impl Into<Arc<PhysicalGroupBy>>,
633633
aggr_expr: Vec<Arc<AggregateFunctionExpr>>,
634634
filter_expr: Vec<Option<Arc<dyn PhysicalExpr>>>,
635635
input: Arc<dyn ExecutionPlan>,
636636
input_schema: SchemaRef,
637637
) -> Result<Self> {
638+
let group_by = group_by.into();
638639
let schema = create_schema(&input.schema(), &group_by, &aggr_expr, mode)?;
639640

640641
let schema = Arc::new(schema);
@@ -659,13 +660,16 @@ impl AggregateExec {
659660
/// the schema in such cases.
660661
fn try_new_with_schema(
661662
mode: AggregateMode,
662-
group_by: PhysicalGroupBy,
663+
group_by: impl Into<Arc<PhysicalGroupBy>>,
663664
mut aggr_expr: Vec<Arc<AggregateFunctionExpr>>,
664-
filter_expr: Vec<Option<Arc<dyn PhysicalExpr>>>,
665+
filter_expr: impl Into<Arc<[Option<Arc<dyn PhysicalExpr>>]>>,
665666
input: Arc<dyn ExecutionPlan>,
666667
input_schema: SchemaRef,
667668
schema: SchemaRef,
668669
) -> Result<Self> {
670+
let group_by = group_by.into();
671+
let filter_expr = filter_expr.into();
672+
669673
// Make sure arguments are consistent in size
670674
assert_eq_or_internal_err!(
671675
aggr_expr.len(),
@@ -732,13 +736,13 @@ impl AggregateExec {
732736
&group_expr_mapping,
733737
&mode,
734738
&input_order_mode,
735-
aggr_expr.as_slice(),
739+
aggr_expr.as_ref(),
736740
)?;
737741

738742
let mut exec = AggregateExec {
739743
mode,
740744
group_by,
741-
aggr_expr,
745+
aggr_expr: aggr_expr.into(),
742746
filter_expr,
743747
input,
744748
schema,
@@ -1287,9 +1291,9 @@ impl ExecutionPlan for AggregateExec {
12871291
) -> Result<Arc<dyn ExecutionPlan>> {
12881292
let mut me = AggregateExec::try_new_with_schema(
12891293
self.mode,
1290-
self.group_by.clone(),
1291-
self.aggr_expr.clone(),
1292-
self.filter_expr.clone(),
1294+
Arc::clone(&self.group_by),
1295+
self.aggr_expr.to_vec(),
1296+
Arc::clone(&self.filter_expr),
12931297
Arc::clone(&children[0]),
12941298
Arc::clone(&self.input_schema),
12951299
Arc::clone(&self.schema),

datafusion/physical-plan/src/aggregates/no_grouping.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ struct AggregateStreamInner {
6161
mode: AggregateMode,
6262
input: SendableRecordBatchStream,
6363
aggregate_expressions: Vec<Vec<Arc<dyn PhysicalExpr>>>,
64-
filter_expressions: Vec<Option<Arc<dyn PhysicalExpr>>>,
64+
filter_expressions: Arc<[Option<Arc<dyn PhysicalExpr>>]>,
6565

6666
// ==== Runtime States/Buffers ====
6767
accumulators: Vec<AccumulatorItem>,
@@ -276,7 +276,7 @@ impl AggregateStream {
276276
partition: usize,
277277
) -> Result<Self> {
278278
let agg_schema = Arc::clone(&agg.schema);
279-
let agg_filter_expr = agg.filter_expr.clone();
279+
let agg_filter_expr = Arc::clone(&agg.filter_expr);
280280

281281
let baseline_metrics = BaselineMetrics::new(&agg.metrics, partition);
282282
let input = agg.input.execute(partition, Arc::clone(context))?;
@@ -287,7 +287,7 @@ impl AggregateStream {
287287
| AggregateMode::Single
288288
| AggregateMode::SinglePartitioned => agg_filter_expr,
289289
AggregateMode::Final | AggregateMode::FinalPartitioned => {
290-
vec![None; agg.aggr_expr.len()]
290+
vec![None; agg.aggr_expr.len()].into()
291291
}
292292
};
293293
let accumulators = create_accumulators(&agg.aggr_expr)?;

datafusion/physical-plan/src/aggregates/row_hash.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -377,10 +377,10 @@ pub(crate) struct GroupedHashAggregateStream {
377377
///
378378
/// For example, for an aggregate like `SUM(x) FILTER (WHERE x >= 100)`,
379379
/// the filter expression is `x > 100`.
380-
filter_expressions: Vec<Option<Arc<dyn PhysicalExpr>>>,
380+
filter_expressions: Arc<[Option<Arc<dyn PhysicalExpr>>]>,
381381

382382
/// GROUP BY expressions
383-
group_by: PhysicalGroupBy,
383+
group_by: Arc<PhysicalGroupBy>,
384384

385385
/// max rows in output RecordBatches
386386
batch_size: usize,
@@ -465,8 +465,8 @@ impl GroupedHashAggregateStream {
465465
) -> Result<Self> {
466466
debug!("Creating GroupedHashAggregateStream");
467467
let agg_schema = Arc::clone(&agg.schema);
468-
let agg_group_by = agg.group_by.clone();
469-
let agg_filter_expr = agg.filter_expr.clone();
468+
let agg_group_by = Arc::clone(&agg.group_by);
469+
let agg_filter_expr = Arc::clone(&agg.filter_expr);
470470

471471
let batch_size = context.session_config().batch_size();
472472
let input = agg.input.execute(partition, Arc::clone(context))?;
@@ -475,7 +475,7 @@ impl GroupedHashAggregateStream {
475475

476476
let timer = baseline_metrics.elapsed_compute().timer();
477477

478-
let aggregate_exprs = agg.aggr_expr.clone();
478+
let aggregate_exprs = Arc::clone(&agg.aggr_expr);
479479

480480
// arguments for each aggregate, one vec of expressions per
481481
// aggregate
@@ -496,7 +496,7 @@ impl GroupedHashAggregateStream {
496496
| AggregateMode::Single
497497
| AggregateMode::SinglePartitioned => agg_filter_expr,
498498
AggregateMode::Final | AggregateMode::FinalPartitioned => {
499-
vec![None; agg.aggr_expr.len()]
499+
vec![None; agg.aggr_expr.len()].into()
500500
}
501501
};
502502

0 commit comments

Comments
 (0)