
Preserving sort on UnionExec inputs instead of introducing a suboptimal top-level sort #18380

@rgehan

Description


Is your feature request related to a problem or challenge?

When you have a UNION over mostly sorted inputs and explicitly add sorts to the unsorted ones, the enforce_sorting optimizer removes those targeted sorts and moves the sort to the top level instead.

Here is an excerpt of the verbose EXPLAIN output, which can be generated from the failing test in the reproducer PR (full output: gist):

+-----------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type                               | plan                                                                                                                                                                     |
+-----------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan                            | Aggregate: groupBy=[[id]], aggr=[[]]                                                                                                                                     |
|                                         |   Union                                                                                                                                                                  |
|                                         |     TableScan: sorted projection=[id]                                                                                                                                    |
|                                         |     Sort: unsorted.id ASC NULLS LAST                                                                                                                                     |
|                                         |       TableScan: unsorted projection=[id]                                                                                                                                |
...
| initial_physical_plan                   | AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted                                                                                               |
|                                         |   AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[], ordering_mode=Sorted                                                                                           |
|                                         |     UnionExec                                                                                                                                                            |
|                                         |       DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet       |
|                                         |       SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]                                                                                                |
|                                         |         DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet                                            |
...
| physical_plan after EnforceDistribution | OutputRequirementExec: order_by=[], dist_by=Unspecified                                                                                                                  |
|                                         |   AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted                                                                                             |
|                                         |     SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]                                                                                                  |
|                                         |       CoalescePartitionsExec                                                                                                                                             |
|                                         |         AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[], ordering_mode=Sorted                                                                                     |
|                                         |           UnionExec                                                                                                                                                      |
|                                         |             DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet |
|                                         |             SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]                                                                                          |
|                                         |               DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet                                      |
|                                         |                                                                                                                                                                          |
| physical_plan after EnforceSorting      | OutputRequirementExec: order_by=[], dist_by=Unspecified                                                                                                                  |
|                                         |   AggregateExec: mode=Final, gby=[id@0 as id], aggr=[], ordering_mode=Sorted                                                                                             |
|                                         |     SortPreservingMergeExec: [id@0 ASC NULLS LAST]                                                                                                                       |
|                                         |       SortExec: expr=[id@0 ASC NULLS LAST], preserve_partitioning=[true]                                                                                                 |
|                                         |         AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[]                                                                                                           |
|                                         |           UnionExec                                                                                                                                                      |
|                                         |             DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet |
|                                         |             DataSourceExec: file_groups={1 group: [[{testdata}/alltypes_tiny_pages.parquet]]}, projection=[id], file_type=parquet                                        |
...
+-----------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

This re-sorts all of the data instead of just the unsorted partition. That prevents the use of streaming operators (e.g. SortPreservingMergeExec) over the inputs and significantly increases memory usage and spilling.

This turns what should be a small parallel sort into a memory-intensive, spilling sort of the entire dataset.
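To illustrate why sorting only the unsorted input should be cheaper, here is a minimal Rust sketch (not DataFusion code) of a streaming k-way merge, the idea behind an operator such as SortPreservingMergeExec. Once each input is individually sorted, the merge only keeps one pending row per input in a heap instead of buffering the whole dataset for a full re-sort. The helper name merge_sorted and the use of Vec<i64> as a stand-in for a sorted record-batch stream are assumptions made for illustration.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Hypothetical helper: merge already-sorted inputs into one sorted output.
/// Each Vec<i64> stands in for one sorted input stream. The heap holds at most
/// one entry per input, so extra memory is O(number of inputs), not O(rows).
fn merge_sorted(inputs: Vec<Vec<i64>>) -> Vec<i64> {
    let mut iters: Vec<std::vec::IntoIter<i64>> =
        inputs.into_iter().map(|v| v.into_iter()).collect();

    // Min-heap of (next value, input index), using Reverse to flip BinaryHeap's max ordering.
    let mut heap = BinaryHeap::new();
    for (idx, it) in iters.iter_mut().enumerate() {
        if let Some(v) = it.next() {
            heap.push(Reverse((v, idx)));
        }
    }

    let mut out = Vec::new();
    while let Some(Reverse((v, idx))) = heap.pop() {
        out.push(v);
        // Refill from the same input so the heap keeps one pending row per input.
        if let Some(next) = iters[idx].next() {
            heap.push(Reverse((next, idx)));
        }
    }
    out
}

fn main() {
    // One input that is already ordered (like the `sorted` table scan)...
    let sorted_input = vec![1, 4, 7, 10];
    // ...and one unordered input that is sorted on its own, like the SortExec
    // under UnionExec in the initial physical plan.
    let mut unsorted_input = vec![9, 2, 5, 3];
    unsorted_input.sort();

    let merged = merge_sorted(vec![sorted_input, unsorted_input]);
    println!("{:?}", merged);
}
```

In the plan above, the already-sorted DataSourceExec input could feed such a merge directly; only the unsorted input needs its own SortExec.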

Describe the solution you'd like

Sorts below a UnionExec should be preferred over a top-level sort.

In #9867, @NGA-TRAN proposed explicitly implementing required_input_ordering in UnionExec, which seems to fix the reproducer tests I added in #18352. However, it breaks other unit tests.
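The desired behavior can be sketched with a toy model. Everything here is hypothetical: the Plan enum and the require_sorted function are not DataFusion's ExecutionPlan trait, and they do not reflect the real required_input_ordering signature. The sketch only shows the rule "push the ordering requirement through the union and sort only the children that are not already sorted", rather than sorting the union's output.

```rust
/// Hypothetical, heavily simplified plan node (not DataFusion's ExecutionPlan).
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Scan { name: &'static str, sorted: bool },
    Sort(Box<Plan>),
    Union(Vec<Plan>),
}

impl Plan {
    /// Whether this node's output already satisfies the (single) ordering.
    /// Simplification: a union counts as sorted when every child is sorted,
    /// i.e. each of its partitions is sorted and can feed a sort-preserving merge.
    fn is_sorted(&self) -> bool {
        match self {
            Plan::Scan { sorted, .. } => *sorted,
            Plan::Sort(_) => true,
            Plan::Union(children) => children.iter().all(Plan::is_sorted),
        }
    }
}

/// Hypothetical rule: satisfy an ordering requirement on `plan`.
/// For a union, the requirement is pushed into each child, so only the
/// unsorted children get a Sort node; no sort is added above the union.
fn require_sorted(plan: Plan) -> Plan {
    match plan {
        Plan::Union(children) => Plan::Union(children.into_iter().map(require_sorted).collect()),
        p if p.is_sorted() => p,
        p => Plan::Sort(Box::new(p)),
    }
}

fn main() {
    let plan = Plan::Union(vec![
        Plan::Scan { name: "sorted", sorted: true },
        Plan::Scan { name: "unsorted", sorted: false },
    ]);
    // Only the `unsorted` scan ends up wrapped in a Sort.
    println!("{:?}", require_sorted(plan));
}
```

A real implementation would also need to decide when such a pushdown is worthwhile and how it interacts with other rules; the unit-test breakage mentioned above suggests that this is where the difficulty lies.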

Describe alternatives you've considered

  • Pre-sorting all data before feeding it to DataFusion
  • Implementing a custom sort operator that wouldn't get optimized out

While these are viable workarounds, they are not ideal, and I believe DataFusion should be able to handle this case.

Additional context

Reproducer tests in PR #18352.

Related to issue #9898 and its corresponding PR #9867.

Labels: enhancement (New feature or request)
