From d5b0239d22cc4416635a099830cd82da1491b7b8 Mon Sep 17 00:00:00 2001
From: Yuchen Liang
Date: Mon, 11 Nov 2024 22:37:50 -0500
Subject: [PATCH] allow config changes at runtime in SQL

Introduce `OptdDFConfig`, a DataFusion config extension registered under
the `optd` prefix, so optd options are read from the session's config
options instead of being queried from the optimizer.

- datafusion-optd-cli: seed `OptdDFConfig` from the command line
  (`enable_adaptive`) and install it as the session config's extensions.
- from_optd: read `enable_adaptive` from the session config rather than
  calling `optimizer.adaptive_enabled()`.
- lib: add the `optd_extensions_options!` macro, which generates the
  config struct together with its `Default` and `ExtensionOptions` impls
  (`set` parses a value by field name, `entries` lists `optd.<field>`
  keys); add `OptdPlanContext::optd_config`; make
  `DatafusionCatalog::get` resolve qualified table names and serve
  `information_schema` from a dedicated provider.

Signed-off-by: Yuchen Liang
---
Review notes (not part of the commit message):
 * This copy of the patch had lost its line breaks and every `<...>` span.
   Line breaks, indentation and generic arguments (`Box<dyn ...>`,
   `Vec<...>`, `Arc<dyn ...>`, `.get::<...>()`, `Result<Arc<...>>`,
   `HashMap<...>`) were reconstructed from usage; verify the context lines
   against the tree before applying.
 * The author e-mail is missing from the From: and Signed-off-by: lines.
 * The parse error produced by the generated `set` now names the field type
   (`$field_type`); it previously used the unbound metavariable `$t`.

 datafusion-optd-cli/src/main.rs         |  10 +-
 optd-datafusion-bridge/src/from_optd.rs |   4 +-
 optd-datafusion-bridge/src/lib.rs       | 133 ++++++++++++++++++++++--
 3 files changed, 135 insertions(+), 12 deletions(-)

diff --git a/datafusion-optd-cli/src/main.rs b/datafusion-optd-cli/src/main.rs
index d59d4147..38ec68a4 100644
--- a/datafusion-optd-cli/src/main.rs
+++ b/datafusion-optd-cli/src/main.rs
@@ -180,6 +180,13 @@ pub async fn main() -> Result<()> {
         session_config = session_config.with_batch_size(batch_size);
     };
 
+    // Set optd configs from command line.
+    let mut config_ext = datafusion::config::Extensions::new();
+    let mut optd_conf = optd_datafusion_bridge::OptdDFConfig::default();
+    optd_conf.enable_adaptive = args.enable_adaptive;
+    config_ext.insert(optd_conf);
+    session_config.options_mut().extensions = config_ext;
+
     let rn_config = RuntimeConfig::new();
     let rn_config =
         // set memory pool size
@@ -204,8 +211,7 @@ pub async fn main() -> Result<()> {
     let runtime_env = create_runtime_env(rn_config.clone())?;
 
     let mut ctx = {
-        let mut state =
-            SessionState::new_with_config_rt(session_config.clone(), Arc::new(runtime_env));
+        let mut state = SessionState::new_with_config_rt(session_config, Arc::new(runtime_env));
         if !args.enable_df_logical {
             // clean up optimizer rules so that we can plug in our own optimizer
             state = state.with_optimizer_rules(vec![]);
diff --git a/optd-datafusion-bridge/src/from_optd.rs b/optd-datafusion-bridge/src/from_optd.rs
index 5346a021..5fd5ef86 100644
--- a/optd-datafusion-bridge/src/from_optd.rs
+++ b/optd-datafusion-bridge/src/from_optd.rs
@@ -594,8 +594,8 @@ impl OptdPlanContext<'_> {
             typ => unimplemented!("{}", typ),
         };
 
-        let optimizer = self.optimizer.as_ref().unwrap();
-        if optimizer.adaptive_enabled() {
+        let config = self.optd_config();
+        if config.enable_adaptive {
             let bare_with_collector: Result<Arc<dyn ExecutionPlan>> =
                 Ok(Arc::new(CollectorExec::new(
                     bare,
diff --git a/optd-datafusion-bridge/src/lib.rs b/optd-datafusion-bridge/src/lib.rs
index d99d9d08..35cec9cb 100644
--- a/optd-datafusion-bridge/src/lib.rs
+++ b/optd-datafusion-bridge/src/lib.rs
@@ -14,6 +14,8 @@ use std::sync::{Arc, Mutex};
 
 use async_trait::async_trait;
 use datafusion::arrow::datatypes::DataType;
+use datafusion::catalog::information_schema::InformationSchemaProvider;
+use datafusion::catalog::schema::SchemaProvider;
 use datafusion::catalog::CatalogList;
 use datafusion::error::Result;
 use datafusion::execution::context::{QueryPlanner, SessionState};
@@ -23,13 +25,107 @@ use datafusion::logical_expr::{
 use datafusion::physical_plan::explain::ExplainExec;
 use datafusion::physical_plan::{displayable, ExecutionPlan};
 use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
+use datafusion::sql::TableReference;
 use itertools::Itertools;
 use optd_datafusion_repr::plan_nodes::{
-    dispatch_plan_explain_to_string, ArcDfPlanNode, ConstantType, DfNodeType, DfReprPlanNode,
-    PhysicalHashJoin, PhysicalNestedLoopJoin,
+    dispatch_plan_explain_to_string, ArcDfPlanNode, DfNodeType, DfReprPlanNode, PhysicalHashJoin,
+    PhysicalNestedLoopJoin,
 };
-use optd_datafusion_repr::properties::schema::Catalog;
-use optd_datafusion_repr::{DatafusionOptimizer, MemoExt};
+use optd_datafusion_repr::{
+    plan_nodes::ConstantType, properties::schema::Catalog, DatafusionOptimizer, MemoExt,
+};
+
+macro_rules! optd_extensions_options {
+    (
+        $(#[doc = $struct_d:tt])*
+        $vis:vis struct $struct_name:ident {
+            $(
+                $(#[doc = $d:tt])*
+                $field_vis:vis $field_name:ident : $field_type:ty, default = $default:expr
+            )*$(,)*
+        }
+    ) => {
+        $(#[doc = $struct_d])*
+        #[derive(Debug, Clone)]
+        #[non_exhaustive]
+        $vis struct $struct_name{
+            $(
+                $(#[doc = $d])*
+                $field_vis $field_name : $field_type,
+            )*
+        }
+
+        impl Default for $struct_name {
+            fn default() -> Self {
+                Self {
+                    $($field_name: $default),*
+                }
+            }
+        }
+
+        impl datafusion::config::ExtensionOptions for $struct_name {
+            fn as_any(&self) -> &dyn ::std::any::Any {
+                self
+            }
+
+            fn as_any_mut(&mut self) -> &mut dyn ::std::any::Any {
+                self
+            }
+
+            fn cloned(&self) -> Box<dyn datafusion::config::ExtensionOptions> {
+                Box::new(self.clone())
+            }
+
+            fn set(&mut self, key: &str, value: &str) -> Result<()> {
+                match key {
+                    $(
+                        stringify!($field_name) => {
+                            self.$field_name = value.parse().map_err(|e| {
+                                ::datafusion::error::DataFusionError::Context(
+                                    format!(concat!("Error parsing {} as ", stringify!($field_type),), value),
+                                    Box::new(::datafusion::error::DataFusionError::External(Box::new(e))),
+                                )
+                            })?;
+                            Ok(())
+                        }
+                    )*
+                    _ => Err(::datafusion::error::DataFusionError::Internal(
+                        format!(concat!("Config value \"{}\" not found on ", stringify!($struct_name)), key)
+                    ))
+                }
+            }
+
+            fn entries(&self) -> Vec<datafusion::config::ConfigEntry> {
+                vec![
+                    $(
+                        datafusion::config::ConfigEntry {
+                            key: format!("optd.{}", stringify!($field_name)),
+                            value: (self.$field_name != $default).then(|| self.$field_name.to_string()),
+                            description: concat!($($d),*).trim(),
+                        },
+                    )*
+                ]
+            }
+        }
+    }
+}
+
+optd_extensions_options! {
+    /// optd configurations
+    pub struct OptdDFConfig {
+        /// Turn on adaptive optimization.
+        pub enable_adaptive: bool, default = false
+        /// Use heuristic optimizer before entering cascades.
+        pub enable_heuristic: bool, default = true
+
+        pub explain_logical: bool, default = true
+    }
+
+}
+
+impl datafusion::config::ConfigExtension for OptdDFConfig {
+    const PREFIX: &'static str = "optd";
+}
 
 pub struct OptdPlanContext<'a> {
     tables: HashMap<String, Arc<dyn TableSource>>,
@@ -45,23 +141,44 @@ impl<'a> OptdPlanContext<'a> {
             optimizer: None,
         }
     }
+
+    pub fn optd_config(&self) -> &OptdDFConfig {
+        let config = self
+            .session_state
+            .config_options()
+            .extensions
+            .get::<OptdDFConfig>()
+            .expect("optd config not set");
+
+        config
+    }
 }
 
 pub struct DatafusionCatalog {
     catalog: Arc<dyn CatalogList>,
+    information_schema: Arc<dyn SchemaProvider>,
 }
 
 impl DatafusionCatalog {
     pub fn new(catalog: Arc<dyn CatalogList>) -> Self {
-        Self { catalog }
+        let information_schema = Arc::new(InformationSchemaProvider::new(catalog.clone()));
+        Self {
+            catalog,
+            information_schema,
+        }
     }
 }
 
 impl Catalog for DatafusionCatalog {
     fn get(&self, name: &str) -> optd_datafusion_repr::properties::schema::Schema {
-        let catalog = self.catalog.catalog("datafusion").unwrap();
-        let schema = catalog.schema("public").unwrap();
-        let table = futures_lite::future::block_on(schema.table(name.as_ref())).unwrap();
+        let resolved = TableReference::from(name).resolve("datafusion", "public");
+        let catalog = self.catalog.catalog(&resolved.catalog).unwrap();
+        let schema = if resolved.schema == "information_schema" {
+            self.information_schema.clone()
+        } else {
+            catalog.schema(&resolved.schema).unwrap()
+        };
+        let table = futures_lite::future::block_on(schema.table(&resolved.table)).unwrap();
         let schema = table.schema();
         let fields = schema.fields();
         let mut optd_fields = Vec::with_capacity(fields.len());