diff --git a/api/src/main/java/org/apache/iceberg/Scan.java b/api/src/main/java/org/apache/iceberg/Scan.java index 339bc75336ba..237cf524a365 100644 --- a/api/src/main/java/org/apache/iceberg/Scan.java +++ b/api/src/main/java/org/apache/iceberg/Scan.java @@ -120,6 +120,17 @@ default ThisT select(String... columns) { */ ThisT filter(Expression expr); + /** + * Create a new scan from this scan that removes unused deletes while planning. + * + * @return a new scan based on this one with removal of unused deletes enabled + * @throws UnsupportedOperationException if this scan implementation does not override it + */ + default ThisT removeUnusedDeletesWhenPlanning() { + throw new UnsupportedOperationException( + this.getClass().getName() + " doesn't implement removeUnusedDeletesWhenPlanning"); + } + /** * Returns this scan's filter {@link Expression}. * diff --git a/core/src/main/java/org/apache/iceberg/BaseScan.java b/core/src/main/java/org/apache/iceberg/BaseScan.java index 618b2e95f29f..bb4ba533a69d 100644 --- a/core/src/main/java/org/apache/iceberg/BaseScan.java +++ b/core/src/main/java/org/apache/iceberg/BaseScan.java @@ -134,6 +134,10 @@ protected boolean shouldIgnoreResiduals() { return context().ignoreResiduals(); } + protected boolean shouldRemoveUnusedDeletesWhenPlanning() { + return context().removeUnusedDeletesWhenPlanning(); + } + protected Expression residualFilter() { return shouldIgnoreResiduals() ?
Expressions.alwaysTrue() : filter(); } @@ -149,6 +153,12 @@ protected ExecutorService planExecutor() { protected abstract ThisT newRefinedScan( Table newTable, Schema newSchema, TableScanContext newContext); + @Override + public ThisT removeUnusedDeletesWhenPlanning() { + return newRefinedScan( + table, tableSchema(), context.shouldRemoveUnusedDeletesWhenPlanning(true)); + } + @Override public ThisT option(String property, String value) { return newRefinedScan(table, schema, context.withOption(property, value)); diff --git a/core/src/main/java/org/apache/iceberg/DataTableScan.java b/core/src/main/java/org/apache/iceberg/DataTableScan.java index 4d23dd525e07..0201404eba43 100644 --- a/core/src/main/java/org/apache/iceberg/DataTableScan.java +++ b/core/src/main/java/org/apache/iceberg/DataTableScan.java @@ -79,6 +79,10 @@ public CloseableIterable doPlanFiles() { .ignoreDeleted() .columnsToKeepStats(columnsToKeepStats()); + if (shouldRemoveUnusedDeletesWhenPlanning()) { + manifestGroup = manifestGroup.planByPartition(); + } + if (shouldIgnoreResiduals()) { manifestGroup = manifestGroup.ignoreResiduals(); } diff --git a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java index 7716a7ba00c8..3e0903e3b4f1 100644 --- a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java +++ b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java @@ -26,8 +26,10 @@ import java.util.Collection; import java.util.Collections; import java.util.Comparator; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; @@ -67,11 +69,11 @@ class DeleteFileIndex { private static final DeleteFile[] EMPTY_DELETES = new DeleteFile[0]; - private final EqualityDeletes globalDeletes; private final PartitionMap eqDeletesByPartition; private final PartitionMap posDeletesByPartition; private
final Map posDeletesByPath; private final Map dvByPath; + private EqualityDeletes globalDeletes; private final boolean hasEqDeletes; private final boolean hasPosDeletes; private final boolean isEmpty; @@ -137,6 +139,62 @@ public Iterable referencedDeleteFiles() { return deleteFiles; } + /** + * Removes the deletes indexed for a partition from this index. + * + * @param specId ID of the partition spec that the partition belongs to + * @param partition partition tuple whose deletes should be dropped + */ + void removeIndex(int specId, StructLike partition) { + if (partition.size() == 0) { + // NOTE(review): an empty partition tuple drops the global equality deletes and skips the + // per-partition cleanup below; confirm no data file of another spec still needs them + this.globalDeletes = null; + return; + } + + if (eqDeletesByPartition != null) { + eqDeletesByPartition.remove(specId, partition); + } + if (posDeletesByPartition != null) { + posDeletesByPartition.remove(specId, partition); + } + + if (posDeletesByPath != null) { + removeUnreferencedPathPositionDeletes(specId, partition); + } + + if (dvByPath != null) { + Set toRemove = Sets.newHashSet(); + // collect every matching DV first, then remove them after the loop + for (Map.Entry deletes : dvByPath.entrySet()) { + DeleteFile deleteFile = deletes.getValue(); + if (specId == deleteFile.specId() && Objects.equals(partition, deleteFile.partition())) { + toRemove.add(deletes.getKey()); + } + } + toRemove.forEach(dvByPath::remove); + } + } + + /** + * Removes every path-scoped position delete group whose first referenced delete file belongs + * to the given partition. + */ + private void removeUnreferencedPathPositionDeletes(int specId, StructLike partition) { + Set toRemove = Sets.newHashSet(); + for (Map.Entry deletes : posDeletesByPath.entrySet()) { + Iterator deleteFiles = deletes.getValue().referencedDeleteFiles().iterator(); + if (deleteFiles.hasNext()) { + DeleteFile deleteFile = deleteFiles.next(); + if (specId == deleteFile.specId() && Objects.equals(partition, deleteFile.partition())) { + toRemove.add(deletes.getKey()); + } + } + } + toRemove.forEach(posDeletesByPath::remove); + } + DeleteFile[] forEntry(ManifestEntry entry) { return forDataFile(entry.dataSequenceNumber(), entry.file()); } diff --git a/core/src/main/java/org/apache/iceberg/ManifestGroup.java b/core/src/main/java/org/apache/iceberg/ManifestGroup.java index a52a9c3e8a7f..29691b88c7a6 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestGroup.java +++ b/core/src/main/java/org/apache/iceberg/ManifestGroup.java @@
-20,7 +20,9 @@ import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.LoadingCache; +import java.io.Closeable; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.List; import java.util.Map; import java.util.Set; @@ -40,16 +42,18 @@ import org.apache.iceberg.metrics.ScanMetricsUtil; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.ContentFileUtil; +import org.apache.iceberg.util.Pair; import org.apache.iceberg.util.ParallelIterable; class ManifestGroup { private static final Types.StructType EMPTY_STRUCT = Types.StructType.of(); private final FileIO io; - private final Set dataManifests; + private Set dataManifests; private final DeleteFileIndex.Builder deleteIndexBuilder; private Predicate> manifestEntryPredicate; private Map specsById; @@ -64,6 +68,7 @@ class ManifestGroup { private Set columnsToKeepStats; private ExecutorService executorService; private ScanMetrics scanMetrics; + private DeleteFileIndex deleteFiles; ManifestGroup(FileIO io, Iterable manifests) { this( @@ -162,6 +167,42 @@ ManifestGroup planWith(ExecutorService newExecutorService) { return this; } + /** + * Wraps every data manifest so that closing it releases the delete index entries of partitions + * that no wrapped manifest still open contains. + * + * <p>Each data manifest is read once here to record the distinct partitions it contains. + * + * @return this for method chaining + */ + ManifestGroup planByPartition() { + Map, Integer> partitionRefCount = Maps.newHashMap(); + Map>> distinctPartitionsInManifest = + Maps.newHashMap(); + for (ManifestFile file : dataManifests) { + Set> visited = Sets.newHashSet(); + try (ManifestReader reader = ManifestFiles.read(file, io)) { + for (DataFile dataFile : reader) { + Pair partition = Pair.of(dataFile.specId(), dataFile.partition()); + if (visited.add(partition)) { + partitionRefCount.merge(partition, 1, Integer::sum); + } + } + } catch (IOException e) { +
throw new java.io.UncheckedIOException(e); + } + distinctPartitionsInManifest.put(file, visited); + } + + Set newDataFiles = Sets.newHashSet(); + for (ManifestFile file : dataManifests) { + newDataFiles.add( + new CloseableManifest(file, partitionRefCount, distinctPartitionsInManifest)); + } + this.dataManifests = newDataFiles; + return this; + } + /** * Returns an iterable of scan tasks. It is safe to add entries of this iterable to a collection * as {@link DataFile} in each {@link FileScanTask} is defensively copied. @@ -172,6 +213,155 @@ public CloseableIterable planFiles() { return plan(ManifestGroup::createFileScanTasks); } + /** + * A {@link ManifestFile} that delegates to another manifest and, when closed, removes the + * delete index entries of partitions that no manifest still open contains. + */ + private class CloseableManifest implements ManifestFile, Closeable { + private final ManifestFile delegate; + private final Map, Integer> partitionRefCount; + private final Map>> distinctPartitionsInManifest; + private boolean closed; + + private CloseableManifest( + ManifestFile delegate, + Map, Integer> partitionRefCount, + Map>> distinctPartitionsInManifest) { + this.delegate = delegate; + this.partitionRefCount = partitionRefCount; + this.distinctPartitionsInManifest = distinctPartitionsInManifest; + } + + @Override + public void close() { + synchronized (partitionRefCount) { + if (closed) { + // Closeable requires repeated close calls to have no effect + return; + } + closed = true; + Set> pairs = distinctPartitionsInManifest.get(delegate); + for (Pair partition : pairs) { + if (partitionRefCount.merge(partition, -1, Integer::sum) == 0) { + deleteFiles.removeIndex(partition.first(), partition.second()); + } + } + } + } + + @Override + public boolean hasAddedFiles() { + return delegate.hasAddedFiles(); + } + + @Override + public boolean hasExistingFiles() { + return delegate.hasExistingFiles(); + } + + @Override + public boolean hasDeletedFiles() { + return delegate.hasDeletedFiles(); + } + + @Override + public ByteBuffer keyMetadata() { + return delegate.keyMetadata(); + } + + @Override + public Long firstRowId() { + return delegate.firstRowId(); + } + + @Override + public String path() { + return delegate.path(); + } + +
@Override + public long length() { + return delegate.length(); + } + + @Override + public int partitionSpecId() { + return delegate.partitionSpecId(); + } + + @Override + public ManifestContent content() { + return delegate.content(); + } + + @Override + public long sequenceNumber() { + return delegate.sequenceNumber(); + } + + @Override + public long minSequenceNumber() { + return delegate.minSequenceNumber(); + } + + @Override + public Long snapshotId() { + return delegate.snapshotId(); + } + + @Override + public Integer addedFilesCount() { + return delegate.addedFilesCount(); + } + + @Override + public Long addedRowsCount() { + return delegate.addedRowsCount(); + } + + @Override + public Integer existingFilesCount() { + return delegate.existingFilesCount(); + } + + @Override + public Long existingRowsCount() { + return delegate.existingRowsCount(); + } + + @Override + public Integer deletedFilesCount() { + return delegate.deletedFilesCount(); + } + + @Override + public Long deletedRowsCount() { + return delegate.deletedRowsCount(); + } + + @Override + public List partitions() { + return delegate.partitions(); + } + + @Override + public ManifestFile copy() { + return delegate.copy(); + } + + @Override + public int hashCode() { + return delegate.hashCode(); + } + + @Override + public boolean equals(Object obj) { + // unwrap other wrappers so the comparison is reflexive and symmetric between wrappers + Object other = obj instanceof CloseableManifest ? ((CloseableManifest) obj).delegate : obj; + return delegate.equals(other); + } + } + public CloseableIterable plan(CreateTasksFunction createTasksFunc) { LoadingCache residualCache = Caffeine.newBuilder() @@ -182,7 +372,7 @@ public CloseableIterable plan(CreateTasksFunction cre return ResidualEvaluator.of(spec, filter, caseSensitive); }); - DeleteFileIndex deleteFiles = deleteIndexBuilder.scanMetrics(scanMetrics).build(); + deleteFiles = deleteIndexBuilder.scanMetrics(scanMetrics).build(); boolean dropStats = ManifestReader.dropStats(columns); if (deleteFiles.hasEqualityDeletes()) { @@ -352,6 +542,9 @@ public void close() throws IOException { if (iterable != null) { iterable.close(); } + if
(manifest instanceof CloseableManifest) { + ((CloseableManifest) manifest).close(); + } } }); } diff --git a/core/src/main/java/org/apache/iceberg/TableScanContext.java b/core/src/main/java/org/apache/iceberg/TableScanContext.java index 5722ed7d8c1c..ba5e6d468d0e 100644 --- a/core/src/main/java/org/apache/iceberg/TableScanContext.java +++ b/core/src/main/java/org/apache/iceberg/TableScanContext.java @@ -60,6 +60,11 @@ public boolean returnColumnStats() { return false; } + @Value.Default + public boolean removeUnusedDeletesWhenPlanning() { + return false; + } + @Nullable public abstract Set columnsToKeepStats(); @@ -129,6 +134,13 @@ TableScanContext shouldReturnColumnStats(boolean returnColumnStats) { .build(); } + TableScanContext shouldRemoveUnusedDeletesWhenPlanning(boolean removeUnusedDeletesWhenPlanning) { + return ImmutableTableScanContext.builder() + .from(this) + .removeUnusedDeletesWhenPlanning(removeUnusedDeletesWhenPlanning) + .build(); + } + TableScanContext columnsToKeepStats(Set columnsToKeepStats) { Preconditions.checkState( returnColumnStats(),