Skip to content

Commit a074935

Browse files
committed
Release hash aggregation memory on output
Incrementally releases memory from FlatGroupByHash when HashAggregationOperator starts producing output.
1 parent 5afc98a commit a074935

File tree

9 files changed

+169
-8
lines changed

9 files changed

+169
-8
lines changed

core/trino-main/src/main/java/io/trino/operator/AppendOnlyVariableWidthData.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,27 @@ public byte[] getChunk(byte[] pointer, int pointerOffset)
138138
return chunks.get(chunkIndex);
139139
}
140140

141+
public void freeChunksBefore(byte[] pointer, int pointerOffset)
142+
{
143+
int chunkIndex = getChunkIndex(pointer, pointerOffset);
144+
if (chunks.isEmpty()) {
145+
verify(chunkIndex == 0);
146+
return;
147+
}
148+
checkIndex(chunkIndex, chunks.size());
149+
// Release any previous chunks until a null chunk is encountered, which means it and any previous
150+
// chunks have already been released
151+
int releaseIndex = chunkIndex - 1;
152+
while (releaseIndex >= 0) {
153+
byte[] releaseChunk = chunks.set(releaseIndex, null);
154+
if (releaseChunk == null) {
155+
break;
156+
}
157+
chunksRetainedSizeInBytes -= releaseChunk.length;
158+
releaseIndex--;
159+
}
160+
}
161+
141162
// growth factor for each chunk doubles up to 512KB, then increases by 1.5x for each chunk after that
142163
private static long nextChunkSize(long previousChunkSize)
143164
{

core/trino-main/src/main/java/io/trino/operator/BigintGroupByHash.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,12 @@ public int getGroupCount()
126126
return nextGroupId;
127127
}
128128

129+
@Override
130+
public void startReleasingOutput()
131+
{
132+
// NOOP
133+
}
134+
129135
@Override
130136
public void appendValuesTo(int groupId, PageBuilder pageBuilder)
131137
{

core/trino-main/src/main/java/io/trino/operator/FlatGroupByHash.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,12 @@ public int getGroupCount()
129129
return flatHash.size();
130130
}
131131

132+
@Override
133+
public void startReleasingOutput()
134+
{
135+
flatHash.startReleasingOutput();
136+
}
137+
132138
@Override
133139
public void appendValuesTo(int groupId, PageBuilder pageBuilder)
134140
{

core/trino-main/src/main/java/io/trino/operator/FlatHash.java

Lines changed: 47 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -124,8 +124,8 @@ public FlatHash(FlatHash other)
124124
this.mask = other.mask;
125125
this.nextGroupId = other.nextGroupId;
126126
this.maxFill = other.maxFill;
127-
this.control = Arrays.copyOf(other.control, other.control.length);
128-
this.groupIdsByHash = Arrays.copyOf(other.groupIdsByHash, other.groupIdsByHash.length);
127+
this.control = other.control == null ? null : Arrays.copyOf(other.control, other.control.length);
128+
this.groupIdsByHash = other.groupIdsByHash == null ? null : Arrays.copyOf(other.groupIdsByHash, other.groupIdsByHash.length);
129129
this.fixedSizeRecords = Arrays.stream(other.fixedSizeRecords)
130130
.map(fixedSizeRecords -> fixedSizeRecords == null ? null : Arrays.copyOf(fixedSizeRecords, fixedSizeRecords.length))
131131
.toArray(byte[][]::new);
@@ -153,13 +153,36 @@ public int getCapacity()
153153
return capacity;
154154
}
155155

156+
/**
157+
* Releases memory associated with the hash table which is no longer necessary to produce output. Subsequent
158+
* calls to insert new elements are rejected, and calls to {@link FlatHash#appendTo(int, BlockBuilder[])} will
159+
* incrementally release memory associated with prior groupId values assuming that the caller will only call into
160+
* the method to produce output in a sequential fashion.
161+
*/
162+
public void startReleasingOutput()
163+
{
164+
if (isReleasingOutput()) {
165+
throw new IllegalStateException("already releasing output");
166+
}
167+
control = null;
168+
groupIdsByHash = null;
169+
}
170+
171+
private boolean isReleasingOutput()
172+
{
173+
return control == null;
174+
}
175+
156176
public long hashPosition(int groupId)
157177
{
158-
if (groupId < 0) {
159-
throw new IllegalArgumentException("groupId is negative");
178+
if (groupId < 0 || groupId >= nextGroupId) {
179+
throw new IllegalArgumentException("groupId out of range: " + groupId);
160180
}
161181
byte[] fixedSizeRecords = getFixedSizeRecords(groupId);
162182
int fixedRecordOffset = getFixedRecordOffset(groupId);
183+
if (isReleasingOutput() && fixedSizeRecords == null) {
184+
throw new IllegalStateException("groupId already released");
185+
}
163186
if (cacheHashValue) {
164187
return (long) LONG_HANDLE.get(fixedSizeRecords, fixedRecordOffset);
165188
}
@@ -182,7 +205,8 @@ public void appendTo(int groupId, BlockBuilder[] blockBuilders)
182205
{
183206
checkArgument(groupId < nextGroupId, "groupId out of range");
184207

185-
byte[] fixedSizeRecords = getFixedSizeRecords(groupId);
208+
int recordGroupIndex = recordGroupIndexForGroupId(groupId);
209+
byte[] fixedSizeRecords = this.fixedSizeRecords[recordGroupIndex];
186210
int recordOffset = getFixedRecordOffset(groupId);
187211

188212
byte[] variableWidthChunk = null;
@@ -202,6 +226,18 @@ public void appendTo(int groupId, BlockBuilder[] blockBuilders)
202226
if (hasPrecomputedHash) {
203227
BIGINT.writeLong(blockBuilders[blockBuilders.length - 1], (long) LONG_HANDLE.get(fixedSizeRecords, recordOffset));
204228
}
229+
// Release memory from the previous fixed size records batch
230+
if (isReleasingOutput() && recordOffset == 0 && recordGroupIndex > 0) {
231+
byte[] releasedRecords = this.fixedSizeRecords[recordGroupIndex - 1];
232+
this.fixedSizeRecords[recordGroupIndex - 1] = null;
233+
if (releasedRecords == null) {
234+
throw new IllegalStateException("already released previous record batch");
235+
}
236+
fixedRecordGroupsRetainedSize -= sizeOf(releasedRecords);
237+
if (variableWidthData != null) {
238+
variableWidthData.freeChunksBefore(fixedSizeRecords, recordOffset + variableWidthOffset);
239+
}
240+
}
205241
}
206242

207243
public void computeHashes(Block[] blocks, long[] hashes, int offset, int length)
@@ -251,6 +287,9 @@ public int putIfAbsent(Block[] blocks, int position, long hash)
251287

252288
private int getIndex(Block[] blocks, int position, long hash)
253289
{
290+
if (isReleasingOutput()) {
291+
throw new IllegalStateException("already releasing output");
292+
}
254293
byte hashPrefix = (byte) (hash & 0x7F | 0x80);
255294
int bucket = bucket((int) (hash >> 7));
256295

@@ -351,6 +390,9 @@ private void setControl(int index, byte hashPrefix)
351390

352391
public boolean ensureAvailableCapacity(int batchSize)
353392
{
393+
if (isReleasingOutput()) {
394+
throw new IllegalStateException("already releasing output");
395+
}
354396
long requiredMaxFill = nextGroupId + batchSize;
355397
if (requiredMaxFill >= maxFill) {
356398
long minimumRequiredCapacity = (requiredMaxFill + 1) * 16 / 15;

core/trino-main/src/main/java/io/trino/operator/GroupByHash.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,14 @@ static GroupByHash createGroupByHash(
109109

110110
void appendValuesTo(int groupId, PageBuilder pageBuilder);
111111

112+
/**
113+
* Signals that no more entries will be inserted, and that only calls to {@link GroupByHash#appendValuesTo(int, PageBuilder)}
114+
* with sequential groupId values will be observed after this point, allowing the implementation to potentially
115+
* release memory associated with structures required for inserts or associated with values that have already been
116+
* output.
117+
*/
118+
void startReleasingOutput();
119+
112120
Work<?> addPage(Page page);
113121

114122
/**

core/trino-main/src/main/java/io/trino/operator/NoChannelGroupByHash.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,12 @@ public void appendValuesTo(int groupId, PageBuilder pageBuilder)
5050
throw new UnsupportedOperationException("NoChannelGroupByHash does not support appendValuesTo");
5151
}
5252

53+
@Override
54+
public void startReleasingOutput()
55+
{
56+
throw new UnsupportedOperationException("NoChannelGroupByHash does not support startReleasingOutput");
57+
}
58+
5359
@Override
5460
public Work<?> addPage(Page page)
5561
{

core/trino-main/src/main/java/io/trino/operator/aggregation/builder/InMemoryHashAggregationBuilder.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -248,12 +248,18 @@ public WorkProcessor<Page> buildResult()
248248
for (GroupedAggregator groupedAggregator : groupedAggregators) {
249249
groupedAggregator.prepareFinal();
250250
}
251-
return buildResult(consecutiveGroupIds(), new PageBuilder(buildTypes()), false);
251+
// Only incrementally release memory for final aggregations, since partial aggregations have a fixed
252+
// memory limit and can be expected to fully flush and release their output quickly
253+
boolean releaseMemoryOnOutput = !partial;
254+
if (releaseMemoryOnOutput) {
255+
groupByHash.startReleasingOutput();
256+
}
257+
return buildResult(consecutiveGroupIds(), new PageBuilder(buildTypes()), false, releaseMemoryOnOutput);
252258
}
253259

254260
public WorkProcessor<Page> buildSpillResult()
255261
{
256-
return buildResult(hashSortedGroupIds(), new PageBuilder(buildSpillTypes()), true);
262+
return buildResult(hashSortedGroupIds(), new PageBuilder(buildSpillTypes()), true, false);
257263
}
258264

259265
public List<Type> buildSpillTypes()
@@ -273,7 +279,7 @@ public int getCapacity()
273279
return groupByHash.getCapacity();
274280
}
275281

276-
private WorkProcessor<Page> buildResult(IntIterator groupIds, PageBuilder pageBuilder, boolean appendRawHash)
282+
private WorkProcessor<Page> buildResult(IntIterator groupIds, PageBuilder pageBuilder, boolean appendRawHash, boolean releaseMemoryOnOutput)
277283
{
278284
int rawHashIndex = groupByChannels.length + groupedAggregators.size();
279285
return WorkProcessor.create(() -> {
@@ -300,6 +306,11 @@ private WorkProcessor<Page> buildResult(IntIterator groupIds, PageBuilder pageBu
300306
}
301307
}
302308

309+
// Update memory usage after producing each page of output
310+
if (releaseMemoryOnOutput) {
311+
updateMemory();
312+
}
313+
303314
return ProcessState.ofResult(pageBuilder.build());
304315
});
305316
}

core/trino-main/src/test/java/io/trino/operator/CyclingGroupByHash.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,12 @@ public int getGroupCount()
5454
return maxGroupId + 1;
5555
}
5656

57+
@Override
58+
public void startReleasingOutput()
59+
{
60+
throw new UnsupportedOperationException("Not yet supported");
61+
}
62+
5763
@Override
5864
public void appendValuesTo(int groupId, PageBuilder pageBuilder)
5965
{

core/trino-main/src/test/java/io/trino/operator/TestGroupByHash.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import static io.trino.spi.type.VarcharType.VARCHAR;
4747
import static io.trino.type.TypeTestUtils.getHashBlock;
4848
import static org.assertj.core.api.Assertions.assertThat;
49+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
4950

5051
public class TestGroupByHash
5152
{
@@ -333,6 +334,60 @@ public void testUpdateMemoryBigint()
333334
assertThat(rehashCount.get()).isEqualTo(2 * BIGINT_EXPECTED_REHASH);
334335
}
335336

337+
@Test
338+
public void testReleaseMemoryOnOutput()
339+
{
340+
Type type = VARCHAR;
341+
// values expand into multiple FlatGroupByHash fixed record groups
342+
Block valuesBlock = createStringSequenceBlock(0, 1_000_000);
343+
344+
GroupByHash groupByHash = createGroupByHash(ImmutableList.of(type), selectGroupByHashMode(false, false, ImmutableList.of(type)), 10_000, false, new FlatHashStrategyCompiler(new TypeOperators()), () -> true);
345+
assertThat(groupByHash.addPage(new Page(valuesBlock)).process()).isTrue();
346+
assertThat(groupByHash.getGroupCount()).isEqualTo(valuesBlock.getPositionCount());
347+
348+
long memoryUsageAfterInput = groupByHash.getEstimatedSize();
349+
groupByHash.startReleasingOutput();
350+
// memory usage should have decreased from dropping the hash table
351+
long memoryUsageAfterReleasingOutput = groupByHash.getEstimatedSize();
352+
// single immediate release of memory for the control and groupIdsByHash arrays
353+
assertThat(memoryUsageAfterReleasingOutput).isLessThan(memoryUsageAfterInput);
354+
355+
// no more inputs accepted after switching to releasing output
356+
assertThatThrownBy(() -> groupByHash.addPage(new Page(valuesBlock)).process())
357+
.isInstanceOf(IllegalStateException.class)
358+
.hasMessage("already releasing output");
359+
assertThatThrownBy(() -> groupByHash.startReleasingOutput())
360+
.isInstanceOf(IllegalStateException.class)
361+
.hasMessage("already releasing output");
362+
363+
PageBuilder pageBuilder = new PageBuilder(ImmutableList.of(type));
364+
int groupId = 0;
365+
// the first 1024 records in FlatGroupByHash are within the first record group
366+
for (; groupId < 1024; groupId++) {
367+
groupByHash.appendValuesTo(groupId, pageBuilder);
368+
pageBuilder.declarePosition();
369+
}
370+
pageBuilder.build();
371+
// No memory released yet after completing the first group
372+
assertThat(groupByHash.getEstimatedSize()).isEqualTo(memoryUsageAfterReleasingOutput);
373+
374+
groupByHash.appendValuesTo(groupId++, pageBuilder);
375+
pageBuilder.declarePosition();
376+
// Memory released
377+
long memoryUsageAfterFirstRelease = groupByHash.getEstimatedSize();
378+
assertThat(memoryUsageAfterFirstRelease).isLessThan(memoryUsageAfterReleasingOutput);
379+
assertThatThrownBy(() -> groupByHash.getRawHash(0))
380+
.isInstanceOf(IllegalStateException.class)
381+
.hasMessage("groupId already released");
382+
383+
for (; groupId < valuesBlock.getPositionCount(); groupId++) {
384+
groupByHash.appendValuesTo(groupId, pageBuilder);
385+
pageBuilder.declarePosition();
386+
}
387+
// More memory released
388+
assertThat(groupByHash.getEstimatedSize()).isLessThan(memoryUsageAfterFirstRelease);
389+
}
390+
336391
@Test
337392
public void testMemoryReservationYield()
338393
{

0 commit comments

Comments
 (0)