diff --git a/rust/cubestore/cubestore/src/queryplanner/query_executor.rs b/rust/cubestore/cubestore/src/queryplanner/query_executor.rs index c687b135d558a..156177fc6eba5 100644 --- a/rust/cubestore/cubestore/src/queryplanner/query_executor.rs +++ b/rust/cubestore/cubestore/src/queryplanner/query_executor.rs @@ -1504,21 +1504,21 @@ impl ClusterSendExec { } } - pub fn worker_plans(&self) -> Vec<(String, PreSerializedPlan)> { + pub fn worker_plans(&self) -> Result, CubeError> { let mut res = Vec::new(); for (node_name, partitions) in self.partitions.iter() { res.push(( node_name.clone(), - self.serialized_plan_for_partitions(partitions), + self.serialized_plan_for_partitions(partitions)?, )); } - res + Ok(res) } fn serialized_plan_for_partitions( &self, partitions: &(Vec<(u64, RowRange)>, Vec), - ) -> PreSerializedPlan { + ) -> Result { let (partitions, inline_table_ids) = partitions; let mut ps = HashMap::<_, RowFilter>::new(); for (id, range) in partitions { @@ -1577,7 +1577,7 @@ impl ExecutionPlan for ClusterSendExec { ) -> Result { let (node_name, partitions) = &self.partitions[partition]; - let plan = self.serialized_plan_for_partitions(partitions); + let plan = self.serialized_plan_for_partitions(partitions)?; let cluster = self.cluster.clone(); let schema = self.properties.eq_properties.schema().clone(); diff --git a/rust/cubestore/cubestore/src/queryplanner/serialized_plan.rs b/rust/cubestore/cubestore/src/queryplanner/serialized_plan.rs index bca4ed6d089e7..c4feecab4942f 100644 --- a/rust/cubestore/cubestore/src/queryplanner/serialized_plan.rs +++ b/rust/cubestore/cubestore/src/queryplanner/serialized_plan.rs @@ -11,7 +11,7 @@ use crate::queryplanner::udfs::aggregate_udf_by_kind; use crate::queryplanner::udfs::{ aggregate_kind_by_name, scalar_udf_by_kind, CubeAggregateUDFKind, CubeScalarUDFKind, }; -use crate::queryplanner::{CubeTableLogical, InfoSchemaTableProvider}; +use crate::queryplanner::{pretty_printers, CubeTableLogical, InfoSchemaTableProvider}; use crate::table::Row; use crate::CubeError; use datafusion::arrow::datatypes::{DataType, SchemaRef}; @@ -29,7 +29,7 @@ use datafusion::common::{Column, DFSchemaRef, JoinConstraint, JoinType}; use datafusion::datasource::physical_plan::ParquetFileReaderFactory; use datafusion::datasource::DefaultTableSource; use datafusion::error::DataFusionError; -use datafusion::logical_expr::{Expr, Extension, LogicalPlan, TableScan}; +use datafusion::logical_expr::{Aggregate, CrossJoin, EmptyRelation, Expr, Extension, Filter, Join, Limit, LogicalPlan, Projection, Repartition, Sort, Subquery, SubqueryAlias, TableScan, Union}; use datafusion::prelude::SessionContext; use datafusion_proto::bytes::{ logical_plan_from_bytes, logical_plan_from_bytes_with_extension_codec, @@ -504,375 +504,408 @@ pub struct WorkerContext { // }, // }) // } -// fn is_empty_relation(&self) -> Option { -// match self { -// SerializedLogicalPlan::EmptyRelation { -// produce_one_row, -// schema, -// } => { -// if !produce_one_row { -// Some(schema.clone()) -// } else { -// None -// } -// } -// _ => None, -// } -// } -// -// fn remove_unused_tables( -// &self, -// partition_ids_to_execute: &Vec<(u64, RowFilter)>, -// inline_tables_to_execute: &Vec, -// ) -> SerializedLogicalPlan { -// debug_assert!(partition_ids_to_execute -// .iter() -// .is_sorted_by_key(|(id, _)| id)); -// match self { -// SerializedLogicalPlan::Projection { -// expr, -// input, -// schema, -// } => { -// let input = -// input.remove_unused_tables(partition_ids_to_execute, inline_tables_to_execute); -// if input.is_empty_relation().is_some() { -// SerializedLogicalPlan::EmptyRelation { -// produce_one_row: false, -// schema: schema.clone(), -// } -// } else { -// SerializedLogicalPlan::Projection { -// expr: expr.clone(), -// input: Arc::new(input), -// schema: schema.clone(), -// } -// } -// } -// SerializedLogicalPlan::Filter { predicate, input } => { -// let input = -// input.remove_unused_tables(partition_ids_to_execute, inline_tables_to_execute); -// -// if let Some(schema) = input.is_empty_relation() { -// SerializedLogicalPlan::EmptyRelation { -// produce_one_row: false, -// schema: schema.clone(), -// } -// } else { -// SerializedLogicalPlan::Filter { -// predicate: predicate.clone(), -// input: Arc::new(input), -// } -// } -// } -// SerializedLogicalPlan::Aggregate { -// input, -// group_expr, -// aggr_expr, -// schema, -// } => { -// let input = -// input.remove_unused_tables(partition_ids_to_execute, inline_tables_to_execute); -// SerializedLogicalPlan::Aggregate { -// input: Arc::new(input), -// group_expr: group_expr.clone(), -// aggr_expr: aggr_expr.clone(), -// schema: schema.clone(), -// } -// } -// SerializedLogicalPlan::Sort { expr, input } => { -// let input = -// input.remove_unused_tables(partition_ids_to_execute, inline_tables_to_execute); -// -// if let Some(schema) = input.is_empty_relation() { -// SerializedLogicalPlan::EmptyRelation { -// produce_one_row: false, -// schema: schema.clone(), -// } -// } else { -// SerializedLogicalPlan::Sort { -// expr: expr.clone(), -// input: Arc::new(input), -// } -// } -// } -// SerializedLogicalPlan::Union { -// inputs, -// schema, -// alias, -// } => { -// let inputs = inputs -// .iter() -// .filter_map(|i| { -// let i = i.remove_unused_tables( -// partition_ids_to_execute, -// inline_tables_to_execute, -// ); -// if i.is_empty_relation().is_some() { -// None -// } else { -// Some(Arc::new(i)) -// } -// }) -// .collect::>(); -// -// if inputs.is_empty() { -// SerializedLogicalPlan::EmptyRelation { -// produce_one_row: false, -// schema: schema.clone(), -// } -// } else { -// SerializedLogicalPlan::Union { -// inputs, -// schema: schema.clone(), -// alias: alias.clone(), -// } -// } -// } -// SerializedLogicalPlan::TableScan { -// table_name, -// source, -// projection, -// projected_schema, -// filters, -// alias, -// limit, -// } => { -// let is_empty = match source { -// SerializedTableSource::CubeTable(table) => { -// !table.has_partitions(partition_ids_to_execute) -// } -// SerializedTableSource::InlineTable(table) => { -// !table.has_inline_table_id(inline_tables_to_execute) -// } -// }; -// if is_empty { -// SerializedLogicalPlan::EmptyRelation { -// produce_one_row: false, -// schema: projected_schema.clone(), -// } -// } else { -// SerializedLogicalPlan::TableScan { -// table_name: table_name.clone(), -// source: source.clone(), -// projection: projection.clone(), -// projected_schema: projected_schema.clone(), -// filters: filters.clone(), -// alias: alias.clone(), -// limit: limit.clone(), -// } -// } -// } -// SerializedLogicalPlan::EmptyRelation { -// produce_one_row, -// schema, -// } => SerializedLogicalPlan::EmptyRelation { -// produce_one_row: *produce_one_row, -// schema: schema.clone(), -// }, -// SerializedLogicalPlan::Limit { n, input } => { -// let input = -// input.remove_unused_tables(partition_ids_to_execute, inline_tables_to_execute); -// -// if let Some(schema) = input.is_empty_relation() { -// SerializedLogicalPlan::EmptyRelation { -// produce_one_row: false, -// schema: schema.clone(), -// } -// } else { -// SerializedLogicalPlan::Limit { -// n: *n, -// input: Arc::new(input), -// } -// } -// } -// SerializedLogicalPlan::Skip { n, input } => { -// let input = -// input.remove_unused_tables(partition_ids_to_execute, inline_tables_to_execute); -// -// if let Some(schema) = input.is_empty_relation() { -// SerializedLogicalPlan::EmptyRelation { -// produce_one_row: false, -// schema: schema.clone(), -// } -// } else { -// SerializedLogicalPlan::Skip { -// n: *n, -// input: Arc::new(input), -// } -// } -// } -// SerializedLogicalPlan::Join { -// left, -// right, -// on, -// join_type, -// join_constraint, -// schema, -// } => { -// let left = -// left.remove_unused_tables(partition_ids_to_execute, inline_tables_to_execute); -// let right = -// right.remove_unused_tables(partition_ids_to_execute, inline_tables_to_execute); -// -// SerializedLogicalPlan::Join { -// left: Arc::new(left), -// right: Arc::new(right), -// on: on.clone(), -// join_type: join_type.clone(), -// join_constraint: *join_constraint, -// schema: schema.clone(), -// } -// } -// SerializedLogicalPlan::Repartition { -// input, -// partitioning_scheme, -// } => { -// let input = -// input.remove_unused_tables(partition_ids_to_execute, inline_tables_to_execute); -// -// if let Some(schema) = input.is_empty_relation() { -// SerializedLogicalPlan::EmptyRelation { -// produce_one_row: false, -// schema: schema.clone(), -// } -// } else { -// SerializedLogicalPlan::Repartition { -// input: Arc::new(input), -// partitioning_scheme: partitioning_scheme.clone(), -// } -// } -// } -// SerializedLogicalPlan::Alias { -// input, -// alias, -// schema, -// } => { -// let input = -// input.remove_unused_tables(partition_ids_to_execute, inline_tables_to_execute); -// -// if input.is_empty_relation().is_some() { -// SerializedLogicalPlan::EmptyRelation { -// produce_one_row: false, -// schema: schema.clone(), -// } -// } else { -// SerializedLogicalPlan::Alias { -// input: Arc::new(input), -// alias: alias.clone(), -// schema: schema.clone(), -// } -// } -// } -// SerializedLogicalPlan::ClusterSend { -// input, -// snapshots, -// limit_and_reverse, -// } => { -// let input = -// input.remove_unused_tables(partition_ids_to_execute, inline_tables_to_execute); -// SerializedLogicalPlan::ClusterSend { -// input: Arc::new(input), -// snapshots: snapshots.clone(), -// limit_and_reverse: limit_and_reverse.clone(), -// } -// } -// SerializedLogicalPlan::ClusterAggregateTopK { -// limit, -// input, -// group_expr, -// aggregate_expr, -// sort_columns, -// having_expr, -// schema, -// snapshots, -// } => { -// let input = -// input.remove_unused_tables(partition_ids_to_execute, inline_tables_to_execute); -// SerializedLogicalPlan::ClusterAggregateTopK { -// limit: *limit, -// input: Arc::new(input), -// group_expr: group_expr.clone(), -// aggregate_expr: aggregate_expr.clone(), -// sort_columns: sort_columns.clone(), -// having_expr: having_expr.clone(), -// schema: schema.clone(), -// snapshots: snapshots.clone(), -// } -// } -// SerializedLogicalPlan::CrossJoin { -// left, -// right, -// on, -// join_schema, -// } => { -// let left = -// left.remove_unused_tables(partition_ids_to_execute, inline_tables_to_execute); -// let right = -// right.remove_unused_tables(partition_ids_to_execute, inline_tables_to_execute); -// -// SerializedLogicalPlan::CrossJoin { -// left: Arc::new(left), -// right: Arc::new(right), -// on: on.clone(), -// join_schema: join_schema.clone(), -// } -// } -// SerializedLogicalPlan::CrossJoinAgg { -// left, -// right, -// on, -// join_schema, -// group_expr, -// agg_expr, -// schema, -// } => { -// let left = -// left.remove_unused_tables(partition_ids_to_execute, inline_tables_to_execute); -// let right = -// right.remove_unused_tables(partition_ids_to_execute, inline_tables_to_execute); -// -// SerializedLogicalPlan::CrossJoinAgg { -// left: Arc::new(left), -// right: Arc::new(right), -// on: on.clone(), -// join_schema: join_schema.clone(), -// group_expr: group_expr.clone(), -// agg_expr: agg_expr.clone(), -// schema: schema.clone(), -// } -// } -// SerializedLogicalPlan::RollingWindowAgg { -// schema, -// input, -// dimension, -// partition_by, -// from, -// to, -// every, -// rolling_aggs, -// group_by_dimension, -// aggs, -// } => { -// let input = -// input.remove_unused_tables(partition_ids_to_execute, inline_tables_to_execute); -// SerializedLogicalPlan::RollingWindowAgg { -// schema: schema.clone(), -// input: Arc::new(input), -// dimension: dimension.clone(), -// partition_by: partition_by.clone(), -// from: from.clone(), -// to: to.clone(), -// every: every.clone(), -// rolling_aggs: rolling_aggs.clone(), -// group_by_dimension: group_by_dimension.clone(), -// aggs: aggs.clone(), -// } -// } -// SerializedLogicalPlan::Panic {} => SerializedLogicalPlan::Panic {}, -// } -// } -// } + +fn is_empty_relation(plan: &LogicalPlan) -> Option { + match plan { + LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row, + schema, + }) => { + if !produce_one_row { + Some(schema.clone()) + } else { + None + } + } + _ => None, + } +} + +impl PreSerializedPlan { + fn remove_unused_tables( + plan: &LogicalPlan, + partition_ids_to_execute: &Vec<(u64, RowFilter)>, + inline_tables_to_execute: &Vec, + ) -> Result { + debug_assert!(partition_ids_to_execute + .iter() + .is_sorted_by_key(|(id, _)| id)); + let res = match plan { + LogicalPlan::Projection(Projection { + expr, + input, + schema, + .. + }) => { + let input = + PreSerializedPlan::remove_unused_tables(&input, partition_ids_to_execute, inline_tables_to_execute)?; + if is_empty_relation(&input).is_some() { + LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: schema.clone(), + }) + } else { + LogicalPlan::Projection(Projection::try_new_with_schema( + expr.clone(), + Arc::new(input), + schema.clone(), + )?) + } + } + LogicalPlan::Filter(Filter { predicate, input, having, .. }) => { + let input = + PreSerializedPlan::remove_unused_tables(&input, partition_ids_to_execute, inline_tables_to_execute)?; + + if let Some(schema) = is_empty_relation(&input) { + LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: schema.clone(), + }) + } else { + LogicalPlan::Filter(if *having { + Filter::try_new_with_having( + predicate.clone(), + Arc::new(input), + ) + } else { + Filter::try_new( + predicate.clone(), + Arc::new(input), + ) + }?) + } + } + LogicalPlan::Aggregate(Aggregate { + input, + group_expr, + aggr_expr, + schema, + .. + }) => { + let input = + PreSerializedPlan::remove_unused_tables(&input, partition_ids_to_execute, inline_tables_to_execute)?; + LogicalPlan::Aggregate(Aggregate::try_new_with_schema( + Arc::new(input), + group_expr.clone(), + aggr_expr.clone(), + schema.clone(), + )?) + } + LogicalPlan::Sort(Sort { expr, input, fetch }) => { + let input = + PreSerializedPlan::remove_unused_tables(&input, partition_ids_to_execute, inline_tables_to_execute)?; + + if let Some(schema) = is_empty_relation(&input) { + LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: schema.clone(), + }) + } else { + LogicalPlan::Sort(Sort { + expr: expr.clone(), + input: Arc::new(input), + fetch: *fetch, + }) + } + } + LogicalPlan::Union(Union { + inputs, + schema, + }) => { + let mut new_inputs: Vec> = Vec::with_capacity(inputs.len()); + for input in inputs { + let i = PreSerializedPlan::remove_unused_tables( + &input, + partition_ids_to_execute, + inline_tables_to_execute, + )?; + if !is_empty_relation(&i).is_some() { + new_inputs.push(Arc::new(i)); + } + } + + if new_inputs.is_empty() { + LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: schema.clone(), + }) + } else { + LogicalPlan::Union(Union { + inputs: new_inputs, + schema: schema.clone(), + }) + } + } + LogicalPlan::TableScan(TableScan { + table_name, + source, + projection, + projected_schema, + filters, + fetch, + }) => { + // TODO upgrade DF + let is_empty = false; + // let is_empty = match source { + // SerializedTableSource::CubeTable(table) => { + // !table.has_partitions(partition_ids_to_execute) + // } + // SerializedTableSource::InlineTable(table) => { + // !table.has_inline_table_id(inline_tables_to_execute) + // } + // }; + if is_empty { + LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: projected_schema.clone(), + }) + } else { + LogicalPlan::TableScan(TableScan { + table_name: table_name.clone(), + source: source.clone(), + projection: projection.clone(), + projected_schema: projected_schema.clone(), + filters: filters.clone(), + fetch: *fetch, + }) + } + } + LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row, + schema, + }) => LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: *produce_one_row, + schema: schema.clone(), + }), + LogicalPlan::Limit(Limit { skip, fetch, input }) => { + let input = + PreSerializedPlan::remove_unused_tables(input, partition_ids_to_execute, inline_tables_to_execute)?; + + if let Some(schema) = is_empty_relation(&input) { + LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: schema.clone(), + }) + } else { + LogicalPlan::Limit(Limit { + skip: *skip, + fetch: *fetch, + input: Arc::new(input), + }) + } + } + LogicalPlan::Join(Join { + left, + right, + on, + filter, + join_type, + join_constraint, + schema, + null_equals_null, + }) => { + let left = + PreSerializedPlan::remove_unused_tables(left, partition_ids_to_execute, inline_tables_to_execute)?; + let right = + PreSerializedPlan::remove_unused_tables(right, partition_ids_to_execute, inline_tables_to_execute)?; + + LogicalPlan::Join(Join { + left: Arc::new(left), + right: Arc::new(right), + on: on.clone(), + filter: filter.clone(), + join_type: join_type.clone(), + join_constraint: *join_constraint, + schema: schema.clone(), + null_equals_null: *null_equals_null, + }) + } + LogicalPlan::Repartition(Repartition { + input, + partitioning_scheme, + }) => { + let input = + PreSerializedPlan::remove_unused_tables(input, partition_ids_to_execute, inline_tables_to_execute)?; + + if let Some(schema) = is_empty_relation(&input) { + LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: schema.clone(), + }) + } else { + LogicalPlan::Repartition(Repartition { + input: Arc::new(input), + partitioning_scheme: partitioning_scheme.clone(), + }) + } + } + LogicalPlan::Subquery(Subquery { + subquery, + outer_ref_columns, + .. + }) => { + let subquery: LogicalPlan = + PreSerializedPlan::remove_unused_tables(subquery, partition_ids_to_execute, inline_tables_to_execute)?; + + if let Some(schema) = is_empty_relation(&subquery) { + LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: subquery.schema().clone(), + }) + } else { + LogicalPlan::Subquery(Subquery { + subquery: Arc::new(subquery), + outer_ref_columns: outer_ref_columns.clone(), + }) + } + } + LogicalPlan::SubqueryAlias(SubqueryAlias { + input, + alias, + schema, + .. + }) => { + let input = + PreSerializedPlan::remove_unused_tables(input, partition_ids_to_execute, inline_tables_to_execute)?; + + if is_empty_relation(&input).is_some() { + LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: schema.clone(), + }) + } else { + LogicalPlan::SubqueryAlias(SubqueryAlias::try_new( + Arc::new(input), + alias.clone(), + )?) + } + } + LogicalPlan::CrossJoin(CrossJoin { + left, + right, + schema, + }) => { + let left = + PreSerializedPlan::remove_unused_tables(left, partition_ids_to_execute, inline_tables_to_execute)?; + let right = + PreSerializedPlan::remove_unused_tables(right, partition_ids_to_execute, inline_tables_to_execute)?; + + LogicalPlan::CrossJoin(CrossJoin { + left: Arc::new(left), + right: Arc::new(right), + schema: schema.clone(), + }) + } + LogicalPlan::Extension(Extension { + node + }) => { + if let Some(cluster_send) = node.as_any().downcast_ref::() { + let ClusterSendNode { input, snapshots, limit_and_reverse } = cluster_send; + let input = PreSerializedPlan::remove_unused_tables(&input, partition_ids_to_execute, inline_tables_to_execute)?; + LogicalPlan::Extension(Extension { + node: Arc::new(ClusterSendNode { + input: Arc::new(input), + snapshots: snapshots.clone(), + limit_and_reverse: *limit_and_reverse, + }) + }) + } else if let Some(panic_worker) = node.as_any().downcast_ref::() { + let PanicWorkerNode{} = panic_worker; // (No fields to recurse; just clone the existing Arc `node`.) + LogicalPlan::Extension(Extension { + node: node.clone(), + }) + } else if let Some(cluster_agg_topk) = node.as_any().downcast_ref::() { + let ClusterAggregateTopK { + limit, + input, + group_expr, + aggregate_expr, + order_by, + having_expr, + schema, + snapshots, + } = cluster_agg_topk; + let input = PreSerializedPlan::remove_unused_tables(input, partition_ids_to_execute, inline_tables_to_execute)?; + LogicalPlan::Extension(Extension { + node: Arc::new(ClusterAggregateTopK { + limit: *limit, + input: Arc::new(input), + group_expr: group_expr.clone(), + aggregate_expr: aggregate_expr.clone(), + order_by: order_by.clone(), + having_expr: having_expr.clone(), + schema: schema.clone(), + snapshots: snapshots.clone(), + }), + }) + } else { + // TODO upgrade DF + todo!("remove_unused_tables not handling Extension case: {:?}", node); + } + } + LogicalPlan::Window(_) | LogicalPlan::Values(_) | LogicalPlan::Distinct(_) | + LogicalPlan::RecursiveQuery(_) | LogicalPlan::Explain(_) | + LogicalPlan::Statement(_) | LogicalPlan::Analyze(_) | LogicalPlan::Prepare(_) | + LogicalPlan::Dml(_) | LogicalPlan::Ddl(_) | LogicalPlan::Copy(_) | LogicalPlan::DescribeTable(_) | + LogicalPlan::Unnest(_) => { + todo!("remove_unused_tables not handling case: {}", pretty_printers::pp_plan(plan)); + } + // TODO upgrade DF + // SerializedLogicalPlan::CrossJoinAgg { + // left, + // right, + // on, + // join_schema, + // group_expr, + // agg_expr, + // schema, + // } => { + // let left = + // left.remove_unused_tables(partition_ids_to_execute, inline_tables_to_execute); + // let right = + // right.remove_unused_tables(partition_ids_to_execute, inline_tables_to_execute); + + // SerializedLogicalPlan::CrossJoinAgg { + // left: Arc::new(left), + // right: Arc::new(right), + // on: on.clone(), + // join_schema: join_schema.clone(), + // group_expr: group_expr.clone(), + // agg_expr: agg_expr.clone(), + // schema: schema.clone(), + // } + // } + // SerializedLogicalPlan::RollingWindowAgg { + // schema, + // input, + // dimension, + // partition_by, + // from, + // to, + // every, + // rolling_aggs, + // group_by_dimension, + // aggs, + // } => { + // let input = + // input.remove_unused_tables(partition_ids_to_execute, inline_tables_to_execute); + // SerializedLogicalPlan::RollingWindowAgg { + // schema: schema.clone(), + // input: Arc::new(input), + // dimension: dimension.clone(), + // partition_by: partition_by.clone(), + // from: from.clone(), + // to: to.clone(), + // every: every.clone(), + // rolling_aggs: rolling_aggs.clone(), + // group_by_dimension: group_by_dimension.clone(), + // aggs: aggs.clone(), + // } + // } + }; + Ok(res) + } +} // TODO upgrade DF // #[derive(Clone, Serialize, Deserialize, Debug)] @@ -1098,19 +1131,19 @@ impl PreSerializedPlan { &self, partition_ids_to_execute: Vec<(u64, RowFilter)>, inline_table_ids_to_execute: Vec, - ) -> Self { - Self { - // TODO upgrade DF - // logical_plan: Arc::new( - // self.logical_plan - // .remove_unused_tables(&partition_ids_to_execute, &inline_table_ids_to_execute), - // ), - logical_plan: self.logical_plan.clone(), + ) -> Result { + let logical_plan = PreSerializedPlan::remove_unused_tables( + &self.logical_plan, + &partition_ids_to_execute, + &inline_table_ids_to_execute, + )?; + Ok(Self { + logical_plan, schema_snapshot: self.schema_snapshot.clone(), partition_ids_to_execute, inline_table_ids_to_execute, trace_obj: self.trace_obj.clone(), - } + }) } /// Note: avoid during normal execution, workers must filter the partitions they execute. diff --git a/rust/cubestore/cubestore/src/sql/mod.rs b/rust/cubestore/cubestore/src/sql/mod.rs index 100d1ef346fe9..5129dbe7b44a7 100644 --- a/rust/cubestore/cubestore/src/sql/mod.rs +++ b/rust/cubestore/cubestore/src/sql/mod.rs @@ -382,17 +382,17 @@ impl SqlServiceImpl { ) -> Result, CubeError> { fn extract_worker_plans( p: &Arc, - ) -> Option> { + ) -> Result>, CubeError> { if let Some(p) = p.as_any().downcast_ref::() { - Some(p.worker_plans()) + Ok(Some(p.worker_plans()?)) } else { for c in p.children() { - let res = extract_worker_plans(&c); + let res = extract_worker_plans(&c)?; if res.is_some() { - return res; + return Ok(res); } } - None + Ok(None) } } @@ -434,7 +434,7 @@ impl SqlServiceImpl { TableValue::String(pp_phys_plan(router_plan.as_ref())), ])); - if let Some(worker_plans) = extract_worker_plans(&router_plan) { + if let Some(worker_plans) = extract_worker_plans(&router_plan)? { let worker_futures = worker_plans .into_iter() .map(|(name, plan)| async move { @@ -1166,7 +1166,7 @@ impl SqlService for SqlServiceImpl { }) .collect(), context.inline_tables.into_iter().map(|i| i.id).collect(), - ); + )?; let worker_plan: SerializedPlan = worker_plan.to_serialized_plan()?; let mut mocked_names = HashMap::new(); for (_, f, _, _) in worker_plan.files_to_download() {