Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions api/src/main/java/org/apache/iceberg/Scan.java
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ default ThisT select(String... columns) {
*/
ThisT filter(Expression expr);

/**
 * Requests a scan that removes unused deletes while planning.
 *
 * <p>This default implementation always throws, so scan implementations that support the
 * behavior must override it.
 *
 * <p>NOTE(review): the precise meaning of "unused deletes" is defined by the overriding
 * implementation, not here — confirm against it before documenting further.
 *
 * @return never returns normally from this default implementation
 * @throws UnsupportedOperationException always, naming the concrete class that lacks an override
 */
default ThisT removeUnusedDeletesWhenPlanning() {
  // The message names this method so that the stack trace points at the missing override.
  throw new UnsupportedOperationException(
      this.getClass().getName() + " doesn't implement removeUnusedDeletesWhenPlanning");
}

/**
* Returns this scan's filter {@link Expression}.
*
Expand Down
10 changes: 10 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ protected boolean shouldIgnoreResiduals() {
return context().ignoreResiduals();
}

/**
 * Reports whether the current scan context has the remove-unused-deletes-when-planning flag set.
 *
 * @return the value of {@code removeUnusedDeletesWhenPlanning()} on this scan's context
 */
protected boolean shouldRemoveUnusedDeletesWhenPlanning() {
  boolean removeUnusedDeletes = context().removeUnusedDeletesWhenPlanning();
  return removeUnusedDeletes;
}

/**
 * Chooses the expression to use as the residual filter.
 *
 * @return {@code Expressions.alwaysTrue()} when residuals are ignored, otherwise the result of
 *     {@code filter()}
 */
protected Expression residualFilter() {
  if (shouldIgnoreResiduals()) {
    return Expressions.alwaysTrue();
  }

  return filter();
}
Expand All @@ -149,6 +153,12 @@ protected ExecutorService planExecutor() {
/**
 * Factory hook used by this class's refinement methods, which return its result directly.
 *
 * <p>NOTE(review): presumably implementations return a new scan of the concrete type bound to
 * the given table, schema and context — confirm against the subclasses.
 *
 * @param newTable table passed on by the refinement method
 * @param newSchema schema passed on by the refinement method
 * @param newContext scan context carrying the refined settings
 * @return the scan instance handed back to the caller of the refinement method
 */
protected abstract ThisT newRefinedScan(
Table newTable, Schema newSchema, TableScanContext newContext);

/**
 * Returns a refined scan whose context has the remove-unused-deletes-when-planning flag enabled.
 *
 * @return the scan produced by {@code newRefinedScan} for the same table and table schema
 */
@Override
public ThisT removeUnusedDeletesWhenPlanning() {
  Schema currentSchema = tableSchema();
  TableScanContext refinedContext = context.shouldRemoveUnusedDeletesWhenPlanning(true);
  return newRefinedScan(table, currentSchema, refinedContext);
}

@Override
public ThisT option(String property, String value) {
return newRefinedScan(table, schema, context.withOption(property, value));
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/java/org/apache/iceberg/DataTableScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ public CloseableIterable<FileScanTask> doPlanFiles() {
.ignoreDeleted()
.columnsToKeepStats(columnsToKeepStats());

if (shouldRemoveUnusedDeletesWhenPlanning()) {
manifestGroup = manifestGroup.planByPartition();
}

if (shouldIgnoreResiduals()) {
manifestGroup = manifestGroup.ignoreResiduals();
}
Expand Down
49 changes: 48 additions & 1 deletion core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
Expand Down Expand Up @@ -67,11 +69,11 @@
class DeleteFileIndex {
private static final DeleteFile[] EMPTY_DELETES = new DeleteFile[0];

private final EqualityDeletes globalDeletes;
private final PartitionMap<EqualityDeletes> eqDeletesByPartition;
private final PartitionMap<PositionDeletes> posDeletesByPartition;
private final Map<String, PositionDeletes> posDeletesByPath;
private final Map<String, DeleteFile> dvByPath;
private EqualityDeletes globalDeletes;
private final boolean hasEqDeletes;
private final boolean hasPosDeletes;
private final boolean isEmpty;
Expand Down Expand Up @@ -137,6 +139,51 @@ public Iterable<DeleteFile> referencedDeleteFiles() {
return deleteFiles;
}

/**
 * Drops the delete-file index entries held for one partition of one spec.
 *
 * <p>For a non-empty partition tuple this removes the partition's entries from the equality and
 * position delete partition maps, the path-scoped position deletes, and every DV whose delete
 * file belongs to the partition. Each index is skipped when its backing field is {@code null}.
 *
 * @param specId id of the partition spec the partition tuple belongs to
 * @param partition partition tuple whose index entries should be dropped
 */
void removeIndex(int specId, StructLike partition) {
  if (partition.size() == 0) {
    // NOTE(review): an empty partition tuple only clears the global equality deletes and skips
    // every other index — confirm that global deletes cannot still be needed by data files of
    // other specs/partitions when this is called.
    this.globalDeletes = null;
    return;
  }

  if (eqDeletesByPartition != null) {
    eqDeletesByPartition.remove(specId, partition);
  }

  if (posDeletesByPartition != null) {
    posDeletesByPartition.remove(specId, partition);
  }

  if (posDeletesByPath != null) {
    removeUnreferencedPathPositionDeletes(specId, partition);
  }

  if (dvByPath != null) {
    // Collect keys first so the map is not mutated while its entry set is being iterated.
    Set<String> toRemove = Sets.newHashSet();
    for (Map.Entry<String, DeleteFile> entry : dvByPath.entrySet()) {
      DeleteFile dv = entry.getValue();
      // No early exit: a partition may own many DVs and all of them have to be dropped.
      // NOTE(review): relies on the StructLike implementation's equals — confirm it is
      // value-based for the partition tuples used here.
      if (specId == dv.specId() && Objects.equals(partition, dv.partition())) {
        toRemove.add(entry.getKey());
      }
    }

    toRemove.forEach(dvByPath::remove);
  }
}

/**
 * Removes every path-scoped position delete group that belongs to the given partition.
 *
 * <p>A group is matched through the first delete file it references: the group is removed when
 * that file's spec id and partition equal the arguments. Groups that reference no delete files
 * are left untouched.
 *
 * @param specId id of the partition spec the partition tuple belongs to
 * @param partition partition tuple whose path-scoped position deletes should be dropped
 */
private void removeUnreferencedPathPositionDeletes(int specId, StructLike partition) {
  // Collect keys first so the map is not mutated while its entry set is being iterated.
  Set<String> toRemove = Sets.newHashSet();
  for (Map.Entry<String, PositionDeletes> entry : posDeletesByPath.entrySet()) {
    Iterator<DeleteFile> deleteFiles = entry.getValue().referencedDeleteFiles().iterator();
    if (!deleteFiles.hasNext()) {
      continue;
    }

    DeleteFile first = deleteFiles.next();
    // No early exit: many data file paths can live in the same partition and each matching
    // entry has to be dropped.
    if (specId == first.specId() && Objects.equals(partition, first.partition())) {
      toRemove.add(entry.getKey());
    }
  }

  toRemove.forEach(posDeletesByPath::remove);
}

/**
 * Looks up the delete files for a manifest entry by delegating to {@code forDataFile} with the
 * entry's data sequence number and its data file.
 *
 * @param entry manifest entry whose data file the lookup is for
 * @return whatever {@code forDataFile} returns for that sequence number and file
 */
DeleteFile[] forEntry(ManifestEntry<DataFile> entry) {
return forDataFile(entry.dataSequenceNumber(), entry.file());
}
Expand Down
178 changes: 176 additions & 2 deletions core/src/main/java/org/apache/iceberg/ManifestGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -40,16 +42,18 @@
import org.apache.iceberg.metrics.ScanMetricsUtil;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ContentFileUtil;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.ParallelIterable;

class ManifestGroup {
private static final Types.StructType EMPTY_STRUCT = Types.StructType.of();

private final FileIO io;
private final Set<ManifestFile> dataManifests;
private Set<ManifestFile> dataManifests;
private final DeleteFileIndex.Builder deleteIndexBuilder;
private Predicate<ManifestEntry<DataFile>> manifestEntryPredicate;
private Map<Integer, PartitionSpec> specsById;
Expand All @@ -64,6 +68,7 @@ class ManifestGroup {
private Set<Integer> columnsToKeepStats;
private ExecutorService executorService;
private ScanMetrics scanMetrics;
private DeleteFileIndex deleteFiles;

ManifestGroup(FileIO io, Iterable<ManifestFile> manifests) {
this(
Expand Down Expand Up @@ -162,6 +167,34 @@ ManifestGroup planWith(ExecutorService newExecutorService) {
return this;
}

/**
 * Switches this group to partition-aware planning.
 *
 * <p>Every data manifest is read once to record the distinct (spec id, partition) pairs it
 * contains and to count, per pair, how many manifests contain it. The data manifests are then
 * replaced by {@code CloseableManifest} wrappers that share those two maps.
 *
 * @return this group, for method chaining
 * @throws java.io.UncheckedIOException if reading or closing a manifest fails
 */
ManifestGroup planByPartition() {
  Map<Pair<Integer, StructLike>, Integer> partitionRefCount = Maps.newHashMap();
  Map<ManifestFile, Set<Pair<Integer, StructLike>>> distinctPartitionsInManifest =
      Maps.newHashMap();

  for (ManifestFile file : dataManifests) {
    Set<Pair<Integer, StructLike>> visited = Sets.newHashSet();
    try (ManifestReader<DataFile> reader = ManifestFiles.read(file, io)) {
      for (DataFile dataFile : reader) {
        Pair<Integer, StructLike> partition = Pair.of(dataFile.specId(), dataFile.partition());
        // Count a partition once per manifest, no matter how many of its files the manifest has.
        if (visited.add(partition)) {
          partitionRefCount.merge(partition, 1, Integer::sum);
        }
      }
    } catch (IOException e) {
      // Still a RuntimeException for callers, but typed as an I/O failure and naming the manifest.
      throw new java.io.UncheckedIOException("Failed to read manifest " + file.path(), e);
    }

    distinctPartitionsInManifest.put(file, visited);
  }

  Set<ManifestFile> wrappedManifests = Sets.newHashSet();
  for (ManifestFile file : dataManifests) {
    wrappedManifests.add(
        new CloseableManifest(file, partitionRefCount, distinctPartitionsInManifest));
  }

  this.dataManifests = wrappedManifests;
  return this;
}

/**
* Returns an iterable of scan tasks. It is safe to add entries of this iterable to a collection
* as {@link DataFile} in each {@link FileScanTask} is defensively copied.
Expand All @@ -172,6 +205,144 @@ public CloseableIterable<FileScanTask> planFiles() {
return plan(ManifestGroup::createFileScanTasks);
}

/**
 * A {@link ManifestFile} wrapper that releases per-partition reference counts when closed.
 *
 * <p>All {@link ManifestFile} accessors delegate to the wrapped manifest. On {@link #close()},
 * the count of every partition recorded for the wrapped manifest is decremented, and when a count
 * reaches zero the partition is removed from the enclosing group's delete file index.
 *
 * <p>All instances created with the same {@code partitionRefCount} map synchronize on that map
 * while closing.
 */
private class CloseableManifest implements ManifestFile, Closeable {
  private final ManifestFile delegate;
  private final Map<Pair<Integer, StructLike>, Integer> partitionRefCount;
  private final Map<ManifestFile, Set<Pair<Integer, StructLike>>> distinctPartitionsInManifest;

  // Guarded by the partitionRefCount monitor; makes close() a no-op after the first call.
  private boolean closed = false;

  private CloseableManifest(
      ManifestFile delegate,
      Map<Pair<Integer, StructLike>, Integer> partitionRefCount,
      Map<ManifestFile, Set<Pair<Integer, StructLike>>> distinctPartitionsInManifest) {
    this.delegate = delegate;
    this.partitionRefCount = partitionRefCount;
    this.distinctPartitionsInManifest = distinctPartitionsInManifest;
  }

  /**
   * Releases this manifest's partition references.
   *
   * <p>Calling this more than once has no further effect, as required by {@link Closeable}; a
   * repeated close would otherwise decrement the shared counters a second time.
   */
  @Override
  public void close() {
    synchronized (partitionRefCount) {
      if (closed) {
        return;
      }

      closed = true;

      Set<Pair<Integer, StructLike>> partitions = distinctPartitionsInManifest.get(delegate);
      if (partitions == null) {
        // Nothing was recorded for this manifest, so there is nothing to release.
        return;
      }

      for (Pair<Integer, StructLike> partition : partitions) {
        int remaining = partitionRefCount.merge(partition, -1, Integer::sum);
        // deleteFiles is only assigned once planning starts; skip the removal before that.
        if (remaining == 0 && deleteFiles != null) {
          deleteFiles.removeIndex(partition.first(), partition.second());
        }
      }
    }
  }

  // Everything below is plain delegation to the wrapped manifest.

  @Override
  public boolean hasAddedFiles() {
    return delegate.hasAddedFiles();
  }

  @Override
  public boolean hasExistingFiles() {
    return delegate.hasExistingFiles();
  }

  @Override
  public boolean hasDeletedFiles() {
    return delegate.hasDeletedFiles();
  }

  @Override
  public ByteBuffer keyMetadata() {
    return delegate.keyMetadata();
  }

  @Override
  public Long firstRowId() {
    return delegate.firstRowId();
  }

  @Override
  public String path() {
    return delegate.path();
  }

  @Override
  public long length() {
    return delegate.length();
  }

  @Override
  public int partitionSpecId() {
    return delegate.partitionSpecId();
  }

  @Override
  public ManifestContent content() {
    return delegate.content();
  }

  @Override
  public long sequenceNumber() {
    return delegate.sequenceNumber();
  }

  @Override
  public long minSequenceNumber() {
    return delegate.minSequenceNumber();
  }

  @Override
  public Long snapshotId() {
    return delegate.snapshotId();
  }

  @Override
  public Integer addedFilesCount() {
    return delegate.addedFilesCount();
  }

  @Override
  public Long addedRowsCount() {
    return delegate.addedRowsCount();
  }

  @Override
  public Integer existingFilesCount() {
    return delegate.existingFilesCount();
  }

  @Override
  public Long existingRowsCount() {
    return delegate.existingRowsCount();
  }

  @Override
  public Integer deletedFilesCount() {
    return delegate.deletedFilesCount();
  }

  @Override
  public Long deletedRowsCount() {
    return delegate.deletedRowsCount();
  }

  @Override
  public List<PartitionFieldSummary> partitions() {
    return delegate.partitions();
  }

  /** Returns a copy of the wrapped manifest; the copy is not wrapped in a new instance. */
  @Override
  public ManifestFile copy() {
    return delegate.copy();
  }

  @Override
  public int hashCode() {
    return delegate.hashCode();
  }

  /**
   * Compares by wrapped manifest.
   *
   * <p>Another wrapper is unwrapped before the delegate comparison so that a wrapper equals
   * itself and equals other wrappers of an equal manifest, consistent with {@link #hashCode()}.
   */
  @Override
  public boolean equals(Object obj) {
    if (this == obj) {
      return true;
    }

    Object other = obj instanceof CloseableManifest ? ((CloseableManifest) obj).delegate : obj;
    return delegate.equals(other);
  }
}

public <T extends ScanTask> CloseableIterable<T> plan(CreateTasksFunction<T> createTasksFunc) {
LoadingCache<Integer, ResidualEvaluator> residualCache =
Caffeine.newBuilder()
Expand All @@ -182,7 +353,7 @@ public <T extends ScanTask> CloseableIterable<T> plan(CreateTasksFunction<T> cre
return ResidualEvaluator.of(spec, filter, caseSensitive);
});

DeleteFileIndex deleteFiles = deleteIndexBuilder.scanMetrics(scanMetrics).build();
deleteFiles = deleteIndexBuilder.scanMetrics(scanMetrics).build();

boolean dropStats = ManifestReader.dropStats(columns);
if (deleteFiles.hasEqualityDeletes()) {
Expand Down Expand Up @@ -352,6 +523,9 @@ public void close() throws IOException {
if (iterable != null) {
iterable.close();
}
if (manifest instanceof CloseableManifest) {
((CloseableManifest) manifest).close();
}
}
});
}
Expand Down
12 changes: 12 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableScanContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ public boolean returnColumnStats() {
return false;
}

/**
 * Flag for removing unused deletes while planning; defaults to {@code false}.
 *
 * <p>NOTE(review): what counts as "unused" is decided by the planning code that reads this flag
 * and is not visible here — confirm there.
 */
@Value.Default
public boolean removeUnusedDeletesWhenPlanning() {
return false;
}

/** Returns the configured {@code columnsToKeepStats} value, which may be {@code null}. */
@Nullable
public abstract Set<Integer> columnsToKeepStats();

Expand Down Expand Up @@ -129,6 +134,13 @@ TableScanContext shouldReturnColumnStats(boolean returnColumnStats) {
.build();
}

/**
 * Copies this context with the remove-unused-deletes-when-planning flag replaced.
 *
 * @param removeUnusedDeletesWhenPlanning value to store in the copy
 * @return a context built from this one with only that flag changed
 */
TableScanContext shouldRemoveUnusedDeletesWhenPlanning(boolean removeUnusedDeletesWhenPlanning) {
  TableScanContext updatedContext =
      ImmutableTableScanContext.builder()
          .from(this)
          .removeUnusedDeletesWhenPlanning(removeUnusedDeletesWhenPlanning)
          .build();
  return updatedContext;
}

TableScanContext columnsToKeepStats(Set<Integer> columnsToKeepStats) {
Preconditions.checkState(
returnColumnStats(),
Expand Down