Skip to content

Partial update #40

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .palantir/revapi.yml
Original file line number Diff line number Diff line change
@@ -65,6 +65,16 @@ acceptedBreaks:
- code: "java.method.removed"
old: "method void org.apache.iceberg.io.DataWriter<T>::add(T)"
justification: "Removing deprecated method"
"1.1.0":
org.apache.iceberg:iceberg-api:
- code: "java.method.addedToInterface"
new: "method java.util.List<java.lang.Integer> org.apache.iceberg.ContentFile<F>::partialFieldIds()"
    justification: "Adding partialFieldIds to ContentFile to support the partial update feature"
org.apache.iceberg:iceberg-data:
- code: "java.method.abstractMethodAdded"
new: "method T org.apache.iceberg.data.DeleteFilter<T>::combineRecord(T, org.apache.iceberg.StructLike,\
\ org.apache.iceberg.Schema, org.apache.iceberg.Schema)"
    justification: "Adding combineRecord to DeleteFilter to support the partial update feature"
apache-iceberg-0.14.0:
org.apache.iceberg:iceberg-api:
- code: "java.class.defaultSerializationChanged"
2 changes: 2 additions & 0 deletions api/src/main/java/org/apache/iceberg/ContentFile.java
Original file line number Diff line number Diff line change
@@ -104,6 +104,8 @@ public interface ContentFile<F> {
*/
List<Integer> equalityFieldIds();

List<Integer> partialFieldIds();

/**
* Returns the sort order id of this file, which describes how the file is ordered. This
* information will be useful for merging data and equality delete files more efficiently when
17 changes: 15 additions & 2 deletions api/src/main/java/org/apache/iceberg/DataFile.java
Original file line number Diff line number Diff line change
@@ -102,7 +102,14 @@ public interface DataFile extends ContentFile<DataFile> {
int PARTITION_ID = 102;
String PARTITION_NAME = "partition";
String PARTITION_DOC = "Partition data tuple, schema based on the partition spec";
// NEXT ID TO ASSIGN: 142

Types.NestedField PARTIAL_IDS =
optional(
142,
"partial_ids",
ListType.ofRequired(143, IntegerType.get()),
"partial comparison field IDs");
// NEXT ID TO ASSIGN: 144

static StructType getType(StructType partitionType) {
// IDs start at 100 to leave room for changes to ManifestEntry
@@ -123,7 +130,8 @@ static StructType getType(StructType partitionType) {
KEY_METADATA,
SPLIT_OFFSETS,
EQUALITY_IDS,
SORT_ORDER_ID);
SORT_ORDER_ID,
PARTIAL_IDS);
}

/** @return the content stored in the file; one of DATA, POSITION_DELETES, or EQUALITY_DELETES */
@@ -136,4 +144,9 @@ default FileContent content() {
default List<Integer> equalityFieldIds() {
return null;
}

@Override
default List<Integer> partialFieldIds() {
return null;
}
}
3 changes: 2 additions & 1 deletion api/src/main/java/org/apache/iceberg/FileContent.java
Original file line number Diff line number Diff line change
@@ -22,7 +22,8 @@
public enum FileContent {
DATA(0),
POSITION_DELETES(1),
EQUALITY_DELETES(2);
EQUALITY_DELETES(2),
PARTIAL_UPDATE(3);

private final int id;

18 changes: 18 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseFile.java
Original file line number Diff line number Diff line change
@@ -73,6 +73,7 @@ public PartitionData copy() {
private Map<Integer, ByteBuffer> upperBounds = null;
private long[] splitOffsets = null;
private int[] equalityIds = null;
private int[] partialIds = null;
private byte[] keyMetadata = null;
private Integer sortOrderId;

@@ -132,6 +133,7 @@ public PartitionData copy() {
Map<Integer, ByteBuffer> upperBounds,
List<Long> splitOffsets,
int[] equalityFieldIds,
int[] partialFieldIds,
Integer sortOrderId,
ByteBuffer keyMetadata) {
this.partitionSpecId = specId;
@@ -159,6 +161,7 @@ public PartitionData copy() {
this.upperBounds = SerializableByteBufferMap.wrap(upperBounds);
this.splitOffsets = ArrayUtil.toLongArray(splitOffsets);
this.equalityIds = equalityFieldIds;
this.partialIds = partialFieldIds;
this.sortOrderId = sortOrderId;
this.keyMetadata = ByteBuffers.toByteArray(keyMetadata);
}
@@ -207,6 +210,10 @@ public PartitionData copy() {
toCopy.equalityIds != null
? Arrays.copyOf(toCopy.equalityIds, toCopy.equalityIds.length)
: null;
this.partialIds =
toCopy.partialIds != null
? Arrays.copyOf(toCopy.partialIds, toCopy.partialIds.length)
: null;
this.sortOrderId = toCopy.sortOrderId;
}

@@ -294,6 +301,9 @@ public void put(int i, Object value) {
this.sortOrderId = (Integer) value;
return;
case 17:
this.partialIds = ArrayUtil.toIntArray((List<Integer>) value);
return;
case 18:
this.fileOrdinal = (long) value;
return;
default:
@@ -349,6 +359,8 @@ public Object get(int i) {
case 16:
return sortOrderId;
case 17:
return partialFieldIds();
case 18:
return fileOrdinal;
default:
throw new UnsupportedOperationException("Unknown field ordinal: " + pos);
@@ -445,6 +457,11 @@ public List<Integer> equalityFieldIds() {
return ArrayUtil.toIntList(equalityIds);
}

@Override
public List<Integer> partialFieldIds() {
return ArrayUtil.toIntList(partialIds);
}

@Override
public Integer sortOrderId() {
return sortOrderId;
@@ -478,6 +495,7 @@ public String toString() {
.add("split_offsets", splitOffsets == null ? "null" : splitOffsets())
.add("equality_ids", equalityIds == null ? "null" : equalityFieldIds())
.add("sort_order_id", sortOrderId)
.add("partial_ids", equalityIds == null ? "null" : partialFieldIds())
.toString();
}
}
90 changes: 90 additions & 0 deletions core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
Original file line number Diff line number Diff line change
@@ -156,6 +156,80 @@ private static boolean canContainDeletesForFile(

case EQUALITY_DELETES:
return canContainEqDeletesForFile(dataFile, deleteFile, schema);

case PARTIAL_UPDATE:
return canContainPartialDeletesForFile(dataFile, deleteFile, schema);
}

return true;
}

// todo: add actual implementation
private static boolean canContainPartialDeletesForFile(
DataFile dataFile, DeleteFile deleteFile, Schema schema) {
// whether to check data ranges or to assume that the ranges match
// if upper/lower bounds are missing, null counts may still be used to determine delete files
// can be skipped
boolean checkRanges =
dataFile.lowerBounds() != null
&& dataFile.upperBounds() != null
&& deleteFile.lowerBounds() != null
&& deleteFile.upperBounds() != null;

Map<Integer, ByteBuffer> dataLowers = dataFile.lowerBounds();
Map<Integer, ByteBuffer> dataUppers = dataFile.upperBounds();
Map<Integer, ByteBuffer> deleteLowers = deleteFile.lowerBounds();
Map<Integer, ByteBuffer> deleteUppers = deleteFile.upperBounds();

Map<Integer, Long> dataNullCounts = dataFile.nullValueCounts();
Map<Integer, Long> dataValueCounts = dataFile.valueCounts();
Map<Integer, Long> deleteNullCounts = deleteFile.nullValueCounts();
Map<Integer, Long> deleteValueCounts = deleteFile.valueCounts();

for (int id : deleteFile.equalityFieldIds()) {
Types.NestedField field = schema.findField(id);
if (!field.type().isPrimitiveType()) {
// stats are not kept for nested types. assume that the delete file may match
continue;
}

if (containsNull(dataNullCounts, field) && containsNull(deleteNullCounts, field)) {
// the data has null values and null has been deleted, so the deletes must be applied
continue;
}

if (allNull(dataNullCounts, dataValueCounts, field) && allNonNull(deleteNullCounts, field)) {
// the data file contains only null values for this field, but there are no deletes for null
// values
return false;
}

if (allNull(deleteNullCounts, deleteValueCounts, field)
&& allNonNull(dataNullCounts, field)) {
// the delete file removes only null rows with null for this field, but there are no data
// rows with null
return false;
}

if (!checkRanges) {
// some upper and lower bounds are missing, assume they match
continue;
}

ByteBuffer dataLower = dataLowers.get(id);
ByteBuffer dataUpper = dataUppers.get(id);
ByteBuffer deleteLower = deleteLowers.get(id);
ByteBuffer deleteUpper = deleteUppers.get(id);
if (dataLower == null || dataUpper == null || deleteLower == null || deleteUpper == null) {
// at least one bound is not known, assume the delete file may match
continue;
}

if (!rangesOverlap(
field.type().asPrimitiveType(), dataLower, dataUpper, deleteLower, deleteUpper)) {
// no values overlap between the data file and the deletes
return false;
}
}

return true;
@@ -474,6 +548,22 @@ DeleteFileIndex build() {
globalApplySeqs = eqFilesSortedBySeq.stream().mapToLong(Pair::first).toArray();
globalDeletes = eqFilesSortedBySeq.stream().map(Pair::second).toArray(DeleteFile[]::new);

// fixme: this will overlap equal deletes
List<Pair<Long, DeleteFile>> partialDeleteSortedBySeq =
deleteFilesByPartition.get(partition).stream()
.filter(entry -> entry.file().content() == FileContent.PARTIAL_UPDATE)
.map(
entry ->
// a delete file is indexed by the sequence number it should be applied to
Pair.of(entry.dataSequenceNumber(), entry.file()))
.sorted(Comparator.comparingLong(Pair::first))
.collect(Collectors.toList());
if (partialDeleteSortedBySeq.size() > 0) {
globalApplySeqs = partialDeleteSortedBySeq.stream().mapToLong(Pair::first).toArray();
globalDeletes =
partialDeleteSortedBySeq.stream().map(Pair::second).toArray(DeleteFile[]::new);
}

List<Pair<Long, DeleteFile>> posFilesSortedBySeq =
deleteFilesByPartition.get(partition).stream()
.filter(entry -> entry.file().content() == FileContent.POSITION_DELETES)
11 changes: 11 additions & 0 deletions core/src/main/java/org/apache/iceberg/FileMetadata.java
Original file line number Diff line number Diff line change
@@ -41,6 +41,7 @@ public static class Builder {
private final int specId;
private FileContent content = null;
private int[] equalityFieldIds = null;
private int[] partialFieldIds = null;
private PartitionData partitionData;
private String filePath = null;
private FileFormat format = null;
@@ -116,6 +117,13 @@ public Builder ofEqualityDeletes(int... fieldIds) {
return this;
}

public Builder ofPartialDeletes(int[] newEqualityFieldIds, int[] newPartialFieldIds) {
this.content = FileContent.PARTIAL_UPDATE;
this.equalityFieldIds = newEqualityFieldIds;
this.partialFieldIds = newPartialFieldIds;
return this;
}

public Builder withStatus(FileStatus stat) {
this.filePath = stat.getPath().toString();
this.fileSizeInBytes = stat.getLen();
@@ -222,6 +230,8 @@ public DeleteFile build() {
sortOrderId == null, "Position delete file should not have sort order");
break;
case EQUALITY_DELETES:

case PARTIAL_UPDATE:
if (sortOrderId == null) {
sortOrderId = SortOrder.unsorted().orderId();
}
@@ -246,6 +256,7 @@ public DeleteFile build() {
lowerBounds,
upperBounds),
equalityFieldIds,
partialFieldIds,
sortOrderId,
keyMetadata);
}
1 change: 1 addition & 0 deletions core/src/main/java/org/apache/iceberg/GenericDataFile.java
Original file line number Diff line number Diff line change
@@ -57,6 +57,7 @@ class GenericDataFile extends BaseFile<DataFile> implements DataFile {
metrics.upperBounds(),
splitOffsets,
null,
null,
sortOrderId,
keyMetadata);
}
2 changes: 2 additions & 0 deletions core/src/main/java/org/apache/iceberg/GenericDeleteFile.java
Original file line number Diff line number Diff line change
@@ -39,6 +39,7 @@ class GenericDeleteFile extends BaseFile<DeleteFile> implements DeleteFile {
long fileSizeInBytes,
Metrics metrics,
int[] equalityFieldIds,
int[] partialFieldIds,
Integer sortOrderId,
ByteBuffer keyMetadata) {
super(
@@ -57,6 +58,7 @@ class GenericDeleteFile extends BaseFile<DeleteFile> implements DeleteFile {
metrics.upperBounds(),
null,
equalityFieldIds,
partialFieldIds,
sortOrderId,
keyMetadata);
}
8 changes: 8 additions & 0 deletions core/src/main/java/org/apache/iceberg/SnapshotSummary.java
Original file line number Diff line number Diff line change
@@ -221,12 +221,14 @@ private static class UpdateMetrics {
private int addedPosDeleteFiles = 0;
private int removedPosDeleteFiles = 0;
private int addedDeleteFiles = 0;
private int addedPartialFiles = 0;
private int removedDeleteFiles = 0;
private long addedRecords = 0L;
private long deletedRecords = 0L;
private long addedPosDeletes = 0L;
private long removedPosDeletes = 0L;
private long addedEqDeletes = 0L;
private long addedPartialUpdates = 0L;
private long removedEqDeletes = 0L;
private boolean trustSizeAndDeleteCounts = true;

@@ -290,6 +292,12 @@ void addedFile(ContentFile<?> file) {
this.addedEqDeleteFiles += 1;
this.addedEqDeletes += file.recordCount();
break;
case PARTIAL_UPDATE:
this.addedDeleteFiles += 1;
this.addedPartialFiles += 1;
this.addedPartialUpdates += file.recordCount();
break;

default:
throw new UnsupportedOperationException(
"Unsupported file content type: " + file.content());
10 changes: 9 additions & 1 deletion core/src/main/java/org/apache/iceberg/V2Metadata.java
Original file line number Diff line number Diff line change
@@ -272,7 +272,8 @@ static Types.StructType fileType(Types.StructType partitionType) {
DataFile.KEY_METADATA,
DataFile.SPLIT_OFFSETS,
DataFile.EQUALITY_IDS,
DataFile.SORT_ORDER_ID);
DataFile.SORT_ORDER_ID,
DataFile.PARTIAL_IDS);
}

static class IndexedManifestEntry<F extends ContentFile<F>>
@@ -456,6 +457,8 @@ public Object get(int pos) {
return wrapped.equalityFieldIds();
case 15:
return wrapped.sortOrderId();
case 16:
return wrapped.partialFieldIds();
}
throw new IllegalArgumentException("Unknown field ordinal: " + pos);
}
@@ -550,6 +553,11 @@ public List<Integer> equalityFieldIds() {
return wrapped.equalityFieldIds();
}

@Override
public List<Integer> partialFieldIds() {
return wrapped.partialFieldIds();
}

@Override
public Integer sortOrderId() {
return wrapped.sortOrderId();
Loading