|
206 | 206 | import java.util.HashSet;
|
207 | 207 | import java.util.Iterator;
|
208 | 208 | import java.util.LinkedHashMap;
|
| 209 | +import java.util.LinkedList; |
209 | 210 | import java.util.List;
|
210 | 211 | import java.util.Map;
|
211 | 212 | import java.util.Optional;
|
212 | 213 | import java.util.OptionalLong;
|
| 214 | +import java.util.Queue; |
213 | 215 | import java.util.Set;
|
214 | 216 | import java.util.concurrent.Callable;
|
215 | 217 | import java.util.concurrent.ConcurrentHashMap;
|
@@ -2251,7 +2253,9 @@ private void removeOrphanFiles(Table table, ConnectorSession session, SchemaTabl
|
2251 | 2253 | ImmutableSet.Builder<String> validMetadataFileNames = ImmutableSet.builder();
|
2252 | 2254 | ImmutableSet.Builder<String> validDataFileNames = ImmutableSet.builder();
|
2253 | 2255 |
|
2254 |
| - for (Snapshot snapshot : table.snapshots()) { |
| 2256 | + Queue<Snapshot> allSnapshots = getAllSnapshots(table); |
| 2257 | + while (!allSnapshots.isEmpty()) { |
| 2258 | + Snapshot snapshot = allSnapshots.poll(); |
2255 | 2259 | if (snapshot.manifestListLocation() != null) {
|
2256 | 2260 | validMetadataFileNames.add(fileName(snapshot.manifestListLocation()));
|
2257 | 2261 | }
|
@@ -4179,4 +4183,18 @@ private static TableStatistics mergeColumnStatistics(TableStatistics currentStat
|
4179 | 4183 | newStats.getColumnStatistics().forEach(statisticsBuilder::setColumnStatistics);
|
4180 | 4184 | return statisticsBuilder.build();
|
4181 | 4185 | }
|
| 4186 | + |
| 4187 | + /** |
| 4188 | + * Table.snapshots() returns Iterable but the actual implementation returned is a List |
| 4189 | + * snapshot.allManifests() caches manifests, so by keeping a List around GC is prevented from collecting any in memory manifests which leaks memory |
| 4190 | + * Workaround: use Queue, Queue#poll will unlink old Snapshots allowing for GC during traversal |
| 4191 | + */ |
| 4192 | + private static Queue<Snapshot> getAllSnapshots(Table icebergTable) |
| 4193 | + { |
| 4194 | + Queue<Snapshot> allSnapshots = new LinkedList<>(); |
| 4195 | + for (Snapshot snapshot : icebergTable.snapshots()) { |
| 4196 | + allSnapshots.add(snapshot); |
| 4197 | + } |
| 4198 | + return allSnapshots; |
| 4199 | + } |
4182 | 4200 | }
|
0 commit comments