65 changes: 65 additions & 0 deletions crates/polars-plan/src/plans/conversion/dsl_to_ir.rs
@@ -1,7 +1,10 @@
use std::path::PathBuf;

use arrow::datatypes::ArrowSchemaRef;
use either::Either;
use expr_expansion::{is_regex_projection, rewrite_projections};
use hive::hive_partitions_from_paths;
use polars_io::HiveOptions;

use super::stack_opt::ConversionOptimizer;
use super::*;
@@ -76,6 +79,7 @@ pub fn to_alp(
        lp_arena,
        conversion_optimizer,
        opt_flags,
        expansion_cache: Default::default(),
    };

    match to_alp_impl(lp, &mut ctxt) {
@@ -103,11 +107,29 @@ pub fn to_alp(
    }
}

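// Scan paths shared behind an `Arc`: used both as the cache key (the paths as written in the
// plan) and in the cached value (the expanded paths).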
type Paths = Arc<[PathBuf]>;

pub(super) struct DslConversionContext<'a> {
    pub(super) expr_arena: &'a mut Arena<AExpr>,
    pub(super) lp_arena: &'a mut Arena<IR>,
    pub(super) conversion_optimizer: ConversionOptimizer,
    pub(super) opt_flags: &'a mut OptFlags,
    // Cache expanded paths to avoid repeating the expensive path expansion (e.g. object-store
    // round trips) when the same scan appears multiple times in a plan.
    expansion_cache: PlHashMap<Paths, (Paths, HiveOptions)>,
}

impl DslConversionContext<'_> {
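    // Look up a previously expanded single-path source. Only single-path sources are cached,
    // so any other source kind returns `None`.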
    fn get_paths(&mut self, sources: &ScanSources) -> Option<(Paths, HiveOptions)> {
        if let ScanSources::Paths(paths) = sources {
            if paths.len() == 1 {
                self.expansion_cache.get(paths).cloned()
            } else {
                None
            }
        } else {
            None
        }
    }
}

pub(super) fn run_conversion(
@@ -161,6 +183,49 @@ pub fn to_alp_impl(lp: DslPlan, ctxt: &mut DslConversionContext) -> PolarsResult
                }
            }

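            // Reuse a cached expansion for this source if one exists; otherwise expand the
            // paths now and cache the result for later occurrences of the same scan.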
            let sources = if let Some((paths, hive_opts)) = ctxt.get_paths(&sources) {
                file_options.hive_options = hive_opts;
                ScanSources::Paths(paths)
            } else {
                let expanded_sources = match &*scan_type {
                    #[cfg(feature = "parquet")]
                    FileScan::Parquet { cloud_options, .. } => sources
                        .expand_paths_with_hive_update(
                            &mut file_options,
                            cloud_options.as_ref(),
                        )?,
                    #[cfg(feature = "ipc")]
                    FileScan::Ipc { cloud_options, .. } => sources
                        .expand_paths_with_hive_update(
                            &mut file_options,
                            cloud_options.as_ref(),
                        )?,
                    #[cfg(feature = "csv")]
                    FileScan::Csv { cloud_options, .. } => {
                        sources.expand_paths(&file_options, cloud_options.as_ref())?
                    },
                    #[cfg(feature = "json")]
                    FileScan::NDJson { cloud_options, .. } => {
                        sources.expand_paths(&file_options, cloud_options.as_ref())?
                    },
                    FileScan::Anonymous { .. } => sources.clone(),
                };

                // If the paths were expanded, cache them. We do this because it is very
                // expensive to round trip to S3 for plans with common subplans.
                if let (ScanSources::Paths(paths), ScanSources::Paths(expanded_paths)) =
                    (&sources, &expanded_sources)
                {
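                    // Only cache when a single input path (e.g. a glob or directory) expanded
                    // into multiple paths; other cases gain little from caching.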
                    if paths.len() == 1 && expanded_paths.len() > 1 {
                        ctxt.expansion_cache.insert(
                            paths.clone(),
                            (expanded_paths.clone(), file_options.hive_options.clone()),
                        );
                    }
                }
                expanded_sources
            };

            let sources = match &*scan_type {
                #[cfg(feature = "parquet")]
                FileScan::Parquet { cloud_options, .. } => sources