Skip to content
This repository was archived by the owner on Jan 7, 2025. It is now read-only.

allow config changes at runtime in SQL #233

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions datafusion-optd-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,13 @@ pub async fn main() -> Result<()> {
session_config = session_config.with_batch_size(batch_size);
};

// Set optd configs from command line.
let mut config_ext = datafusion::config::Extensions::new();
let mut optd_conf = optd_datafusion_bridge::OptdDFConfig::default();
optd_conf.enable_adaptive = args.enable_adaptive;
config_ext.insert(optd_conf);
session_config.options_mut().extensions = config_ext;

let rn_config = RuntimeConfig::new();
let rn_config =
// set memory pool size
Expand All @@ -204,8 +211,7 @@ pub async fn main() -> Result<()> {
let runtime_env = create_runtime_env(rn_config.clone())?;

let mut ctx = {
let mut state =
SessionState::new_with_config_rt(session_config.clone(), Arc::new(runtime_env));
let mut state = SessionState::new_with_config_rt(session_config, Arc::new(runtime_env));
if !args.enable_df_logical {
// clean up optimizer rules so that we can plug in our own optimizer
state = state.with_optimizer_rules(vec![]);
Expand Down
4 changes: 2 additions & 2 deletions optd-datafusion-bridge/src/from_optd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -594,8 +594,8 @@ impl OptdPlanContext<'_> {
typ => unimplemented!("{}", typ),
};

let optimizer = self.optimizer.as_ref().unwrap();
if optimizer.adaptive_enabled() {
let config = self.optd_config();
if config.enable_adaptive {
let bare_with_collector: Result<Arc<dyn ExecutionPlan>> =
Ok(Arc::new(CollectorExec::new(
bare,
Expand Down
133 changes: 125 additions & 8 deletions optd-datafusion-bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ use std::sync::{Arc, Mutex};

use async_trait::async_trait;
use datafusion::arrow::datatypes::DataType;
use datafusion::catalog::information_schema::InformationSchemaProvider;
use datafusion::catalog::schema::SchemaProvider;
use datafusion::catalog::CatalogList;
use datafusion::error::Result;
use datafusion::execution::context::{QueryPlanner, SessionState};
Expand All @@ -23,13 +25,107 @@ use datafusion::logical_expr::{
use datafusion::physical_plan::explain::ExplainExec;
use datafusion::physical_plan::{displayable, ExecutionPlan};
use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
use datafusion::sql::TableReference;
use itertools::Itertools;
use optd_datafusion_repr::plan_nodes::{
dispatch_plan_explain_to_string, ArcDfPlanNode, ConstantType, DfNodeType, DfReprPlanNode,
PhysicalHashJoin, PhysicalNestedLoopJoin,
dispatch_plan_explain_to_string, ArcDfPlanNode, DfNodeType, DfReprPlanNode, PhysicalHashJoin,
PhysicalNestedLoopJoin,
};
use optd_datafusion_repr::properties::schema::Catalog;
use optd_datafusion_repr::{DatafusionOptimizer, MemoExt};
use optd_datafusion_repr::{
plan_nodes::ConstantType, properties::schema::Catalog, DatafusionOptimizer, MemoExt,
};

/// Declares an optd configuration struct and wires it into DataFusion's
/// extension-options machinery.
///
/// Each field is written as `name: Type, default = expr`. For the given struct
/// definition this generates:
///
/// * the struct itself, deriving `Debug` and `Clone` and marked `#[non_exhaustive]`;
/// * a `Default` impl that fills every field from its `default = ...` expression;
/// * a `datafusion::config::ExtensionOptions` impl where
///   * `set` matches `key` against the bare field names and parses `value` into the
///     field with `str::parse`, returning an error for unknown keys or unparsable
///     values;
///   * `entries` reports every field under the `optd.` key prefix, with a value only
///     when the field differs from its default, and with the field's doc comments
///     (concatenated and trimmed) as the description.
///
/// Because of how the generated code uses them, field types must support
/// `str::parse`, `!=` against the default expression, and `to_string()`.
macro_rules! optd_extensions_options {
    (
        $(#[doc = $struct_d:tt])*
        $vis:vis struct $struct_name:ident {
            $(
                $(#[doc = $d:tt])*
                $field_vis:vis $field_name:ident : $field_type:ty, default = $default:expr
            )*$(,)*
        }
    ) => {
        $(#[doc = $struct_d])*
        #[derive(Debug, Clone)]
        #[non_exhaustive]
        $vis struct $struct_name {
            $(
                $(#[doc = $d])*
                $field_vis $field_name: $field_type,
            )*
        }

        impl Default for $struct_name {
            fn default() -> Self {
                Self {
                    $($field_name: $default),*
                }
            }
        }

        impl ::datafusion::config::ExtensionOptions for $struct_name {
            fn as_any(&self) -> &dyn ::std::any::Any {
                self
            }

            fn as_any_mut(&mut self) -> &mut dyn ::std::any::Any {
                self
            }

            fn cloned(&self) -> Box<dyn ::datafusion::config::ExtensionOptions> {
                Box::new(self.clone())
            }

            // The return type is spelled out in full so the macro does not depend on
            // whichever `Result` alias happens to be in scope at the expansion site.
            fn set(&mut self, key: &str, value: &str) -> ::datafusion::error::Result<()> {
                match key {
                    $(
                        stringify!($field_name) => {
                            self.$field_name = value.parse().map_err(|e| {
                                ::datafusion::error::DataFusionError::Context(
                                    // Name the field's declared type in the message. The
                                    // previous `$t` was not a bound metavariable, so the
                                    // message literally ended in "$t".
                                    format!(
                                        concat!("Error parsing {} as ", stringify!($field_type),),
                                        value
                                    ),
                                    Box::new(::datafusion::error::DataFusionError::External(
                                        Box::new(e),
                                    )),
                                )
                            })?;
                            Ok(())
                        }
                    )*
                    _ => Err(::datafusion::error::DataFusionError::Internal(
                        format!(
                            concat!("Config value \"{}\" not found on ", stringify!($struct_name)),
                            key
                        )
                    ))
                }
            }

            fn entries(&self) -> Vec<::datafusion::config::ConfigEntry> {
                vec![
                    $(
                        ::datafusion::config::ConfigEntry {
                            key: format!("optd.{}", stringify!($field_name)),
                            // Only report a value when it differs from the default.
                            value: (self.$field_name != $default)
                                .then(|| self.$field_name.to_string()),
                            // With no doc comments on a field this is the empty string.
                            description: concat!($($d),*).trim(),
                        },
                    )*
                ]
            }
        }
    }
}

// Declares `OptdDFConfig` through `optd_extensions_options!`, which also generates its
// `Default` impl (from the `default = ...` values below) and its
// `datafusion::config::ExtensionOptions` impl. Each field's `///` comment becomes the
// description of the corresponding `optd.<field>` config entry.
optd_extensions_options! {
/// optd configurations
pub struct OptdDFConfig {
/// Turn on adaptive optimization.
pub enable_adaptive: bool, default = false
/// Use heuristic optimizer before entering cascades.
pub enable_heuristic: bool, default = true

// NOTE(review): `explain_logical` has no doc comment, so the description generated for
// its config entry is empty. Its purpose is not visible here — TODO document it.
pub explain_logical: bool, default = true
}

}

/// Marks `OptdDFConfig` as a DataFusion config extension with the prefix `optd`,
/// the same prefix that the generated `entries()` puts in front of each field name.
impl datafusion::config::ConfigExtension for OptdDFConfig {
const PREFIX: &'static str = "optd";
}

pub struct OptdPlanContext<'a> {
tables: HashMap<String, Arc<dyn TableSource>>,
Expand All @@ -45,23 +141,44 @@ impl<'a> OptdPlanContext<'a> {
optimizer: None,
}
}

/// Returns the optd settings stored in the session's configuration extensions.
///
/// # Panics
///
/// Panics with `"optd config not set"` when the session's config options carry no
/// `OptdDFConfig` extension.
pub fn optd_config(&self) -> &OptdDFConfig {
    let extensions = &self.session_state.config_options().extensions;
    extensions
        .get::<OptdDFConfig>()
        .expect("optd config not set")
}
}

/// Catalog wrapper around a DataFusion `CatalogList`, paired with a schema provider
/// for `information_schema`.
pub struct DatafusionCatalog {
/// Catalog list that catalogs (and, through them, schemas) are looked up in.
catalog: Arc<dyn CatalogList>,
/// Schema provider created from an `InformationSchemaProvider` over the same catalog
/// list; used when a resolved schema name is `information_schema`.
information_schema: Arc<dyn SchemaProvider>,
}

impl DatafusionCatalog {
/// Wraps `catalog` and builds an `InformationSchemaProvider` over a clone of the same
/// catalog list, which is stored as `information_schema`.
pub fn new(catalog: Arc<dyn CatalogList>) -> Self {
// NOTE(review): the next line looks like the pre-change constructor body kept by the
// diff view; the statements after it are the updated body — confirm against the real file.
Self { catalog }
let information_schema = Arc::new(InformationSchemaProvider::new(catalog.clone()));
Self {
catalog,
information_schema,
}
}
}

impl Catalog for DatafusionCatalog {
fn get(&self, name: &str) -> optd_datafusion_repr::properties::schema::Schema {
let catalog = self.catalog.catalog("datafusion").unwrap();
let schema = catalog.schema("public").unwrap();
let table = futures_lite::future::block_on(schema.table(name.as_ref())).unwrap();
let resolved = TableReference::from(name).resolve("datafusion", "public");
let catalog = self.catalog.catalog(&resolved.catalog).unwrap();
let schema = if resolved.schema == "information_schema" {
self.information_schema.clone()
} else {
catalog.schema(&resolved.schema).unwrap()
};
let table = futures_lite::future::block_on(schema.table(&resolved.table)).unwrap();
let schema = table.schema();
let fields = schema.fields();
let mut optd_fields = Vec::with_capacity(fields.len());
Expand Down
Loading