feat: add multi level merge sort that will always fit in memory #15700
#[tokio::test]
async fn test_low_cardinality() -> Result<()> {
This test fails on main with an OOM error.
for batch in batches_to_spill {
    in_progress_file.append_batch(&batch)?;

    *max_record_batch_size =
        (*max_record_batch_size).max(batch.get_actually_used_size());
I don't think it's realistic to know a batch's size correctly after a round trip of spilling and reading back with this get_actually_used_size() implementation. The actual reader implementation might surprise us, and it can get even more complex in the future: for example, we might implement extra encodings for #14078, which would make the memory size of a batch after reading it back harder to estimate.
Unless the actual array content differs before and after the spill, this function will always return the correct result regardless of the spill file format, because we calculate the actual array content size.
Some array types have complex internal buffer management. A simple example:
- Before spilling, a StringView array has 10MB of actual content, backed by 3 * 4MB buffers.
- After spilling and reading back, the reader implementation decides to use 1 * 16MB buffer instead.
Different allocation policies cause different fragmentation, so the physical memory consumed varies.
Here are some real bugs found recently for similar reasons (this explains why I'm worried about inconsistent memory sizes for logically equivalent batches):
#14644
#14823
#13377
Note that these were caused only by primitive and string arrays. For more complex types such as struct, array, or other nested types, I think such inconsistencies are even more likely.
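The concern can be illustrated without Arrow at all. The following is a minimal sketch that uses plain `Vec<u8>` buffers as stand-ins for an array's data buffers; the `Buffers` type and its methods are hypothetical and are not Arrow or DataFusion APIs. It only shows that the same logical content can report a different allocated size depending on the buffer layout a writer or reader happens to choose.

```rust
const MB: usize = 1 << 20;

/// Hypothetical stand-in for an array's backing buffers (not an Arrow type).
struct Buffers {
    bufs: Vec<Vec<u8>>,
}

impl Buffers {
    /// Stores `content_len` bytes in buffers that each reserve `buf_capacity` bytes.
    fn with_layout(content_len: usize, buf_capacity: usize) -> Self {
        let mut bufs = Vec::new();
        let mut remaining = content_len;
        while remaining > 0 {
            let take = remaining.min(buf_capacity);
            // Reserve the full buffer even if only part of it ends up being used.
            let mut buf = Vec::with_capacity(buf_capacity);
            buf.resize(take, 0u8);
            bufs.push(buf);
            remaining -= take;
        }
        Buffers { bufs }
    }

    /// Bytes of actual content.
    fn content_size(&self) -> usize {
        self.bufs.iter().map(|b| b.len()).sum()
    }

    /// Bytes reserved from the allocator, including unused capacity.
    fn allocated_size(&self) -> usize {
        self.bufs.iter().map(|b| b.capacity()).sum()
    }
}
```

With `Buffers::with_layout(10 * MB, 4 * MB)` and `Buffers::with_layout(10 * MB, 16 * MB)`, `content_size()` is identical, while `allocated_size()` is at least 12MB in the first case and at least 16MB in the second. Whether a size function reports content or allocation is exactly the distinction debated in this thread.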
I'm trying to reproduce that so I can give a better answer. How do you create a StringView array that triggers what you described?
So after looking at the code I came to the conclusion that this is still the closest there is to accurately estimating memory
I think we should not estimate. Even if the estimate is correct 99% of the time, IMO it's impossible to make sure it's always accurate for nested types' reader implementations. If the estimate is way off in edge cases, the resulting bug would be hard to investigate.
If we want to follow this optimistic approach, I think the only memory accounting required is while buffering batches inside SortExec; all the remaining memory-tracking code can be deleted to make the implementation much simpler. The potential problem is unexpected behavior for non-primitive types (e.g. a dictionary array's row format size can explode).
If I added tests for every type to make sure the memory accounting is correct would you approve?
@2010YOUY01 and @ding-young I wonder if you can review this PR again to help @rluvaton get it merged? Specifically, if it needs more tests, perhaps you can help identify which ones are needed.
@alamb Sure! I may not be able to provide a detailed review right away, but I can definitely help by running the tests added in the PR locally and looking into memory accounting for the nested types that have been mentioned.
I have some concerns about this PR's design direction (see more in #15700 (comment)), and I don't think they can be addressed by more extensive tests.
Why is that? You raised some concerns about miscalculating the size of the record batch; adding tests will make sure we are calculating it correctly.
In the interest of this valuable work not being lost, is there any way that #15700 (comment) could be addressed by a method that's not more tests? Could we calculate the actual batch sizes every time we load into memory? Even if possible, that opens up questions of what to do if we load a batch and now exceed our memory budget, but maybe it's a path forward?
Hi @adriangb, thanks for raising this point. I'm currently reviewing both this PR and the other cascading merge sort PR (#15610). I'm not taking sides between the two approaches, but I agree that accurately estimating memory consumption is tricky, considering the issues discussed above and the fact that compression is now supported in spill files. We may need to think more about whether we can special-case scenarios where the memory size changes after spilling and reloading, or perhaps add some kind of backup logic to handle such situations more gracefully.
I've rebased this branch on the latest main and tested whether estimated size changes after we load
|
I have an idea to fix this concern: adding a max merge degree configuration, if either This approach I think has two advantages:
I (or possibly @ding-young) can handle this patch in a follow-up PR. I think we can move forward with this one; I'll review it in the next few days.
So should I fix this PR's conflicts? It seems like this PR has a chance to be merged.
@rluvaton If you'd like, I can send a PR to your (fork's) branch that resolves the merge conflicts, since I already have one. There were only minor diffs to handle when I rebased your branch on main.
I would appreciate it, it would greatly help me.
@rluvaton I opened a PR on your fork. Would you take a look when you have some time?
@2010YOUY01 I've updated based on your comments and commented back on some.
Thanks for the updates. I think the overall structure of this PR looks great, and most of my questions and suggestions from the first round of review have been addressed.
I have only two additional questions:
- #15700 (comment) If this issue is not significant, I still think it's better to first integrate this kernel into Arrow and keep the solution here simple. Otherwise we should add UTs for it before merging.
- Do we need this explicit batch chunking when writing spill files? #15700 (comment)
I left some additional polishing suggestions in the second round, but they're totally fine to do as follow-up PRs.
I think this PR is almost ready. cc @ding-young and @alamb in case you also want to review it again.
use futures::TryStreamExt;
use futures::{Stream, StreamExt};

/// Merges a stream of sorted cursors and record batches into a single sorted stream
It would be great to add a high-level doc about how this multi-level merge works.
added with diagram
// This is for avoiding double reservation of memory from our side and the sort preserving merge stream
// side.
// and doing a lot of code changes to avoid accounting for the memory used by the streams
unbounded_memory_pool: Arc<dyn MemoryPool>,
I think a clearer way to implement this is to let StreamingMergeBuilder include a new interface, with_bypass_mempool(), which constructs a temporary unbounded memory pool inside SPM and lets its memory reservation point to it.
Added, and the temporary unbounded memory pool is now created inside SPM.
I made that function pub(super) to avoid exposing it to users, as I feel it is a hack.
async fn create_stream(mut self) -> Result<SendableRecordBatchStream> {
    loop {
        // Hold this for the lifetime of the stream
If this reservation has the same lifetime as the stream, would it be better to create a MultiLevelMergeStream and make this reservation a struct field?
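As a rough illustration of the suggestion, here is a minimal sketch of a stream that owns its reservation as a field, so the memory is released exactly when the stream is dropped. `Pool`, `Reservation`, and `MergeStream` are hypothetical stand-ins written with the standard library only; they are not DataFusion's `MemoryPool`, `MemoryReservation`, or the stream type that ended up in this PR.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Hypothetical memory pool: just a shared counter of reserved bytes.
#[derive(Clone, Default)]
struct Pool {
    reserved: Rc<Cell<usize>>,
}

impl Pool {
    /// Reserves `bytes` and returns a guard that gives them back on drop.
    fn reserve(&self, bytes: usize) -> Reservation {
        self.reserved.set(self.reserved.get() + bytes);
        Reservation { pool: self.clone(), bytes }
    }

    fn reserved(&self) -> usize {
        self.reserved.get()
    }
}

/// Releases its bytes back to the pool when dropped.
struct Reservation {
    pool: Pool,
    bytes: usize,
}

impl Drop for Reservation {
    fn drop(&mut self) {
        let counter = &self.pool.reserved;
        counter.set(counter.get() - self.bytes);
    }
}

/// Hypothetical merge stream that keeps the reservation alive as a field.
struct MergeStream<I: Iterator<Item = i64>> {
    inner: I,
    _reservation: Reservation,
}

impl<I: Iterator<Item = i64>> Iterator for MergeStream<I> {
    type Item = i64;

    fn next(&mut self) -> Option<i64> {
        self.inner.next()
    }
}
```

Tying the reservation to the struct means no caller has to remember to keep a separate guard in scope for as long as the stream is polled.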
Changed, and made sure to use the memory reservation when only in-memory streams are being merged; otherwise the worst-case scenario is used.
    }
}

fn create_sorted_stream(
It would be great to include a comment describing its high-level idea.
I also think merge_sorted_runs_within_mem_limit() might be a more precise name.
added and renamed
// (reserving memory for the biggest batch in each stream)
// This is a hack
.with_reservation(
    MemoryConsumer::new("merge stream mock memory")
I see, this makes sense.
To enforce such validation, in the future we can extend StreamingMergeBuilder with each stream's max batch size and do some internal sanity checks:
let res = StreamingMergeBuilder::new()
.with_streams(streams)
.with_max_batch_size_per_stream(max_batch_sizes)
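One way such a sanity check could look is sketched below, assuming each stream is reduced to an iterator over batch sizes in bytes. `CheckedStream` and the free function `with_max_batch_size_per_stream` are hypothetical illustrations of the idea and are not part of `StreamingMergeBuilder`.

```rust
/// Hypothetical wrapper: yields each batch's size in bytes, or an error if
/// a batch is larger than the maximum declared for this stream.
struct CheckedStream<I> {
    inner: I,
    declared_max: usize,
}

impl<I: Iterator<Item = usize>> Iterator for CheckedStream<I> {
    type Item = Result<usize, String>;

    fn next(&mut self) -> Option<Self::Item> {
        let size = self.inner.next()?;
        if size > self.declared_max {
            Some(Err(format!(
                "batch of {} bytes exceeds the declared max of {} bytes",
                size, self.declared_max
            )))
        } else {
            Some(Ok(size))
        }
    }
}

/// Pairs every stream with its declared max batch size.
/// Fails if the two lists do not have the same length.
fn with_max_batch_size_per_stream<I: Iterator<Item = usize>>(
    streams: Vec<I>,
    max_batch_sizes: Vec<usize>,
) -> Result<Vec<CheckedStream<I>>, String> {
    if streams.len() != max_batch_sizes.len() {
        return Err(format!(
            "got {} streams but {} max batch sizes",
            streams.len(),
            max_batch_sizes.len()
        ));
    }
    Ok(streams
        .into_iter()
        .zip(max_batch_sizes)
        .map(|(inner, declared_max)| CheckedStream { inner, declared_max })
        .collect())
}
```

A check like this turns a silent under-reservation into an explicit error at the point where the oversized batch is read.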
let Some(expressions) = expressions else {
    return internal_err!("Sort expressions cannot be empty for streaming merge");
};

if !sorted_spill_files.is_empty() {
I agree it's good to reduce the number of APIs; in that case the two approaches seem to have similar complexity.
use arrow::downcast_primitive_array;
use arrow_schema::DataType;

/// TODO - NEED TO MOVE THIS TO ARROW
It's not obvious to me in what situation it can overestimate by a lot. I was thinking those batch arrays won't over-allocate buffers too much, because we have a configured batch size.
Do you have a reproducer? Perhaps we can look into it further.
/// # Errors
/// - Returns an error if spilling would exceed the disk usage limit configured
/// by `max_temp_directory_size` in `DiskManager`
pub(crate) fn spill_record_batch_by_size_and_return_max_batch_memory(
Note that the SpillManager mod is exported as pub(crate):
pub(crate) mod spill_manager;
So we're free to modify the existing functions and combine this one with spill_record_batch_by_size() to reuse the code.
let mut offset = 0;
let total_rows = batch.num_rows();

// Keep slicing the batch until we have left with a batch that is smaller than
Why do we have to do this step here?
I looked at its only use case:
.spill_record_batch_stream_by_size(
The stream this function takes is produced by SPM, which has already chunked the output by the batch_size in the configuration.
You are right, simplified.
pub(crate) fn spill_record_batch_by_size_and_return_max_batch_memory(
    &self,
    batch: &RecordBatch,
    request_description: &str,
Nit: It might be better to unify the naming. Some functions use request_msg, others use request_description.
done
Can you please re-review? I don't believe there are any actionable comments left for me.
Pull Request Overview
This PR adds a multi-level merge sort implementation that prevents out-of-memory failures by intelligently managing memory during sorting operations. The implementation spills intermediate results to disk when memory is insufficient and dynamically adjusts buffer sizes and stream counts to work within available memory constraints.
- Implements MultiLevelMergeBuilder that performs hierarchical merging with automatic spilling
- Adds memory-aware spill file management with worst-case memory reservation strategy
- Integrates the multi-level merge into both sort and aggregate operations
Reviewed Changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 7 comments.
File | Description |
---|---|
spill_manager.rs | Adds methods to track maximum batch memory sizes during spilling operations |
streaming_merge.rs | Integrates multi-level merge capabilities and adds SortedSpillFile structure |
sort.rs | Updates external sorter to track batch memory sizes and use multi-level merge |
multi_level_merge.rs | New core implementation of memory-aware multi-level merge algorithm |
row_hash.rs | Updates aggregate spilling to use new spill file format with memory tracking |
Test files | Adds comprehensive fuzz tests for memory-constrained environments |
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion_execution::runtime_env::RuntimeEnv;
use std::sync::Arc;
[nitpick] The import order has been changed unnecessarily. The original import order (std imports first, then external crates) followed Rust conventions better.
- use arrow::datatypes::SchemaRef;
- use arrow::record_batch::RecordBatch;
- use datafusion_execution::runtime_env::RuntimeEnv;
- use std::sync::Arc;
+ use std::sync::Arc;
+ use arrow::datatypes::SchemaRef;
+ use arrow::record_batch::RecordBatch;
+ use datafusion_execution::runtime_env::RuntimeEnv;
isn't the import order defined in clippy/fmt?
///
/// This is not marked as `pub` because it is not recommended to use this method
The comment should explain why this method is not recommended and what the risks are of bypassing the memory pool.
- ///
- /// This is not marked as `pub` because it is not recommended to use this method
+ ///
+ /// This method bypasses the memory pool, which can lead to unregulated memory usage.
+ /// Using an unbounded memory pool may result in excessive memory consumption and
+ /// potential system instability if memory usage exceeds available resources.
+ /// This is not marked as `pub` because it is not recommended to use this method
+ /// except in testing or controlled scenarios where memory usage is closely monitored.
// TODO - add a threshold for number of files to disk even if empty and reading from disk so
// we can avoid the memory reservation
This TODO comment is incomplete and unclear. It should be completed to explain the specific optimization being considered.
- // TODO - add a threshold for number of files to disk even if empty and reading from disk so
- // we can avoid the memory reservation
+ // TODO - Consider adding a threshold for the number of files to spill to disk, even if the files are empty
+ // and we are reading from disk. This optimization aims to reduce memory reservation by limiting
+ // the number of in-memory streams. The threshold could be based on factors such as available memory,
+ // the size of the sorted spill files, or the expected performance impact of disk I/O. Further analysis
+ // and testing are needed to determine the appropriate threshold value and its implementation.
// If we're only merging memory streams, we don't need to attach the memory reservation
// as it's empty
if is_only_merging_memory_streams {
    assert_eq!(memory_reservation.size(), 0, "when only merging memory streams, we should not have any memory reservation and let the merge sort handle the memory");
Using assert_eq! in production code can cause panics. Consider returning an error result or using a debug assertion instead.
- assert_eq!(memory_reservation.size(), 0, "when only merging memory streams, we should not have any memory reservation and let the merge sort handle the memory");
+ if memory_reservation.size() != 0 {
+     return Err(datafusion_common::DataFusionError::Internal(
+         "when only merging memory streams, we should not have any memory reservation and let the merge sort handle the memory".to_string(),
+     ));
+ }
the assertion is on purpose to ensure correctness
    minimum_number_of_required_streams: usize,
    reservation: &mut MemoryReservation,
) -> Result<(Vec<SortedSpillFile>, usize)> {
    assert_ne!(buffer_len, 0, "Buffer length must be greater than 0");
Using assert_ne! in production code can cause panics. Consider returning an error result for invalid input parameters instead.
This is on purpose: I want it to panic, since this is a correctness invariant.
Thanks again @rluvaton
I have filed the issues for the follow-up tasks.
I plan to wait a few days before merging, in case others would like to review it too.
Thanks a lot! It looks good to me.
Thanks everyone @rluvaton @ding-young and others who have helped along the way 🚀
Amazing work! Aside from updating to |
Nothing else is needed; it is triggered automatically and doesn't require any configuration to control.
EPIC! I will also add this to the list of things we should mention in the 50.0.0 release.
/// - No: return that sorted stream as the final output stream
///
/// ```text
/// Initial State: Multiple sorted streams + spill files
😍
…he#15700)
* feat: add multi level merge sort that will always fit in memory
* test: add fuzz test for aggregate
* update
* add more tests
* fix test
* update tests
* added more aggregate fuzz
* align with add fuzz tests
* add sort fuzz
* fix lints and formatting
* moved spill in memory constrained envs to separate test
* rename `StreamExec` to `OnceExec`
* added comment on the usize in the `in_progress_spill_file` inside ExternalSorter
* rename buffer_size to buffer_len
* reuse code in spill fuzz
* double the amount of memory needed to sort
* add diagram for explaining the overview
* update based on code review
* fix test based on new memory calculation
* remove get_size in favor of get_sliced_size
* change to result
I'm trying this out for our compaction system and am not able to get my sort to work without hitting memory limits. Note that I am using In
-- About 6.32 GB of parquet compressed (~ 10 x compression ratio)
-- Split into ~60 ~100 MB files
CREATE EXTERNAL TABLE t1
STORED AS PARQUET
LOCATION '/Users/adriangb/Downloads/data/day=2025-08-05/';
explain
COPY (
SELECT *
FROM t1
ORDER BY deployment_environment, kind, service_name, trace_id
)
TO '/Users/adriangb/Downloads/out.parquet';
COPY (
SELECT *
FROM t1
ORDER BY deployment_environment, kind, service_name, trace_id
)
TO '/Users/adriangb/Downloads/out.parquet';
Even a limit of 16GB fails:
❯ ./target/release/datafusion-cli --mem-pool-type 'fair' --memory-limit '16g' --disk-limit '250gb' -f q.sql
DataFusion CLI v49.0.0
0 row(s) fetched.
Elapsed 0.243 seconds.
+---------------+-------------------------------+
| plan_type | plan |
+---------------+-------------------------------+
| physical_plan | ┌───────────────────────────┐ |
| | │ DataSinkExec │ |
| | └─────────────┬─────────────┘ |
| | ┌─────────────┴─────────────┐ |
| | │ SortPreservingMergeExec │ |
| | │ -------------------- │ |
| | │ deployment_environment ASC│ |
| | │ NULLS LAST, kind ASC │ |
| | │ NULLS LAST, │ |
| | │ service_name │ |
| | │ ASC NULLS LAST, │ |
| | │ trace_id ASC NULLS │ |
| | │ LAST │ |
| | └─────────────┬─────────────┘ |
| | ┌─────────────┴─────────────┐ |
| | │ SortExec │ |
| | │ -------------------- │ |
| | │ deployment_environment@35 │ |
| | │ ASC NULLS LAST, kind@6 │ |
| | │ ASC NULLS LAST, │ |
| | │ service_name@27 │ |
| | │ ASC NULLS LAST, │ |
| | │ trace_id@4 ASC │ |
| | │ NULLS LAST │ |
| | └─────────────┬─────────────┘ |
| | ┌─────────────┴─────────────┐ |
| | │ DataSourceExec │ |
| | │ -------------------- │ |
| | │ files: 68 │ |
| | │ format: parquet │ |
| | └───────────────────────────┘ |
| | |
+---------------+-------------------------------+
1 row(s) fetched.
Elapsed 0.255 seconds.
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
ExternalSorter[4]#12(can spill: true) consumed 1089.2 MB,
ExternalSorter[11]#26(can spill: true) consumed 1018.9 MB,
ExternalSorter[8]#20(can spill: true) consumed 1004.6 MB.
Error: Failed to allocate additional 744.1 KB for ExternalSorterMerge[6] with 0.0 B already allocated for this reservation - 0.0 B remain available for the total pool
I can maybe share the data with some sort of NDA but honestly it's not that interesting, it's just a lot of random data.
@adriangb There are still some cases that external sorting gives up with memory allocation failure, especially when and (2) when memory pressure is so high that multi level merge step can't grow the reservation
We need to estimate the memory required for row-formatted batch correctly for (1) and further limit the number of spills to merge if multi level merge fails (#16908) for (2). I'm working on these follow up issues recently but it will take time. Anyway, until these fixes are done, I'd recommend trying to run above query with a smaller |
Which issue does this PR close?
Rationale for this change
We need a merge sort that does not fail with out-of-memory errors.
What changes are included in this PR?
Implemented a multi-level merge sort on top of SortPreservingMergeStream that spills intermediate results when there is not enough memory.
How does it work:
When using MultiLevelMerge, you provide in-memory streams and spill files. Each spill file records the memory size of its record batch with the largest memory consumption.
Why is this important?
SortPreservingMergeStream uses BatchBuilder, which grows and shrinks its memory based on the record batches it receives. However, if there is not enough memory, it just fails.
This solution reserves, up front, the worst-case record batch size for each spill file, so the merge cannot run out of memory mid-sort.
It also tries to reduce the buffer size and the number of streams to the minimum when there is not enough memory, and only fails if there is not enough memory to hold 2 record batches with no buffering in the stream.
It can also be easily adjusted to allow a predefined maximum amount of memory for the merge stream.
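The reservation strategy described above can be sketched roughly as follows. This is a simplified illustration rather than the code in this PR: `SpillFile`, `plan_merge_pass`, and `count_merge_passes` are hypothetical names, memory is modeled as a plain byte budget instead of a `MemoryPool`, in-memory streams are ignored, and the merged output of a pass is assumed to have a largest batch no bigger than the largest input batch.

```rust
/// Hypothetical spill file: only the size of its largest batch matters here.
#[derive(Debug, Clone, PartialEq)]
struct SpillFile {
    max_batch_bytes: usize,
}

/// Decides how many leading spill files can be merged in one pass.
/// Reserves `max_batch_bytes * buffer_len` per file; if fewer than two files
/// fit, retries with a smaller `buffer_len`, and fails once `buffer_len == 1`.
/// Returns `(number_of_files, buffer_len)`.
fn plan_merge_pass(
    files: &[SpillFile],
    budget: usize,
    mut buffer_len: usize,
) -> Result<(usize, usize), String> {
    const MIN_STREAMS: usize = 2;
    if buffer_len == 0 {
        return Err("buffer_len must be greater than 0".to_string());
    }
    loop {
        let mut reserved = 0usize;
        let mut count = 0usize;
        for file in files {
            let need = file.max_batch_bytes * buffer_len;
            if reserved + need > budget {
                break; // budget exhausted for this pass
            }
            reserved += need;
            count += 1;
        }
        if count >= MIN_STREAMS.min(files.len()) {
            return Ok((count, buffer_len));
        }
        if buffer_len == 1 {
            return Err("not enough memory to merge two unbuffered streams".to_string());
        }
        buffer_len -= 1; // slower, but needs less memory per stream
    }
}

/// Simulates the multi-level merge and returns the number of merge passes.
/// Every pass except the last replaces the merged files with one new spill file.
fn count_merge_passes(
    mut files: Vec<SpillFile>,
    budget: usize,
    buffer_len: usize,
) -> Result<usize, String> {
    let mut passes = 0;
    loop {
        let (n, _) = plan_merge_pass(&files, budget, buffer_len)?;
        passes += 1;
        if n == files.len() {
            return Ok(passes); // final pass: output is streamed, not spilled
        }
        // Assumption: the merged run's largest batch is no larger than its inputs'.
        let merged_max = files[..n].iter().map(|f| f.max_batch_bytes).max().unwrap_or(0);
        files.drain(..n);
        files.push(SpillFile { max_batch_bytes: merged_max });
    }
}
```

For example, with four spill files whose largest batch is 10 bytes each, a budget of 25 bytes, and a starting `buffer_len` of 2, a single buffered stream needs 20 bytes, so only one would fit. The planner therefore drops to `buffer_len = 1` and merges two files per pass, finishing in three passes. With a budget of 15 bytes, two unbuffered streams no longer fit and planning fails.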
Are these changes tested?
Yes, added fuzz tests for aggregate and sort.
Are there any user-facing changes?
not really
Related to #15610