Skip to content

Commit c715bb8

Browse files
Fix memory leak in Iceberg remove orphan files procedure
1 parent a0af241 commit c715bb8

File tree

2 files changed

+54
-5
lines changed

2 files changed

+54
-5
lines changed

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@
149149
import org.apache.iceberg.ManifestFile;
150150
import org.apache.iceberg.ManifestFiles;
151151
import org.apache.iceberg.ManifestReader;
152+
import org.apache.iceberg.ManifestUtils;
152153
import org.apache.iceberg.MetadataColumns;
153154
import org.apache.iceberg.PartitionField;
154155
import org.apache.iceberg.PartitionSpec;
@@ -206,10 +207,12 @@
206207
import java.util.HashSet;
207208
import java.util.Iterator;
208209
import java.util.LinkedHashMap;
210+
import java.util.LinkedList;
209211
import java.util.List;
210212
import java.util.Map;
211213
import java.util.Optional;
212214
import java.util.OptionalLong;
215+
import java.util.Queue;
213216
import java.util.Set;
214217
import java.util.concurrent.Callable;
215218
import java.util.concurrent.ConcurrentHashMap;
@@ -238,6 +241,7 @@
238241
import static com.google.common.collect.ImmutableSet.toImmutableSet;
239242
import static com.google.common.collect.Iterables.getLast;
240243
import static com.google.common.collect.Iterables.getOnlyElement;
244+
import static com.google.common.collect.Lists.newArrayList;
241245
import static com.google.common.collect.Maps.transformValues;
242246
import static com.google.common.collect.Sets.difference;
243247
import static io.trino.filesystem.Locations.isS3Tables;
@@ -2251,12 +2255,13 @@ private void removeOrphanFiles(Table table, ConnectorSession session, SchemaTabl
22512255
ImmutableSet.Builder<String> validMetadataFileNames = ImmutableSet.builder();
22522256
ImmutableSet.Builder<String> validDataFileNames = ImmutableSet.builder();
22532257

2254-
for (Snapshot snapshot : table.snapshots()) {
2255-
if (snapshot.manifestListLocation() != null) {
2256-
validMetadataFileNames.add(fileName(snapshot.manifestListLocation()));
2258+
ArrayList<String> manifestListLocations = newArrayList(Iterables.transform(table.snapshots(), Snapshot::manifestListLocation));
2259+
for (String manifestListLocation : manifestListLocations) {
2260+
if (manifestListLocation != null) {
2261+
validMetadataFileNames.add(fileName(manifestListLocation));
22572262
}
22582263

2259-
for (ManifestFile manifest : loadAllManifestsFromSnapshot(table, snapshot)) {
2264+
for (ManifestFile manifest : loadAllManifestsFromManifestList(table, manifestListLocation)) {
22602265
if (!processedManifestFilePaths.add(manifest.path())) {
22612266
// Already read this manifest
22622267
continue;
@@ -3460,6 +3465,21 @@ private static List<ManifestFile> loadAllManifestsFromSnapshot(Table icebergTabl
34603465
}
34613466
}
34623467

3468+
/**
3469+
* Use instead of loadAllManifestsFromSnapshot when loading manifests from multiple distinct snapshots
3470+
* Each BaseSnapshot object caches manifest files separately, so loading manifests from multiple distinct snapshots
3471+
* results in O(num_snapshots^2) copies of the same manifest file metadata in memory
3472+
*/
3473+
private static List<ManifestFile> loadAllManifestsFromManifestList(Table icebergTable, String manifestListLocation)
3474+
{
3475+
try {
3476+
return ManifestUtils.read(icebergTable.io(), manifestListLocation);
3477+
}
3478+
catch (NotFoundException | UncheckedIOException e) {
3479+
throw new TrinoException(ICEBERG_INVALID_METADATA, "Error accessing manifest file for table %s".formatted(icebergTable.name()), e);
3480+
}
3481+
}
3482+
34633483
private static Set<Integer> identityPartitionColumnsInAllSpecs(Table table)
34643484
{
34653485
// Extract identity partition column source ids common to ALL specs
@@ -3736,7 +3756,7 @@ public Optional<ConnectorOutputMetadata> finishRefreshMaterializedView(
37363756
icebergTable.schema().findType(field.sourceId())))
37373757
.toArray(Type[]::new);
37383758

3739-
AppendFiles appendFiles = transaction.newFastAppend();
3759+
AppendFiles appendFiles = isMergeManifestsOnWrite(session) ? transaction.newAppend() : transaction.newFastAppend();
37403760
ImmutableSet.Builder<String> writtenFiles = ImmutableSet.builder();
37413761
for (CommitTaskData task : commitTasks) {
37423762
DataFiles.Builder builder = DataFiles.builder(icebergTable.spec())
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package org.apache.iceberg;
15+
16+
import org.apache.iceberg.io.FileIO;
17+
18+
import java.util.List;
19+
20+
public class ManifestUtils
21+
{
22+
private ManifestUtils() {}
23+
24+
public static List<ManifestFile> read(FileIO fileIO, String manifestListLocation)
25+
{
26+
// Avoid using snapshot.allManifests() which can easily leak memory
27+
return ManifestLists.read(fileIO.newInputFile(manifestListLocation));
28+
}
29+
}

0 commit comments

Comments
 (0)