Spill improvements #25892
Conversation
TestHashAggregationOperator#testHashAggregation() runtime reduced from 2m45s to 4s.
- SpillableHashAggregationBuilder#buildResult now spills asynchronously.
- Revocable memory is no longer converted to user memory when !shouldMergeWithMemory. This was previously causing OOM issues.
cc @osscm
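The revocable-to-user memory conversion described above can be illustrated with a minimal single-threaded sketch. The classes below are toys for illustration only, not Trino's real memory contexts: the idea is to drop the revocable reservation, try to re-reserve the same bytes as user memory, and roll back (forcing a spill) if the user-memory limit would be exceeded.

```java
// Toy stand-in for a memory context with a hard limit (hypothetical class,
// not Trino's LocalMemoryContext).
class ToyMemoryContext {
    private final long limit;
    private long bytes;

    ToyMemoryContext(long limit) { this.limit = limit; }

    long getBytes() { return bytes; }
    void setBytes(long value) { bytes = value; }

    // succeeds only if the new reservation fits under the limit
    boolean trySetBytes(long value) {
        if (value > limit) {
            return false;
        }
        bytes = value;
        return true;
    }
}

public class RevocableConversion {
    // Returns true if the revocable bytes were converted to user memory;
    // false means the caller must spill instead.
    public static boolean tryConvertRevocableToUser(ToyMemoryContext revocable, ToyMemoryContext user) {
        long currentRevocableBytes = revocable.getBytes();
        revocable.setBytes(0);
        if (!user.trySetBytes(user.getBytes() + currentRevocableBytes)) {
            // conversion failed: restore the revocable reservation so the
            // accounting stays consistent, then let the caller spill
            revocable.setBytes(currentRevocableBytes);
            return false;
        }
        return true;
    }
}
```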
core/trino-main/src/main/java/io/trino/operator/MergeHashSort.java
core/trino-main/src/main/java/io/trino/operator/SpillMetrics.java
lgtm % comments on the metrics commit; we can take those to a separate PR if you want to merge the remaining commits first
core/trino-main/src/main/java/io/trino/operator/SpillMetrics.java
core/trino-main/src/main/java/io/trino/operator/SpillMetrics.java
lgtm % #25892 (comment)
core/trino-main/src/main/java/io/trino/operator/MergeHashSort.java
core/trino-main/src/main/java/io/trino/spiller/FileSingleStreamSpiller.java
// TODO: this should be asynchronous
getFutureValue(spillToDisk());
updateMemory();
return flatten(WorkProcessor.create(() -> {
The use of flatten and create here makes this code a little difficult to follow, since most of the logic only needs to be called once. Could this be refactored to a transform that runs after spillToDisk() unblocks?
I don't think it would make it much simpler. Think of the flatten processor as a control loop that delegates to other actions. We could maybe add more WorkProcessor constructs, e.g.:
WorkProcessor.of()
        .blockingOptionally(() -> {
            checkState(hasPreviousSpillCompletedSuccessfully(), "Previous spill hasn't yet finished");
            updateMemory();
            if (localRevocableMemoryContext.getBytes() > 0) {
                if (spiller.isEmpty()) {
                    // No spill happened, try to build result from memory. Revocable memory needs to be
                    // converted to user memory as the output-producing stage is no longer revocable.
                    long currentRevocableBytes = localRevocableMemoryContext.getBytes();
                    localRevocableMemoryContext.setBytes(0);
                    if (!localUserMemoryContext.trySetBytes(localUserMemoryContext.getBytes() + currentRevocableBytes)) {
                        // TODO: this might fail (even though we have just released memory), but we don't
                        // have a proper way to atomically convert memory reservations
                        localRevocableMemoryContext.setBytes(currentRevocableBytes);
                        // spill since revocable memory could not be converted to user memory immediately
                        return Optional.of(spillToDisk());
                    }
                }
                else if (!shouldMergeWithMemory(getSizeInMemoryWhenUnspilling())) {
                    return Optional.of(spillToDisk());
                }
            }
            return Optional.empty();
        })
        .flatReturn(() -> {
            checkState(hasPreviousSpillCompletedSuccessfully(), "Previous spill hasn't yet finished");
            // update memory after potential spill from the previous call to buildResult
            updateMemory();
            if (spiller.isEmpty()) {
                return hashAggregationBuilder.buildResult();
            }
            return mergeFromDiskAndMemory();
        })
It's probably easier to read, but I would skip it for now.
Previously, groupByHash.getRawHash was used when spilling while InterpretedHashGenerator was used when unspilling. This could lead to correctness issues. With this fix, the raw hash is appended to spilled pages and reused when unspilling, preventing invalid page breaks.
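The hash-consistency fix can be sketched as follows. All names here are hypothetical toys, not Trino's Page or hash-generator APIs: the point is that the exact raw hash computed at spill time is stored next to each row, so unspilling never recomputes it with a different (and potentially disagreeing) hash function.

```java
import java.util.ArrayList;
import java.util.List;

public class SpillWithRawHash {
    // a spilled row carrying the raw hash computed at spill time
    public record Row(long value, long rawHash) {}

    // stands in for the in-memory group-by hash (groupByHash.getRawHash)
    public static long groupByRawHash(long value) {
        return value * 0x9E3779B97F4A7C15L;
    }

    // a *different* hash, like InterpretedHashGenerator; using it on unspill
    // would partition rows differently and break page boundaries
    public static long interpretedHash(long value) {
        return Long.rotateLeft(value, 13) ^ 0x5DEECE66DL;
    }

    // spill: append the raw hash to every row so unspill can reuse it verbatim
    public static List<Row> spill(List<Long> values) {
        List<Row> spilled = new ArrayList<>();
        for (long v : values) {
            spilled.add(new Row(v, groupByRawHash(v)));
        }
        return spilled;
    }
}
```

On unspill, reading the stored `rawHash` column guarantees the merge sees exactly the hashes the in-memory builder used, which is what prevents the invalid page breaks.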
Previously, spilled data size was computed at the query level by summing stats from operators. However, operator stats are unavailable while an operator is running, so spill stats were not reported in real time. This change makes spill stats a first-class citizen, reported in real time.
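A minimal sketch of what real-time spill stats could look like (an assumed design, not Trino's actual SpillMetrics class): operator threads record each spill as it happens, and the counters are readable at any moment rather than only after the operator finishes.

```java
import java.util.concurrent.atomic.AtomicLong;

public class SpillStats {
    private final AtomicLong spillCount = new AtomicLong();
    private final AtomicLong spilledBytes = new AtomicLong();
    private final AtomicLong spillWallNanos = new AtomicLong();

    // called from operator threads while the query is still running
    public void recordSpill(long bytes, long wallNanos) {
        spillCount.incrementAndGet();
        spilledBytes.addAndGet(bytes);
        spillWallNanos.addAndGet(wallNanos);
    }

    // readable at any time by the stats reporter, not just at operator close
    public long getSpillCount() { return spillCount.get(); }
    public long getSpilledBytes() { return spilledBytes.get(); }
    public long getSpillWallNanos() { return spillWallNanos.get(); }
}
```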
This allows spill disks to be saturated more evenly when multiple spill locations are configured.
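One simple strategy for spreading writes across spill locations is round-robin selection, sketched below. This is an illustrative assumption, not necessarily the selection policy this PR implements; the class name is hypothetical.

```java
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class RoundRobinSpillPaths {
    private final List<Path> locations;
    private final AtomicLong counter = new AtomicLong();

    public RoundRobinSpillPaths(List<Path> locations) {
        this.locations = List.copyOf(locations);
    }

    // each new spill file goes to the next disk in turn, so no single
    // location absorbs all the write bandwidth
    public Path nextLocation() {
        int index = (int) (counter.getAndIncrement() % locations.size());
        return locations.get(index);
    }
}
```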
FileSingleStreamSpiller fields can be accessed from multiple threads, so they need to be thread-safe.
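The thread-safety concern can be illustrated with a toy sketch (hypothetical names, not the real FileSingleStreamSpiller fields): once spilling is asynchronous, bookkeeping state may be touched by both the operator thread and the spill executor, so plain long/boolean fields are replaced with atomics.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public class SpillerState {
    // written by the spill executor thread, read by the operator thread
    private final AtomicLong spilledBytes = new AtomicLong();
    private final AtomicBoolean writable = new AtomicBoolean(true);

    public void onPageSpilled(long bytes) {
        spilledBytes.addAndGet(bytes);
    }

    // flips the spiller to read-only exactly once, even under races
    public boolean closeForWrites() {
        return writable.compareAndSet(true, false);
    }

    public long getSpilledBytes() { return spilledBytes.get(); }
}
```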
Spilling metrics for:
- number of spills
- spill wall time
- spilled data
- number of unspills
- unspill wall time
- unspilled data
Description
Additional context and related issues
Release notes
( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
(x) Release notes are required, with the following suggested text: