From 7956b2a0b264076c75c8d9e1ef4802021ab80bc1 Mon Sep 17 00:00:00 2001 From: Sam Hughes Date: Mon, 16 Dec 2024 12:09:18 -0800 Subject: [PATCH] chore(cubestore): Upgrade DF: Split SerializedPlan type into PreSerializedPlan --- .../cubestore/src/queryplanner/mod.rs | 5 +- .../src/queryplanner/optimizations/mod.rs | 8 +- .../cubestore/src/queryplanner/planning.rs | 4 +- .../src/queryplanner/query_executor.rs | 37 ++-- .../src/queryplanner/serialized_plan.rs | 207 +++++++++++++----- rust/cubestore/cubestore/src/sql/mod.rs | 21 +- 6 files changed, 186 insertions(+), 96 deletions(-) diff --git a/rust/cubestore/cubestore/src/queryplanner/mod.rs b/rust/cubestore/cubestore/src/queryplanner/mod.rs index 5e495ded01599..23f946d7b9734 100644 --- a/rust/cubestore/cubestore/src/queryplanner/mod.rs +++ b/rust/cubestore/cubestore/src/queryplanner/mod.rs @@ -13,6 +13,7 @@ pub mod serialized_plan; mod tail_limit; mod topk; pub mod trace_data_loaded; +use serialized_plan::PreSerializedPlan; pub use topk::MIN_TOPK_STREAM_ROWS; use udfs::{aggregate_udf_by_kind, registerable_aggregate_udfs, registerable_scalar_udfs}; mod filter_by_key_range; @@ -122,7 +123,7 @@ crate::di_service!(QueryPlannerImpl, [QueryPlanner]); pub enum QueryPlan { Meta(LogicalPlan), - Select(SerializedPlan, /*workers*/ Vec), + Select(PreSerializedPlan, /*workers*/ Vec), } #[async_trait] @@ -191,7 +192,7 @@ impl QueryPlanner for QueryPlannerImpl { &meta.multi_part_subtree, )?; QueryPlan::Select( - SerializedPlan::try_new(logical_plan, meta, trace_obj).await?, + PreSerializedPlan::try_new(logical_plan, meta, trace_obj)?, workers, ) } else { diff --git a/rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs b/rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs index 536af44182973..4ba8f2da8c832 100644 --- a/rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs +++ b/rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs @@ -30,9 +30,11 @@ use rewrite_plan::rewrite_physical_plan; use std::sync::Arc; use trace_data_loaded::add_trace_data_loaded_exec; +use super::serialized_plan::PreSerializedPlan; + pub struct CubeQueryPlanner { cluster: Option>, - serialized_plan: Arc, + serialized_plan: Arc, memory_handler: Arc, data_loaded_size: Option>, } @@ -40,7 +42,7 @@ pub struct CubeQueryPlanner { impl CubeQueryPlanner { pub fn new_on_router( cluster: Arc, - serialized_plan: Arc, + serialized_plan: Arc, memory_handler: Arc, ) -> CubeQueryPlanner { CubeQueryPlanner { @@ -52,7 +54,7 @@ impl CubeQueryPlanner { } pub fn new_on_worker( - serialized_plan: Arc, + serialized_plan: Arc, memory_handler: Arc, data_loaded_size: Option>, ) -> CubeQueryPlanner { diff --git a/rust/cubestore/cubestore/src/queryplanner/planning.rs b/rust/cubestore/cubestore/src/queryplanner/planning.rs index e599faf7f2d84..bc5b33b52cd50 100644 --- a/rust/cubestore/cubestore/src/queryplanner/planning.rs +++ b/rust/cubestore/cubestore/src/queryplanner/planning.rs @@ -72,6 +72,8 @@ use std::cmp::Ordering; use std::hash::{Hash, Hasher}; use std::iter::FromIterator; +use super::serialized_plan::PreSerializedPlan; + #[cfg(test)] pub async fn choose_index( p: LogicalPlan, @@ -1585,7 +1587,7 @@ fn pull_up_cluster_send(mut p: LogicalPlan) -> Result>, - pub serialized_plan: Arc, + pub serialized_plan: Arc, } #[async_trait] diff --git a/rust/cubestore/cubestore/src/queryplanner/query_executor.rs b/rust/cubestore/cubestore/src/queryplanner/query_executor.rs index 0b18df8f4482f..c687b135d558a 100644 --- a/rust/cubestore/cubestore/src/queryplanner/query_executor.rs 
+++ b/rust/cubestore/cubestore/src/queryplanner/query_executor.rs @@ -93,6 +93,7 @@ use std::sync::Arc; use std::time::SystemTime; use tracing::{instrument, Instrument}; +use super::serialized_plan::PreSerializedPlan; use super::udfs::{ aggregate_udf_by_kind, registerable_aggregate_udfs, registerable_arc_aggregate_udfs, registerable_arc_scalar_udfs, CubeAggregateUDFKind, @@ -287,19 +288,19 @@ impl QueryExecutor for QueryExecutorImpl { plan: SerializedPlan, cluster: Arc, ) -> Result<(Arc, LogicalPlan), CubeError> { - let plan_to_move = plan.logical_plan( + let pre_serialized_plan = plan.to_pre_serialized( HashMap::new(), HashMap::new(), NoopParquetMetadataCache::new(), )?; - let serialized_plan = Arc::new(plan); - let ctx = self.router_context(cluster.clone(), serialized_plan.clone())?; + let pre_serialized_plan = Arc::new(pre_serialized_plan); + let ctx = self.router_context(cluster.clone(), pre_serialized_plan.clone())?; Ok(( ctx.clone() .state() - .create_physical_plan(&plan_to_move.clone()) + .create_physical_plan(pre_serialized_plan.logical_plan()) .await?, - plan_to_move, + pre_serialized_plan.logical_plan().clone(), )) } @@ -310,20 +311,20 @@ impl QueryExecutor for QueryExecutorImpl { chunk_id_to_record_batches: HashMap>, data_loaded_size: Option>, ) -> Result<(Arc, LogicalPlan), CubeError> { - let plan_to_move = plan.logical_plan( + let pre_serialized_plan = plan.to_pre_serialized( remote_to_local_names, chunk_id_to_record_batches, self.parquet_metadata_cache.cache().clone(), )?; - let plan = Arc::new(plan); - let ctx = self.worker_context(plan.clone(), data_loaded_size)?; + let pre_serialized_plan = Arc::new(pre_serialized_plan); + let ctx = self.worker_context(pre_serialized_plan.clone(), data_loaded_size)?; let plan_ctx = ctx.clone(); Ok(( plan_ctx .state() - .create_physical_plan(&plan_to_move.clone()) + .create_physical_plan(pre_serialized_plan.logical_plan()) .await?, - plan_to_move, + pre_serialized_plan.logical_plan().clone(), )) } @@ -372,7 +373,7 @@ impl QueryExecutorImpl { fn router_context( &self, cluster: Arc, - serialized_plan: Arc, + serialized_plan: Arc, ) -> Result, CubeError> { let runtime = Arc::new(RuntimeEnv::default()); let config = Self::session_config(); @@ -424,7 +425,7 @@ impl QueryExecutorImpl { fn worker_context( &self, - serialized_plan: Arc, + serialized_plan: Arc, data_loaded_size: Option>, ) -> Result, CubeError> { let runtime = Arc::new(RuntimeEnv::default()); @@ -1229,7 +1230,7 @@ pub struct ClusterSendExec { /// Never executed, only stored to allow consistent optimization on router and worker. 
pub input_for_optimizations: Arc, pub cluster: Arc, - pub serialized_plan: Arc, + pub serialized_plan: Arc, pub use_streaming: bool, } @@ -1248,7 +1249,7 @@ pub enum InlineCompoundPartition { impl ClusterSendExec { pub fn new( cluster: Arc, - serialized_plan: Arc, + serialized_plan: Arc, union_snapshots: &[Snapshots], input_for_optimizations: Arc, use_streaming: bool, @@ -1503,7 +1504,7 @@ impl ClusterSendExec { } } - pub fn worker_plans(&self) -> Vec<(String, SerializedPlan)> { + pub fn worker_plans(&self) -> Vec<(String, PreSerializedPlan)> { let mut res = Vec::new(); for (node_name, partitions) in self.partitions.iter() { res.push(( @@ -1517,7 +1518,7 @@ impl ClusterSendExec { fn serialized_plan_for_partitions( &self, partitions: &(Vec<(u64, RowRange)>, Vec), - ) -> SerializedPlan { + ) -> PreSerializedPlan { let (partitions, inline_table_ids) = partitions; let mut ps = HashMap::<_, RowFilter>::new(); for (id, range) in partitions { @@ -1583,13 +1584,13 @@ impl ExecutionPlan for ClusterSendExec { let node_name = node_name.to_string(); if self.use_streaming { // A future that yields a stream - let fut = async move { cluster.run_select_stream(&node_name, plan).await }; + let fut = async move { cluster.run_select_stream(&node_name, plan.to_serialized_plan()?).await }; // Use TryStreamExt::try_flatten to flatten the stream of streams let stream = futures::stream::once(fut).try_flatten(); Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream))) } else { - let record_batches = async move { cluster.run_select(&node_name, plan).await }; + let record_batches = async move { cluster.run_select(&node_name, plan.to_serialized_plan()?).await }; let stream = futures::stream::once(record_batches).flat_map(|r| match r { Ok(vec) => stream::iter(vec.into_iter().map(|b| Ok(b)).collect::>()), Err(e) => stream::iter(vec![Err(DataFusionError::Execution(e.to_string()))]), diff --git a/rust/cubestore/cubestore/src/queryplanner/serialized_plan.rs b/rust/cubestore/cubestore/src/queryplanner/serialized_plan.rs index 1dccc31fbc074..bca4ed6d089e7 100644 --- a/rust/cubestore/cubestore/src/queryplanner/serialized_plan.rs +++ b/rust/cubestore/cubestore/src/queryplanner/serialized_plan.rs @@ -79,6 +79,16 @@ impl RowFilter { } } +/// SerializedPlan, but before we actually serialize the LogicalPlan. 
+#[derive(Debug)] +pub struct PreSerializedPlan { + logical_plan: LogicalPlan, + schema_snapshot: Arc, + partition_ids_to_execute: Vec<(u64, RowFilter)>, + inline_table_ids_to_execute: Vec, + trace_obj: Option, +} + #[derive(Clone, Serialize, Deserialize, Debug)] pub struct SerializedPlan { logical_plan: Arc>, @@ -1052,21 +1062,31 @@ pub enum SerializedTableSource { InlineTable(InlineTableProvider), } -impl SerializedPlan { - pub async fn try_new( - plan: LogicalPlan, - index_snapshots: PlanningMeta, - trace_obj: Option, - ) -> Result { +impl PreSerializedPlan { + pub fn to_serialized_plan(&self) -> Result { let serialized_logical_plan = datafusion_proto::bytes::logical_plan_to_bytes_with_extension_codec( - &plan, + &self.logical_plan, &CubeExtensionCodec { worker_context: None, }, )?; Ok(SerializedPlan { logical_plan: Arc::new(serialized_logical_plan.to_vec()), + schema_snapshot: self.schema_snapshot.clone(), + partition_ids_to_execute: self.partition_ids_to_execute.clone(), + inline_table_ids_to_execute: self.inline_table_ids_to_execute.clone(), + trace_obj: self.trace_obj.clone(), + }) + } + + pub fn try_new( + plan: LogicalPlan, + index_snapshots: PlanningMeta, + trace_obj: Option, + ) -> Result { + Ok(PreSerializedPlan { + logical_plan: plan, schema_snapshot: Arc::new(SchemaSnapshot { index_snapshots }), partition_ids_to_execute: Vec::new(), inline_table_ids_to_execute: Vec::new(), @@ -1093,59 +1113,6 @@ impl SerializedPlan { } } - pub fn logical_plan( - &self, - remote_to_local_names: HashMap, - chunk_id_to_record_batches: HashMap>, - parquet_metadata_cache: Arc, - ) -> Result { - // TODO DF upgrade SessionContext::new() - // After this comment was made, we now register_udaf... what else? - let session_context = SessionContext::new(); - // TODO DF upgrade: consistently build SessionContexts/register udafs/udfs. - for udaf in registerable_aggregate_udfs() { - session_context.register_udaf(udaf); - } - for udf in registerable_scalar_udfs() { - session_context.register_udf(udf); - } - - let logical_plan = logical_plan_from_bytes_with_extension_codec( - self.logical_plan.as_slice(), - &session_context, - &CubeExtensionCodec { - worker_context: Some(WorkerContext { - remote_to_local_names, - worker_partition_ids: self.partition_ids_to_execute.clone(), - inline_table_ids_to_execute: self.inline_table_ids_to_execute.clone(), - chunk_id_to_record_batches, - parquet_metadata_cache, - }), - }, - )?; - Ok(logical_plan) - } - - pub fn trace_obj(&self) -> Option { - self.trace_obj.clone() - } - - pub fn index_snapshots(&self) -> &Vec { - &self.schema_snapshot.index_snapshots.indices - } - - pub fn planning_meta(&self) -> &PlanningMeta { - &self.schema_snapshot.index_snapshots - } - - pub fn files_to_download(&self) -> Vec<(IdRow, String, Option, Option)> { - self.list_files_to_download(|id| { - self.partition_ids_to_execute - .binary_search_by_key(&id, |(id, _)| *id) - .is_ok() - }) - } - /// Note: avoid during normal execution, workers must filter the partitions they execute. 
pub fn all_required_files(&self) -> Vec<(IdRow, String, Option, Option)> { self.list_files_to_download(|_| true) @@ -1161,7 +1128,18 @@ impl SerializedPlan { /* chunk_id */ Option, )> { let indexes = self.index_snapshots(); + Self::list_files_to_download_given_index_snapshots(indexes, include_partition) + } + fn list_files_to_download_given_index_snapshots( + indexes: &Vec, + include_partition: impl Fn(u64) -> bool, + ) -> Vec<( + IdRow, + /* file_name */ String, + /* size */ Option, + /* chunk_id */ Option, + )> { let mut files = Vec::new(); for index in indexes.iter() { @@ -1198,6 +1176,115 @@ impl SerializedPlan { files } + pub fn index_snapshots(&self) -> &Vec { + &self.schema_snapshot.index_snapshots.indices + } + + pub fn planning_meta(&self) -> &PlanningMeta { + &self.schema_snapshot.index_snapshots + } + + pub fn logical_plan(&self) -> &LogicalPlan { + &self.logical_plan + } +} + +impl SerializedPlan { + pub async fn try_new( + plan: LogicalPlan, + index_snapshots: PlanningMeta, + trace_obj: Option, + ) -> Result { + let serialized_logical_plan = + datafusion_proto::bytes::logical_plan_to_bytes_with_extension_codec( + &plan, + &CubeExtensionCodec { + worker_context: None, + }, + )?; + Ok(SerializedPlan { + logical_plan: Arc::new(serialized_logical_plan.to_vec()), + schema_snapshot: Arc::new(SchemaSnapshot { index_snapshots }), + partition_ids_to_execute: Vec::new(), + inline_table_ids_to_execute: Vec::new(), + trace_obj, + }) + } + + pub fn to_pre_serialized( + &self, + remote_to_local_names: HashMap, + chunk_id_to_record_batches: HashMap>, + parquet_metadata_cache: Arc, + ) -> Result { + let plan = self.logical_plan( + remote_to_local_names, + chunk_id_to_record_batches, + parquet_metadata_cache, + )?; + Ok(PreSerializedPlan { + logical_plan: plan, + schema_snapshot: self.schema_snapshot.clone(), + partition_ids_to_execute: self.partition_ids_to_execute.clone(), + inline_table_ids_to_execute: self.inline_table_ids_to_execute.clone(), + trace_obj: self.trace_obj.clone(), + }) + } + + pub fn logical_plan( + &self, + remote_to_local_names: HashMap, + chunk_id_to_record_batches: HashMap>, + parquet_metadata_cache: Arc, + ) -> Result { + // TODO DF upgrade SessionContext::new() + // After this comment was made, we now register_udaf... what else? + let session_context = SessionContext::new(); + // TODO DF upgrade: consistently build SessionContexts/register udafs/udfs. 
+ for udaf in registerable_aggregate_udfs() { + session_context.register_udaf(udaf); + } + for udf in registerable_scalar_udfs() { + session_context.register_udf(udf); + } + + let logical_plan = logical_plan_from_bytes_with_extension_codec( + self.logical_plan.as_slice(), + &session_context, + &CubeExtensionCodec { + worker_context: Some(WorkerContext { + remote_to_local_names, + worker_partition_ids: self.partition_ids_to_execute.clone(), + inline_table_ids_to_execute: self.inline_table_ids_to_execute.clone(), + chunk_id_to_record_batches, + parquet_metadata_cache, + }), + }, + )?; + Ok(logical_plan) + } + + pub fn trace_obj(&self) -> Option { + self.trace_obj.clone() + } + + pub fn index_snapshots(&self) -> &Vec { + &self.schema_snapshot.index_snapshots.indices + } + + pub fn planning_meta(&self) -> &PlanningMeta { + &self.schema_snapshot.index_snapshots + } + + pub fn files_to_download(&self) -> Vec<(IdRow, String, Option, Option)> { + let indexes: &Vec = self.index_snapshots(); + PreSerializedPlan::list_files_to_download_given_index_snapshots(indexes, |id| { + self.partition_ids_to_execute + .binary_search_by_key(&id, |(id, _)| *id) + .is_ok() + }) + } + pub fn in_memory_chunks_to_load(&self) -> Vec<(IdRow, IdRow, IdRow)> { self.list_in_memory_chunks_to_load(|id| { self.partition_ids_to_execute diff --git a/rust/cubestore/cubestore/src/sql/mod.rs b/rust/cubestore/cubestore/src/sql/mod.rs index 2f9b34d228da9..100d1ef346fe9 100644 --- a/rust/cubestore/cubestore/src/sql/mod.rs +++ b/rust/cubestore/cubestore/src/sql/mod.rs @@ -50,7 +50,7 @@ use crate::metastore::{ use crate::queryplanner::panic::PanicWorkerNode; use crate::queryplanner::pretty_printers::{pp_phys_plan, pp_plan}; use crate::queryplanner::query_executor::{batches_to_dataframe, ClusterSendExec, QueryExecutor}; -use crate::queryplanner::serialized_plan::{RowFilter, SerializedPlan}; +use crate::queryplanner::serialized_plan::{PreSerializedPlan, RowFilter, SerializedPlan}; use crate::queryplanner::{PlanningMeta, QueryPlan, QueryPlanner}; use crate::remotefs::RemoteFs; use crate::sql::cache::SqlResultCache; @@ -382,7 +382,7 @@ impl SqlServiceImpl { ) -> Result, CubeError> { fn extract_worker_plans( p: &Arc, - ) -> Option> { + ) -> Option> { if let Some(p) = p.as_any().downcast_ref::() { Some(p.worker_plans()) } else { @@ -407,11 +407,7 @@ impl SqlServiceImpl { let res = match query_plan { QueryPlan::Select(serialized, _) => { let res = if !analyze { - let logical_plan = serialized.logical_plan( - HashMap::new(), - HashMap::new(), - NoopParquetMetadataCache::new(), - )?; + let logical_plan = serialized.logical_plan(); DataFrame::new( vec![Column::new( @@ -431,7 +427,7 @@ impl SqlServiceImpl { ]; let mut rows = Vec::new(); - let router_plan = executor.router_plan(serialized.clone(), cluster).await?.0; + let router_plan = executor.router_plan(serialized.to_serialized_plan()?, cluster).await?.0; rows.push(Row::new(vec![ TableValue::String("router".to_string()), TableValue::String("".to_string()), @@ -443,7 +439,7 @@ impl SqlServiceImpl { .into_iter() .map(|(name, plan)| async move { self.cluster - .run_explain_analyze(&name, plan.clone()) + .run_explain_analyze(&name, plan.to_serialized_plan()?) 
.await .map(|p| (name, p)) }) @@ -1083,7 +1079,7 @@ impl SqlService for SqlServiceImpl { timeout( self.query_timeout, self.cache - .get(query, context, serialized, async move |plan| { + .get(query, context, serialized.to_serialized_plan()?, async move |plan| { let records; if workers.len() == 0 { records = @@ -1159,7 +1155,7 @@ impl SqlService for SqlServiceImpl { match logical_plan { QueryPlan::Select(router_plan, _) => { // For tests, pretend we have all partitions on the same worker. - let worker_plan = router_plan.with_partition_id_to_execute( + let worker_plan: PreSerializedPlan = router_plan.with_partition_id_to_execute( router_plan .index_snapshots() .iter() @@ -1171,6 +1167,7 @@ impl SqlService for SqlServiceImpl { .collect(), context.inline_tables.into_iter().map(|i| i.id).collect(), ); + let worker_plan: SerializedPlan = worker_plan.to_serialized_plan()?; let mut mocked_names = HashMap::new(); for (_, f, _, _) in worker_plan.files_to_download() { let name = self.remote_fs.local_file(f.clone()).await?; @@ -1184,7 +1181,7 @@ impl SqlService for SqlServiceImpl { return Ok(QueryPlans { router: self .query_executor - .router_plan(router_plan, self.cluster.clone()) + .router_plan(router_plan.to_serialized_plan()?, self.cluster.clone()) .await? .0, worker: self
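
[Reviewer note, not part of the patch] A minimal sketch of the call pattern this split enables, using only the names visible in the diff above (PreSerializedPlan::try_new, logical_plan, to_serialized_plan, SerializedPlan::to_pre_serialized). The import paths and the CubeError error type are assumptions about the cubestore crate layout; treat this as an illustration, not code from the change itself.

use crate::queryplanner::serialized_plan::{PreSerializedPlan, SerializedPlan};
use crate::CubeError;
use datafusion::logical_expr::LogicalPlan;

// Router side: planning, optimization and EXPLAIN borrow the LogicalPlan directly
// from the PreSerializedPlan; the protobuf encoding now happens only at the point
// where work is shipped to a node (see ClusterSendExec and SqlServiceImpl above).
fn ship_to_worker(pre: &PreSerializedPlan) -> Result<SerializedPlan, CubeError> {
    let _plan: &LogicalPlan = pre.logical_plan(); // no decode round-trip on the router
    pre.to_serialized_plan() // wire format is unchanged at the network boundary
}

// Worker side (symmetric): decode once with SerializedPlan::to_pre_serialized(...) and
// reuse pre_serialized_plan.logical_plan() for physical planning, as in
// QueryExecutorImpl::router_plan / worker_plan in the diff.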