Skip to content

Commit e4f2b49

Browse files
rgehan and NGA-TRAN authored
Reproducer tests for #18380 (resorting sorted inputs) (#18352)
## Which issue does this PR close?

None, but relates to issue #9898

## Rationale for this change

N/A

## What changes are included in this PR?

This PR adds reproducer tests demonstrating issues with suboptimal optimizations performed on plans that mix pre-sorted parquets and `SortExec` under a UNION.

Two sets of tests are included:

- Unit tests in `datafusion/core/tests/physical_optimizer/enforce_sorting.rs`
- E2E-ish tests in `datafusion/core/tests/dataframe/mod.rs`, starting from logical plans simulating our use-case

> [!NOTE]
> These tests pass with the changes from #9867

## Are these changes tested?

N/A

## Are there any user-facing changes?

N/A

---------

Co-authored-by: Nga Tran <[email protected]>
1 parent 076b091 commit e4f2b49

File tree

2 files changed

+239
-0
lines changed

2 files changed

+239
-0
lines changed

datafusion/core/tests/dataframe/mod.rs

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ use insta::assert_snapshot;
4545
use object_store::local::LocalFileSystem;
4646
use std::collections::HashMap;
4747
use std::fs;
48+
use std::path::Path;
4849
use std::sync::Arc;
4950
use tempfile::TempDir;
5051
use url::Url;
@@ -3110,6 +3111,156 @@ async fn test_count_wildcard_on_window() -> Result<()> {
31103111
Ok(())
31113112
}
31123113

3114+
#[tokio::test]
3115+
// Test with `repartition_sorts` disabled, causing a full resort of the data
//
// Snapshots the plan text returned by
// `union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(false)`.
// In the expected plan the partial aggregate has no `ordering_mode=Sorted`, and the only
// `SortExec` is the one listed right after the final aggregate, ahead of
// `CoalescePartitionsExec`; no `SortExec` is listed next to either `DataSourceExec`.
// NOTE(review): the plan lines appear here without the nesting indentation that an indented
// plan display would normally carry - confirm against the real file before copying the snapshot.
3116+
async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_false(
3117+
) -> Result<()> {
3118+
assert_snapshot!(
3119+
union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(false).await?,
3120+
@r#"
3121+
AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted
3122+
SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]
3123+
CoalescePartitionsExec
3124+
AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[]
3125+
UnionExec
3126+
DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
3127+
DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet
3128+
"#);
3129+
Ok(())
3130+
}
3131+
3132+
#[ignore] // See https://github.com/apache/datafusion/issues/18380
3133+
#[tokio::test]
3134+
// Test with `repartition_sorts` enabled to preserve pre-sorted partitions and avoid resorting
3135+
async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_with_repartition_sorts_true(
3136+
) -> Result<()> {
3137+
assert_snapshot!(
3138+
union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(true).await?,
3139+
@r#"
3140+
AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted
3141+
SortPreservingMergeExec: [id@0 ASC NULLS LAST]
3142+
AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[], ordering_mode=Sorted
3143+
UnionExec
3144+
DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
3145+
SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]
3146+
DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet
3147+
"#);
3148+
3149+
// 💥 Doesn't pass, and generates this plan:
3150+
//
3151+
// AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted
3152+
// SortPreservingMergeExec: [id@0 ASC NULLS LAST]
3153+
// SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[true]
3154+
// AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[]
3155+
// UnionExec
3156+
// DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
3157+
// DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet
3158+
//
3159+
//
3160+
// === Excerpt from the verbose explain ===
3161+
//
3162+
// +------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
3163+
// | plan_type | plan |
3164+
// +------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
3165+
// | initial_physical_plan | AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted |
3166+
// | | AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[], ordering_mode=Sorted |
3167+
// | | UnionExec |
3168+
// | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet |
3169+
// | | SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] |
3170+
// | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet |
3171+
// ...
3172+
// | physical_plan after EnforceDistribution | OutputRequirementExec: order_by=[], dist_by=Unspecified |
3173+
// | | AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted |
3174+
// | | SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] |
3175+
// | | CoalescePartitionsExec |
3176+
// | | AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[], ordering_mode=Sorted |
3177+
// | | UnionExec |
3178+
// | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet |
3179+
// | | SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false] |
3180+
// | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet |
3181+
// | | |
3182+
// | physical_plan after CombinePartialFinalAggregate | SAME TEXT AS ABOVE
3183+
// | | |
3184+
// | physical_plan after EnforceSorting | OutputRequirementExec: order_by=[], dist_by=Unspecified |
3185+
// | | AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted |
3186+
// | | SortPreservingMergeExec: [id@0 ASC NULLS LAST] |
3187+
// | | SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[true] |
3188+
// | | AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[] |
3189+
// | | UnionExec |
3190+
// | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet |
3191+
// | | DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet |
3192+
// ...
3193+
// +------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
3194+
3195+
Ok(())
3196+
}
3197+
3198+
// Shared body of the two `union_with_mix_of_presorted_and_explicitly_resorted_inputs_*` tests.
//
// Registers the same `alltypes_tiny_pages.parquet` file twice (once with a declared sort order
// on `id`, once without), selects `id` from both, explicitly sorts the undeclared one, unions
// the two, groups by `id` with no aggregate expressions, and returns the resulting physical
// plan as indented text with the local test-data directory replaced by `{testdata}`.
//
// `repartition_sorts` is forwarded to `SessionConfig::with_repartition_sorts`; apart from that
// and `target_partitions = 1`, the session uses the default configuration.
//
// Failures while registering the tables, sorting, unioning, aggregating, canonicalizing the
// path or planning are returned through `?`. The `table(..)` and `select(..)` calls use
// `unwrap()` and therefore panic instead of returning an error.
async fn union_with_mix_of_presorted_and_explicitly_resorted_inputs_impl(
3199+
repartition_sorts: bool,
3200+
) -> Result<String> {
3201+
let config = SessionConfig::default()
3202+
.with_target_partitions(1)
3203+
.with_repartition_sorts(repartition_sorts);
3204+
let ctx = SessionContext::new_with_config(config);
3205+
3206+
let testdata = parquet_test_data();
3207+
3208+
// Register the "sorted" table: declared as already ordered by `id` via `file_sort_order`
// (shown as `output_ordering=[id@0 ASC NULLS LAST]` in the test snapshots)
3209+
ctx.register_parquet(
3210+
"sorted",
3211+
&format!("{testdata}/alltypes_tiny_pages.parquet"),
3212+
ParquetReadOptions::default()
3213+
.file_sort_order(vec![vec![col("id").sort(true, false)]]),
3214+
)
3215+
.await?;
3216+
3217+
// Register the "unsorted" table: same file, but with no declared sort order
3218+
ctx.register_parquet(
3219+
"unsorted",
3220+
&format!("{testdata}/alltypes_tiny_pages.parquet"),
3221+
ParquetReadOptions::default(),
3222+
)
3223+
.await?;
3224+
3225+
let source_sorted = ctx
3226+
.table("sorted")
3227+
.await
3228+
.unwrap()
3229+
.select(vec![col("id")])
3230+
.unwrap();
3231+
3232+
let source_unsorted = ctx
3233+
.table("unsorted")
3234+
.await
3235+
.unwrap()
3236+
.select(vec![col("id")])
3237+
.unwrap();
3238+
3239+
// Explicit sort using the same `sort(true, false)` expression as the declared file order,
// so both union inputs end up ordered the same way
let source_unsorted_resorted =
3240+
source_unsorted.sort(vec![col("id").sort(true, false)])?;
3241+
3242+
let union = source_sorted.union(source_unsorted_resorted)?;
3243+
3244+
// Group by `id` only; the list of aggregate expressions is empty
let agg = union.aggregate(vec![col("id")], vec![])?;
3245+
3246+
let df = agg;
3247+
3248+
// To be able to remove user specific paths from the plan, for stable assertions
3249+
let testdata_clean = Path::new(&testdata).canonicalize()?.display().to_string();
3250+
// Drop a leading `/` if present; the snapshots show `[[{testdata}/...`, so the plan
// presumably prints file paths without a leading slash - TODO confirm
let testdata_clean = testdata_clean.strip_prefix("/").unwrap_or(&testdata_clean);
3251+
3252+
// Use displayable() rather than explain().collect() to avoid table formatting issues. We need
3253+
// to replace machine-specific paths with variable lengths, which breaks table alignment and
3254+
// causes snapshot mismatches.
3255+
let physical_plan = df.create_physical_plan().await?;
3256+
let displayable_plan = displayable(physical_plan.as_ref())
3257+
.indent(true)
3258+
.to_string()
3259+
.replace(testdata_clean, "{testdata}");
3260+
3261+
Ok(displayable_plan)
3262+
}
3263+
31133264
#[tokio::test]
31143265
async fn test_count_wildcard_on_aggregate() -> Result<()> {
31153266
let ctx = create_join_context()?;

0 commit comments

Comments
 (0)