Skip to content

Commit

Permalink
chore(cubestore): Upgrade DF: Split SerializedPlan type into PreSeria…
Browse files Browse the repository at this point in the history
…lizedPlan
  • Loading branch information
srh committed Dec 17, 2024
1 parent 9cbf9d0 commit 7956b2a
Show file tree
Hide file tree
Showing 6 changed files with 186 additions and 96 deletions.
5 changes: 3 additions & 2 deletions rust/cubestore/cubestore/src/queryplanner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub mod serialized_plan;
mod tail_limit;
mod topk;
pub mod trace_data_loaded;
use serialized_plan::PreSerializedPlan;
pub use topk::MIN_TOPK_STREAM_ROWS;
use udfs::{aggregate_udf_by_kind, registerable_aggregate_udfs, registerable_scalar_udfs};
mod filter_by_key_range;
Expand Down Expand Up @@ -122,7 +123,7 @@ crate::di_service!(QueryPlannerImpl, [QueryPlanner]);

pub enum QueryPlan {
Meta(LogicalPlan),
Select(SerializedPlan, /*workers*/ Vec<String>),
Select(PreSerializedPlan, /*workers*/ Vec<String>),
}

#[async_trait]
Expand Down Expand Up @@ -191,7 +192,7 @@ impl QueryPlanner for QueryPlannerImpl {
&meta.multi_part_subtree,
)?;
QueryPlan::Select(
SerializedPlan::try_new(logical_plan, meta, trace_obj).await?,
PreSerializedPlan::try_new(logical_plan, meta, trace_obj)?,
workers,
)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,19 @@ use rewrite_plan::rewrite_physical_plan;
use std::sync::Arc;
use trace_data_loaded::add_trace_data_loaded_exec;

use super::serialized_plan::PreSerializedPlan;

pub struct CubeQueryPlanner {
cluster: Option<Arc<dyn Cluster>>,
serialized_plan: Arc<SerializedPlan>,
serialized_plan: Arc<PreSerializedPlan>,
memory_handler: Arc<dyn MemoryHandler>,
data_loaded_size: Option<Arc<DataLoadedSize>>,
}

impl CubeQueryPlanner {
pub fn new_on_router(
cluster: Arc<dyn Cluster>,
serialized_plan: Arc<SerializedPlan>,
serialized_plan: Arc<PreSerializedPlan>,
memory_handler: Arc<dyn MemoryHandler>,
) -> CubeQueryPlanner {
CubeQueryPlanner {
Expand All @@ -52,7 +54,7 @@ impl CubeQueryPlanner {
}

pub fn new_on_worker(
serialized_plan: Arc<SerializedPlan>,
serialized_plan: Arc<PreSerializedPlan>,
memory_handler: Arc<dyn MemoryHandler>,
data_loaded_size: Option<Arc<DataLoadedSize>>,
) -> CubeQueryPlanner {
Expand Down
4 changes: 3 additions & 1 deletion rust/cubestore/cubestore/src/queryplanner/planning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ use std::cmp::Ordering;
use std::hash::{Hash, Hasher};
use std::iter::FromIterator;

use super::serialized_plan::PreSerializedPlan;

#[cfg(test)]
pub async fn choose_index(
p: LogicalPlan,
Expand Down Expand Up @@ -1585,7 +1587,7 @@ fn pull_up_cluster_send(mut p: LogicalPlan) -> Result<LogicalPlan, DataFusionErr

pub struct CubeExtensionPlanner {
pub cluster: Option<Arc<dyn Cluster>>,
pub serialized_plan: Arc<SerializedPlan>,
pub serialized_plan: Arc<PreSerializedPlan>,
}

#[async_trait]
Expand Down
37 changes: 19 additions & 18 deletions rust/cubestore/cubestore/src/queryplanner/query_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ use std::sync::Arc;
use std::time::SystemTime;
use tracing::{instrument, Instrument};

use super::serialized_plan::PreSerializedPlan;
use super::udfs::{
aggregate_udf_by_kind, registerable_aggregate_udfs, registerable_arc_aggregate_udfs,
registerable_arc_scalar_udfs, CubeAggregateUDFKind,
Expand Down Expand Up @@ -287,19 +288,19 @@ impl QueryExecutor for QueryExecutorImpl {
plan: SerializedPlan,
cluster: Arc<dyn Cluster>,
) -> Result<(Arc<dyn ExecutionPlan>, LogicalPlan), CubeError> {
let plan_to_move = plan.logical_plan(
let pre_serialized_plan = plan.to_pre_serialized(
HashMap::new(),
HashMap::new(),
NoopParquetMetadataCache::new(),
)?;
let serialized_plan = Arc::new(plan);
let ctx = self.router_context(cluster.clone(), serialized_plan.clone())?;
let pre_serialized_plan = Arc::new(pre_serialized_plan);
let ctx = self.router_context(cluster.clone(), pre_serialized_plan.clone())?;
Ok((
ctx.clone()
.state()
.create_physical_plan(&plan_to_move.clone())
.create_physical_plan(pre_serialized_plan.logical_plan())
.await?,
plan_to_move,
pre_serialized_plan.logical_plan().clone(),
))
}

Expand All @@ -310,20 +311,20 @@ impl QueryExecutor for QueryExecutorImpl {
chunk_id_to_record_batches: HashMap<u64, Vec<RecordBatch>>,
data_loaded_size: Option<Arc<DataLoadedSize>>,
) -> Result<(Arc<dyn ExecutionPlan>, LogicalPlan), CubeError> {
let plan_to_move = plan.logical_plan(
let pre_serialized_plan = plan.to_pre_serialized(
remote_to_local_names,
chunk_id_to_record_batches,
self.parquet_metadata_cache.cache().clone(),
)?;
let plan = Arc::new(plan);
let ctx = self.worker_context(plan.clone(), data_loaded_size)?;
let pre_serialized_plan = Arc::new(pre_serialized_plan);
let ctx = self.worker_context(pre_serialized_plan.clone(), data_loaded_size)?;
let plan_ctx = ctx.clone();
Ok((
plan_ctx
.state()
.create_physical_plan(&plan_to_move.clone())
.create_physical_plan(pre_serialized_plan.logical_plan())
.await?,
plan_to_move,
pre_serialized_plan.logical_plan().clone(),
))
}

Expand Down Expand Up @@ -372,7 +373,7 @@ impl QueryExecutorImpl {
fn router_context(
&self,
cluster: Arc<dyn Cluster>,
serialized_plan: Arc<SerializedPlan>,
serialized_plan: Arc<PreSerializedPlan>,
) -> Result<Arc<SessionContext>, CubeError> {
let runtime = Arc::new(RuntimeEnv::default());
let config = Self::session_config();
Expand Down Expand Up @@ -424,7 +425,7 @@ impl QueryExecutorImpl {

fn worker_context(
&self,
serialized_plan: Arc<SerializedPlan>,
serialized_plan: Arc<PreSerializedPlan>,
data_loaded_size: Option<Arc<DataLoadedSize>>,
) -> Result<Arc<SessionContext>, CubeError> {
let runtime = Arc::new(RuntimeEnv::default());
Expand Down Expand Up @@ -1229,7 +1230,7 @@ pub struct ClusterSendExec {
/// Never executed, only stored to allow consistent optimization on router and worker.
pub input_for_optimizations: Arc<dyn ExecutionPlan>,
pub cluster: Arc<dyn Cluster>,
pub serialized_plan: Arc<SerializedPlan>,
pub serialized_plan: Arc<PreSerializedPlan>,
pub use_streaming: bool,
}

Expand All @@ -1248,7 +1249,7 @@ pub enum InlineCompoundPartition {
impl ClusterSendExec {
pub fn new(
cluster: Arc<dyn Cluster>,
serialized_plan: Arc<SerializedPlan>,
serialized_plan: Arc<PreSerializedPlan>,
union_snapshots: &[Snapshots],
input_for_optimizations: Arc<dyn ExecutionPlan>,
use_streaming: bool,
Expand Down Expand Up @@ -1503,7 +1504,7 @@ impl ClusterSendExec {
}
}

pub fn worker_plans(&self) -> Vec<(String, SerializedPlan)> {
pub fn worker_plans(&self) -> Vec<(String, PreSerializedPlan)> {
let mut res = Vec::new();
for (node_name, partitions) in self.partitions.iter() {
res.push((
Expand All @@ -1517,7 +1518,7 @@ impl ClusterSendExec {
fn serialized_plan_for_partitions(
&self,
partitions: &(Vec<(u64, RowRange)>, Vec<InlineTableId>),
) -> SerializedPlan {
) -> PreSerializedPlan {
let (partitions, inline_table_ids) = partitions;
let mut ps = HashMap::<_, RowFilter>::new();
for (id, range) in partitions {
Expand Down Expand Up @@ -1583,13 +1584,13 @@ impl ExecutionPlan for ClusterSendExec {
let node_name = node_name.to_string();
if self.use_streaming {
// A future that yields a stream
let fut = async move { cluster.run_select_stream(&node_name, plan).await };
let fut = async move { cluster.run_select_stream(&node_name, plan.to_serialized_plan()?).await };
// Use TryStreamExt::try_flatten to flatten the stream of streams
let stream = futures::stream::once(fut).try_flatten();

Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
} else {
let record_batches = async move { cluster.run_select(&node_name, plan).await };
let record_batches = async move { cluster.run_select(&node_name, plan.to_serialized_plan()?).await };
let stream = futures::stream::once(record_batches).flat_map(|r| match r {
Ok(vec) => stream::iter(vec.into_iter().map(|b| Ok(b)).collect::<Vec<_>>()),
Err(e) => stream::iter(vec![Err(DataFusionError::Execution(e.to_string()))]),
Expand Down
Loading

0 comments on commit 7956b2a

Please sign in to comment.