From f304d102874343cfc0ce194e15055ae6bcb0a6d8 Mon Sep 17 00:00:00 2001 From: kunni Date: Fri, 16 Dec 2022 17:56:02 +0800 Subject: [PATCH] partial update --- .palantir/revapi.yml | 10 + .../java/org/apache/iceberg/ContentFile.java | 2 + .../java/org/apache/iceberg/DataFile.java | 17 +- .../java/org/apache/iceberg/FileContent.java | 3 +- .../java/org/apache/iceberg/BaseFile.java | 18 ++ .../org/apache/iceberg/DeleteFileIndex.java | 90 ++++++++ .../java/org/apache/iceberg/FileMetadata.java | 11 + .../org/apache/iceberg/GenericDataFile.java | 1 + .../org/apache/iceberg/GenericDeleteFile.java | 2 + .../org/apache/iceberg/SnapshotSummary.java | 8 + .../java/org/apache/iceberg/V2Metadata.java | 10 +- .../java/org/apache/iceberg/avro/Avro.java | 52 +++++ .../org/apache/iceberg/deletes/Deletes.java | 16 ++ .../iceberg/deletes/PartialDeleteWriter.java | 105 +++++++++ .../org/apache/iceberg/io/BaseTaskWriter.java | 41 ++++ .../iceberg/io/FileAppenderFactory.java | 14 ++ .../apache/iceberg/TestManifestReader.java | 4 +- .../iceberg/TestManifestWriterVersions.java | 2 + .../org/apache/iceberg/data/DeleteFilter.java | 190 +++++++++++++++- .../iceberg/data/GenericAppenderFactory.java | 67 ++++++ .../iceberg/data/GenericDeleteFilter.java | 18 ++ .../org/apache/iceberg/data/FileHelpers.java | 40 ++++ .../apache/iceberg/data/PartialReadTests.java | 193 +++++++++++++++++ deploy.gradle | 15 +- .../source/RowDataFileScanTaskReader.java | 6 + .../source/RowDataFileScanTaskReader.java | 6 + .../mr/TestInputFormatReaderDeletes2.java | 112 ++++++++++ .../main/java/org/apache/iceberg/orc/ORC.java | 58 +++++ .../org/apache/iceberg/parquet/Parquet.java | 57 +++++ .../iceberg/spark/source/RowDataReader.java | 6 + .../iceberg/spark/source/RowDataReader.java | 203 ++++++++++++++++++ .../iceberg/spark/source/BatchDataReader.java | 6 + .../iceberg/spark/source/RowDataReader.java | 6 + .../iceberg/spark/source/BaseReader.java | 6 + .../iceberg/spark/source/BaseReader.java | 6 + 35 files changed, 1371 insertions(+), 30 deletions(-) create mode 100644 core/src/main/java/org/apache/iceberg/deletes/PartialDeleteWriter.java create mode 100644 data/src/test/java/org/apache/iceberg/data/PartialReadTests.java create mode 100644 mr/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes2.java create mode 100644 spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index a4a7559b8e71..c8a18bcf698a 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -65,6 +65,16 @@ acceptedBreaks: - code: "java.method.removed" old: "method void org.apache.iceberg.io.DataWriter::add(T)" justification: "Removing deprecated method" + "1.1.0": + org.apache.iceberg:iceberg-api: + - code: "java.method.addedToInterface" + new: "method java.util.List org.apache.iceberg.ContentFile::partialFieldIds()" + justification: "{add new feature}" + org.apache.iceberg:iceberg-data: + - code: "java.method.abstractMethodAdded" + new: "method T org.apache.iceberg.data.DeleteFilter::combineRecord(T, org.apache.iceberg.StructLike,\ + \ org.apache.iceberg.Schema, org.apache.iceberg.Schema)" + justification: "{add new feature}" apache-iceberg-0.14.0: org.apache.iceberg:iceberg-api: - code: "java.class.defaultSerializationChanged" diff --git a/api/src/main/java/org/apache/iceberg/ContentFile.java b/api/src/main/java/org/apache/iceberg/ContentFile.java index d214ee6eb5ba..56055178846c 100644 --- a/api/src/main/java/org/apache/iceberg/ContentFile.java +++ 
b/api/src/main/java/org/apache/iceberg/ContentFile.java @@ -104,6 +104,8 @@ public interface ContentFile { */ List equalityFieldIds(); + List partialFieldIds(); + /** * Returns the sort order id of this file, which describes how the file is ordered. This * information will be useful for merging data and equality delete files more efficiently when diff --git a/api/src/main/java/org/apache/iceberg/DataFile.java b/api/src/main/java/org/apache/iceberg/DataFile.java index 59b329c500c7..4a4aade91379 100644 --- a/api/src/main/java/org/apache/iceberg/DataFile.java +++ b/api/src/main/java/org/apache/iceberg/DataFile.java @@ -102,7 +102,14 @@ public interface DataFile extends ContentFile { int PARTITION_ID = 102; String PARTITION_NAME = "partition"; String PARTITION_DOC = "Partition data tuple, schema based on the partition spec"; - // NEXT ID TO ASSIGN: 142 + + Types.NestedField PARTIAL_IDS = + optional( + 142, + "partial_ids", + ListType.ofRequired(143, IntegerType.get()), + "partial comparison field IDs"); + // NEXT ID TO ASSIGN: 144 static StructType getType(StructType partitionType) { // IDs start at 100 to leave room for changes to ManifestEntry @@ -123,7 +130,8 @@ static StructType getType(StructType partitionType) { KEY_METADATA, SPLIT_OFFSETS, EQUALITY_IDS, - SORT_ORDER_ID); + SORT_ORDER_ID, + PARTIAL_IDS); } /** @return the content stored in the file; one of DATA, POSITION_DELETES, or EQUALITY_DELETES */ @@ -136,4 +144,9 @@ default FileContent content() { default List equalityFieldIds() { return null; } + + @Override + default List partialFieldIds() { + return null; + } } diff --git a/api/src/main/java/org/apache/iceberg/FileContent.java b/api/src/main/java/org/apache/iceberg/FileContent.java index 2c9a2fa51bd2..6cb1e2a5182e 100644 --- a/api/src/main/java/org/apache/iceberg/FileContent.java +++ b/api/src/main/java/org/apache/iceberg/FileContent.java @@ -22,7 +22,8 @@ public enum FileContent { DATA(0), POSITION_DELETES(1), - EQUALITY_DELETES(2); + EQUALITY_DELETES(2), + PARTIAL_UPDATE(3); private final int id; diff --git a/core/src/main/java/org/apache/iceberg/BaseFile.java b/core/src/main/java/org/apache/iceberg/BaseFile.java index e1a10124138b..af8037817114 100644 --- a/core/src/main/java/org/apache/iceberg/BaseFile.java +++ b/core/src/main/java/org/apache/iceberg/BaseFile.java @@ -73,6 +73,7 @@ public PartitionData copy() { private Map upperBounds = null; private long[] splitOffsets = null; private int[] equalityIds = null; + private int[] partialIds = null; private byte[] keyMetadata = null; private Integer sortOrderId; @@ -132,6 +133,7 @@ public PartitionData copy() { Map upperBounds, List splitOffsets, int[] equalityFieldIds, + int[] partialFieldIds, Integer sortOrderId, ByteBuffer keyMetadata) { this.partitionSpecId = specId; @@ -159,6 +161,7 @@ public PartitionData copy() { this.upperBounds = SerializableByteBufferMap.wrap(upperBounds); this.splitOffsets = ArrayUtil.toLongArray(splitOffsets); this.equalityIds = equalityFieldIds; + this.partialIds = partialFieldIds; this.sortOrderId = sortOrderId; this.keyMetadata = ByteBuffers.toByteArray(keyMetadata); } @@ -207,6 +210,10 @@ public PartitionData copy() { toCopy.equalityIds != null ? Arrays.copyOf(toCopy.equalityIds, toCopy.equalityIds.length) : null; + this.partialIds = + toCopy.partialIds != null + ? 
Arrays.copyOf(toCopy.partialIds, toCopy.partialIds.length) + : null; this.sortOrderId = toCopy.sortOrderId; } @@ -294,6 +301,9 @@ public void put(int i, Object value) { this.sortOrderId = (Integer) value; return; case 17: + this.partialIds = ArrayUtil.toIntArray((List) value); + return; + case 18: this.fileOrdinal = (long) value; return; default: @@ -349,6 +359,8 @@ public Object get(int i) { case 16: return sortOrderId; case 17: + return partialFieldIds(); + case 18: return fileOrdinal; default: throw new UnsupportedOperationException("Unknown field ordinal: " + pos); @@ -445,6 +457,11 @@ public List equalityFieldIds() { return ArrayUtil.toIntList(equalityIds); } + @Override + public List partialFieldIds() { + return ArrayUtil.toIntList(partialIds); + } + @Override public Integer sortOrderId() { return sortOrderId; @@ -478,6 +495,7 @@ public String toString() { .add("split_offsets", splitOffsets == null ? "null" : splitOffsets()) .add("equality_ids", equalityIds == null ? "null" : equalityFieldIds()) .add("sort_order_id", sortOrderId) + .add("partial_ids", partialIds == null ? "null" : partialFieldIds()) .toString(); } } diff --git a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java index eedde21397eb..970cb4c88289 100644 --- a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java +++ b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java @@ -156,6 +156,80 @@ private static boolean canContainDeletesForFile( case EQUALITY_DELETES: return canContainEqDeletesForFile(dataFile, deleteFile, schema); + + case PARTIAL_UPDATE: + return canContainPartialDeletesForFile(dataFile, deleteFile, schema); + } + + return true; + } + + // todo: add actual implementation + private static boolean canContainPartialDeletesForFile( + DataFile dataFile, DeleteFile deleteFile, Schema schema) { + // whether to check data ranges or to assume that the ranges match + // if upper/lower bounds are missing, null counts may still be used to determine delete files + // can be skipped + boolean checkRanges = + dataFile.lowerBounds() != null + && dataFile.upperBounds() != null + && deleteFile.lowerBounds() != null + && deleteFile.upperBounds() != null; + + Map dataLowers = dataFile.lowerBounds(); + Map dataUppers = dataFile.upperBounds(); + Map deleteLowers = deleteFile.lowerBounds(); + Map deleteUppers = deleteFile.upperBounds(); + + Map dataNullCounts = dataFile.nullValueCounts(); + Map dataValueCounts = dataFile.valueCounts(); + Map deleteNullCounts = deleteFile.nullValueCounts(); + Map deleteValueCounts = deleteFile.valueCounts(); + + for (int id : deleteFile.equalityFieldIds()) { + Types.NestedField field = schema.findField(id); + if (!field.type().isPrimitiveType()) { + // stats are not kept for nested types.
assume that the delete file may match + continue; + } + + if (containsNull(dataNullCounts, field) && containsNull(deleteNullCounts, field)) { + // the data has null values and null has been deleted, so the deletes must be applied + continue; + } + + if (allNull(dataNullCounts, dataValueCounts, field) && allNonNull(deleteNullCounts, field)) { + // the data file contains only null values for this field, but there are no deletes for null + // values + return false; + } + + if (allNull(deleteNullCounts, deleteValueCounts, field) + && allNonNull(dataNullCounts, field)) { + // the delete file removes only null rows with null for this field, but there are no data + // rows with null + return false; + } + + if (!checkRanges) { + // some upper and lower bounds are missing, assume they match + continue; + } + + ByteBuffer dataLower = dataLowers.get(id); + ByteBuffer dataUpper = dataUppers.get(id); + ByteBuffer deleteLower = deleteLowers.get(id); + ByteBuffer deleteUpper = deleteUppers.get(id); + if (dataLower == null || dataUpper == null || deleteLower == null || deleteUpper == null) { + // at least one bound is not known, assume the delete file may match + continue; + } + + if (!rangesOverlap( + field.type().asPrimitiveType(), dataLower, dataUpper, deleteLower, deleteUpper)) { + // no values overlap between the data file and the deletes + return false; + } } return true; @@ -474,6 +548,22 @@ DeleteFileIndex build() { globalApplySeqs = eqFilesSortedBySeq.stream().mapToLong(Pair::first).toArray(); globalDeletes = eqFilesSortedBySeq.stream().map(Pair::second).toArray(DeleteFile[]::new); + // fixme: this will overlap equal deletes + List> partialDeleteSortedBySeq = + deleteFilesByPartition.get(partition).stream() + .filter(entry -> entry.file().content() == FileContent.PARTIAL_UPDATE) + .map( + entry -> + // a delete file is indexed by the sequence number it should be applied to + Pair.of(entry.dataSequenceNumber(), entry.file())) + .sorted(Comparator.comparingLong(Pair::first)) + .collect(Collectors.toList()); + if (partialDeleteSortedBySeq.size() > 0) { + globalApplySeqs = partialDeleteSortedBySeq.stream().mapToLong(Pair::first).toArray(); + globalDeletes = + partialDeleteSortedBySeq.stream().map(Pair::second).toArray(DeleteFile[]::new); + } + List> posFilesSortedBySeq = deleteFilesByPartition.get(partition).stream() .filter(entry -> entry.file().content() == FileContent.POSITION_DELETES) diff --git a/core/src/main/java/org/apache/iceberg/FileMetadata.java b/core/src/main/java/org/apache/iceberg/FileMetadata.java index 7d025fa8a630..f8170791f5c0 100644 --- a/core/src/main/java/org/apache/iceberg/FileMetadata.java +++ b/core/src/main/java/org/apache/iceberg/FileMetadata.java @@ -41,6 +41,7 @@ public static class Builder { private final int specId; private FileContent content = null; private int[] equalityFieldIds = null; + private int[] partialFieldIds = null; private PartitionData partitionData; private String filePath = null; private FileFormat format = null; @@ -116,6 +117,13 @@ public Builder ofEqualityDeletes(int... 
fieldIds) { return this; } + public Builder ofPartialDeletes(int[] newEqualityFieldIds, int[] newPartialFieldIds) { + this.content = FileContent.PARTIAL_UPDATE; + this.equalityFieldIds = newEqualityFieldIds; + this.partialFieldIds = newPartialFieldIds; + return this; + } + public Builder withStatus(FileStatus stat) { this.filePath = stat.getPath().toString(); this.fileSizeInBytes = stat.getLen(); @@ -222,6 +230,8 @@ public DeleteFile build() { sortOrderId == null, "Position delete file should not have sort order"); break; case EQUALITY_DELETES: + + case PARTIAL_UPDATE: if (sortOrderId == null) { sortOrderId = SortOrder.unsorted().orderId(); } @@ -246,6 +256,7 @@ public DeleteFile build() { lowerBounds, upperBounds), equalityFieldIds, + partialFieldIds, sortOrderId, keyMetadata); } diff --git a/core/src/main/java/org/apache/iceberg/GenericDataFile.java b/core/src/main/java/org/apache/iceberg/GenericDataFile.java index 34c65e669fb2..5d39535fc4ae 100644 --- a/core/src/main/java/org/apache/iceberg/GenericDataFile.java +++ b/core/src/main/java/org/apache/iceberg/GenericDataFile.java @@ -57,6 +57,7 @@ class GenericDataFile extends BaseFile implements DataFile { metrics.upperBounds(), splitOffsets, null, + null, sortOrderId, keyMetadata); } diff --git a/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java b/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java index 1b4effba642c..ce6b7aa440a9 100644 --- a/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java +++ b/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java @@ -39,6 +39,7 @@ class GenericDeleteFile extends BaseFile implements DeleteFile { long fileSizeInBytes, Metrics metrics, int[] equalityFieldIds, + int[] partialFieldIds, Integer sortOrderId, ByteBuffer keyMetadata) { super( @@ -57,6 +58,7 @@ class GenericDeleteFile extends BaseFile implements DeleteFile { metrics.upperBounds(), null, equalityFieldIds, + partialFieldIds, sortOrderId, keyMetadata); } diff --git a/core/src/main/java/org/apache/iceberg/SnapshotSummary.java b/core/src/main/java/org/apache/iceberg/SnapshotSummary.java index 460e67430b2f..6f1e317cb982 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotSummary.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotSummary.java @@ -221,12 +221,14 @@ private static class UpdateMetrics { private int addedPosDeleteFiles = 0; private int removedPosDeleteFiles = 0; private int addedDeleteFiles = 0; + private int addedPartialFiles = 0; private int removedDeleteFiles = 0; private long addedRecords = 0L; private long deletedRecords = 0L; private long addedPosDeletes = 0L; private long removedPosDeletes = 0L; private long addedEqDeletes = 0L; + private long addedPartialUpdates = 0L; private long removedEqDeletes = 0L; private boolean trustSizeAndDeleteCounts = true; @@ -290,6 +292,12 @@ void addedFile(ContentFile file) { this.addedEqDeleteFiles += 1; this.addedEqDeletes += file.recordCount(); break; + case PARTIAL_UPDATE: + this.addedDeleteFiles += 1; + this.addedPartialFiles += 1; + this.addedPartialUpdates += file.recordCount(); + break; + default: throw new UnsupportedOperationException( "Unsupported file content type: " + file.content()); diff --git a/core/src/main/java/org/apache/iceberg/V2Metadata.java b/core/src/main/java/org/apache/iceberg/V2Metadata.java index 64ab0fe94bd1..2d5fe73ac3c5 100644 --- a/core/src/main/java/org/apache/iceberg/V2Metadata.java +++ b/core/src/main/java/org/apache/iceberg/V2Metadata.java @@ -272,7 +272,8 @@ static Types.StructType fileType(Types.StructType 
partitionType) { DataFile.KEY_METADATA, DataFile.SPLIT_OFFSETS, DataFile.EQUALITY_IDS, - DataFile.SORT_ORDER_ID); + DataFile.SORT_ORDER_ID, + DataFile.PARTIAL_IDS); } static class IndexedManifestEntry> @@ -456,6 +457,8 @@ public Object get(int pos) { return wrapped.equalityFieldIds(); case 15: return wrapped.sortOrderId(); + case 16: + return wrapped.partialFieldIds(); } throw new IllegalArgumentException("Unknown field ordinal: " + pos); } @@ -550,6 +553,11 @@ public List equalityFieldIds() { return wrapped.equalityFieldIds(); } + @Override + public List partialFieldIds() { + return wrapped.partialFieldIds(); + } + @Override public Integer sortOrderId() { return wrapped.sortOrderId(); diff --git a/core/src/main/java/org/apache/iceberg/avro/Avro.java b/core/src/main/java/org/apache/iceberg/avro/Avro.java index 85cc8d902026..ec3c38dcfaff 100644 --- a/core/src/main/java/org/apache/iceberg/avro/Avro.java +++ b/core/src/main/java/org/apache/iceberg/avro/Avro.java @@ -53,6 +53,7 @@ import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PartialDeleteWriter; import org.apache.iceberg.deletes.PositionDelete; import org.apache.iceberg.deletes.PositionDeleteWriter; import org.apache.iceberg.encryption.EncryptionKeyMetadata; @@ -377,6 +378,7 @@ public static class DeleteWriteBuilder { private StructLike partition; private EncryptionKeyMetadata keyMetadata = null; private int[] equalityFieldIds = null; + private int[] partialFieldIds = null; private SortOrder sortOrder; private DeleteWriteBuilder(OutputFile file) { @@ -461,6 +463,11 @@ public DeleteWriteBuilder equalityFieldIds(int... fieldIds) { return this; } + public DeleteWriteBuilder partialFieldIds(int... 
fieldIds) { + this.partialFieldIds = fieldIds; + return this; + } + public DeleteWriteBuilder withSortOrder(SortOrder newSortOrder) { this.sortOrder = newSortOrder; return this; @@ -503,6 +510,51 @@ public EqualityDeleteWriter buildEqualityWriter() throws IOException { equalityFieldIds); } + public PartialDeleteWriter buildPartialWriter() throws IOException { + Preconditions.checkState( + rowSchema != null, "Cannot create partial update file without a schema"); + Preconditions.checkState( + equalityFieldIds != null, "Cannot create partial file without delete field ids"); + Preconditions.checkState( + partialFieldIds != null, "Cannot create partial file without partial field ids"); + Preconditions.checkState( + createWriterFunc != null, "Cannot create partial file unless createWriterFunc is set"); + Preconditions.checkArgument( + spec != null, "Spec must not be null when creating partial writer"); + Preconditions.checkArgument( + spec.isUnpartitioned() || partition != null, + "Partition must not be null for partitioned writes"); + + meta("delete-type", "partial"); + meta( + "delete-field-ids", + IntStream.of(equalityFieldIds) + .mapToObj(Objects::toString) + .collect(Collectors.joining(", "))); + + meta( + "partial-field-ids", + IntStream.of(partialFieldIds) + .mapToObj(Objects::toString) + .collect(Collectors.joining(", "))); + + // the appender uses the row schema without extra columns + appenderBuilder.schema(rowSchema); + appenderBuilder.createWriterFunc(createWriterFunc); + appenderBuilder.createContextFunc(WriteBuilder.Context::deleteContext); + + return new PartialDeleteWriter<>( + appenderBuilder.build(), + FileFormat.AVRO, + location, + spec, + partition, + keyMetadata, + sortOrder, + equalityFieldIds, + partialFieldIds); + } + public PositionDeleteWriter buildPositionWriter() throws IOException { Preconditions.checkState( equalityFieldIds == null, "Cannot create position delete file using delete field ids"); diff --git a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java index 7fc118d17a03..41288fcc5df7 100644 --- a/core/src/main/java/org/apache/iceberg/deletes/Deletes.java +++ b/core/src/main/java/org/apache/iceberg/deletes/Deletes.java @@ -38,7 +38,9 @@ import org.apache.iceberg.types.Types; import org.apache.iceberg.util.Filter; import org.apache.iceberg.util.SortedMerge; +import org.apache.iceberg.util.StructLikeMap; import org.apache.iceberg.util.StructLikeSet; +import org.apache.iceberg.util.StructProjection; public class Deletes { private static final Schema POSITION_DELETE_SCHEMA = @@ -124,6 +126,20 @@ protected boolean shouldKeep(T item) { return remainingRowsFilter.filter(rows); } + public static StructLikeMap toPartialMap( + CloseableIterable partials, + Types.StructType eqType, + StructProjection projectRow) { + try (CloseableIterable partial = partials) { + StructLikeMap objectStructLikeMap = StructLikeMap.create(eqType); + partial.forEach(delete -> objectStructLikeMap.put(projectRow.copyFor(delete), delete)); + + return objectStructLikeMap; + } catch (IOException e) { + throw new UncheckedIOException("Failed to close partial delete source", e); + } + } + public static StructLikeSet toEqualitySet( CloseableIterable eqDeletes, Types.StructType eqType) { try (CloseableIterable deletes = eqDeletes) { diff --git a/core/src/main/java/org/apache/iceberg/deletes/PartialDeleteWriter.java b/core/src/main/java/org/apache/iceberg/deletes/PartialDeleteWriter.java new file mode 100644 index
000000000000..fa3e39a8f1b7 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/deletes/PartialDeleteWriter.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.deletes; + +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.FileMetadata; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.encryption.EncryptionKeyMetadata; +import org.apache.iceberg.io.DeleteWriteResult; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.FileWriter; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +public class PartialDeleteWriter implements FileWriter { + private final FileAppender appender; + private final FileFormat format; + private final String location; + private final PartitionSpec spec; + private final StructLike partition; + private final ByteBuffer keyMetadata; + private final int[] equalityFieldIds; + private final int[] partialFieldIds; + private final SortOrder sortOrder; + private DeleteFile deleteFile = null; + + public PartialDeleteWriter( + FileAppender appender, + FileFormat format, + String location, + PartitionSpec spec, + StructLike partition, + EncryptionKeyMetadata keyMetadata, + SortOrder sortOrder, + int[] equalityFieldIds, + int[] partialFieldIds) { + this.appender = appender; + this.format = format; + this.location = location; + this.spec = spec; + this.partition = partition; + this.keyMetadata = keyMetadata != null ? 
keyMetadata.buffer() : null; + this.sortOrder = sortOrder; + this.equalityFieldIds = equalityFieldIds; + this.partialFieldIds = partialFieldIds; + } + + @Override + public void write(T row) { + appender.add(row); + } + + @Override + public long length() { + return appender.length(); + } + + @Override + public void close() throws IOException { + if (deleteFile == null) { + appender.close(); + this.deleteFile = + FileMetadata.deleteFileBuilder(spec) + .ofPartialDeletes(equalityFieldIds, partialFieldIds) + .withFormat(format) + .withPath(location) + .withPartition(partition) + .withEncryptionKeyMetadata(keyMetadata) + .withFileSizeInBytes(appender.length()) + .withMetrics(appender.metrics()) + .withSortOrder(sortOrder) + .build(); + } + } + + public DeleteFile toDeleteFile() { + Preconditions.checkState(deleteFile != null, "Cannot create delete file from unclosed writer"); + return deleteFile; + } + + @Override + public DeleteWriteResult result() { + return new DeleteWriteResult(toDeleteFile()); + } +} diff --git a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java index bba2c5355a98..e3c1c2223b88 100644 --- a/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java +++ b/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java @@ -30,6 +30,7 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.StructLike; import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PartialDeleteWriter; import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -111,6 +112,7 @@ protected abstract class BaseEqualityDeltaWriter implements Closeable { private RollingEqDeleteWriter eqDeleteWriter; private SortedPosDeleteWriter posDeleteWriter; private Map insertedRowMap; + private RollingPartialDeleteWriter partialDeleteWriter; protected BaseEqualityDeltaWriter(StructLike partition, Schema schema, Schema deleteSchema) { Preconditions.checkNotNull(schema, "Iceberg table schema cannot be null."); @@ -121,6 +123,7 @@ protected BaseEqualityDeltaWriter(StructLike partition, Schema schema, Schema de this.eqDeleteWriter = new RollingEqDeleteWriter(partition); this.posDeleteWriter = new SortedPosDeleteWriter<>(appenderFactory, fileFactory, format, partition); + this.partialDeleteWriter = new RollingPartialDeleteWriter(partition); this.insertedRowMap = StructLikeMap.create(deleteSchema.asStruct()); } @@ -146,6 +149,10 @@ public void write(T row) throws IOException { dataWriter.write(row); } + public void update(T row) throws IOException { + partialDeleteWriter.write(row); + } + /** * Write the pos-delete if there's an existing row matching the given key. 
* @@ -208,6 +215,14 @@ public void close() throws IOException { } } + if (partialDeleteWriter != null) { + try { + partialDeleteWriter.close(); + } finally { + partialDeleteWriter = null; + } + } + if (insertedRowMap != null) { insertedRowMap.clear(); insertedRowMap = null; @@ -393,4 +408,30 @@ void complete(EqualityDeleteWriter closedWriter) { completedDeleteFiles.add(closedWriter.toDeleteFile()); } } + + protected class RollingPartialDeleteWriter extends BaseRollingWriter> { + RollingPartialDeleteWriter(StructLike partitionKey) { + super(partitionKey); + } + + @Override + PartialDeleteWriter newWriter(EncryptedOutputFile file, StructLike partitionKey) { + return appenderFactory.newPartialWriter(file, format, partitionKey); + } + + @Override + long length(PartialDeleteWriter writer) { + return writer.length(); + } + + @Override + void write(PartialDeleteWriter writer, T record) { + writer.write(record); + } + + @Override + void complete(PartialDeleteWriter closedWriter) { + completedDeleteFiles.add(closedWriter.toDeleteFile()); + } + } } diff --git a/core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java b/core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java index 59b0b4b3bf6a..3b51843635d4 100644 --- a/core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java +++ b/core/src/main/java/org/apache/iceberg/io/FileAppenderFactory.java @@ -21,6 +21,7 @@ import org.apache.iceberg.FileFormat; import org.apache.iceberg.StructLike; import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PartialDeleteWriter; import org.apache.iceberg.deletes.PositionDeleteWriter; import org.apache.iceberg.encryption.EncryptedOutputFile; @@ -62,6 +63,19 @@ DataWriter newDataWriter( EqualityDeleteWriter newEqDeleteWriter( EncryptedOutputFile outputFile, FileFormat format, StructLike partition); + /** + * Create a new {@link PartialDeleteWriter}. + * + * @param outputFile an OutputFile used to create an output stream. + * @param format a file format + * @param partition a tuple of partition values + * @return a newly created {@link PartialDeleteWriter} for partial updates. + */ + default PartialDeleteWriter newPartialWriter( + EncryptedOutputFile outputFile, FileFormat format, StructLike partition) { + throw new UnsupportedOperationException("Not implemented yet."); + } + /** * Create a new {@link PositionDeleteWriter}. 
* diff --git a/core/src/test/java/org/apache/iceberg/TestManifestReader.java b/core/src/test/java/org/apache/iceberg/TestManifestReader.java index dfc84200fdb2..c45ec15e6cac 100644 --- a/core/src/test/java/org/apache/iceberg/TestManifestReader.java +++ b/core/src/test/java/org/apache/iceberg/TestManifestReader.java @@ -131,7 +131,7 @@ public void testDataFilePositions() throws IOException { for (DataFile file : reader) { Assert.assertEquals("Position should match", (Long) expectedPos, file.pos()); Assert.assertEquals( - "Position from field index should match", expectedPos, ((BaseFile) file).get(17)); + "Position from field index should match", expectedPos, ((BaseFile) file).get(18)); expectedPos += 1; } } @@ -148,7 +148,7 @@ public void testDeleteFilePositions() throws IOException { for (DeleteFile file : reader) { Assert.assertEquals("Position should match", (Long) expectedPos, file.pos()); Assert.assertEquals( - "Position from field index should match", expectedPos, ((BaseFile) file).get(17)); + "Position from field index should match", expectedPos, ((BaseFile) file).get(18)); expectedPos += 1; } } diff --git a/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java index 8f9cff01967b..f59fa4d67144 100644 --- a/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java +++ b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java @@ -81,6 +81,7 @@ public class TestManifestWriterVersions { private static final List EQUALITY_IDS = ImmutableList.of(1); private static final int[] EQUALITY_ID_ARR = new int[] {1}; + private static final int[] PARTIAL_ID_ARR = new int[] {1}; private static final DeleteFile DELETE_FILE = new GenericDeleteFile( @@ -92,6 +93,7 @@ public class TestManifestWriterVersions { 22905L, METRICS, EQUALITY_ID_ARR, + PARTIAL_ID_ARR, SORT_ORDER_ID, null); diff --git a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java index a7979fd2ed3e..bad71a3ba5da 100644 --- a/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java +++ b/data/src/main/java/org/apache/iceberg/data/DeleteFilter.java @@ -41,6 +41,7 @@ import org.apache.iceberg.io.InputFile; import org.apache.iceberg.orc.ORC; import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.relocated.com.google.common.base.Function; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; @@ -51,6 +52,8 @@ import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.Pair; +import org.apache.iceberg.util.StructLikeMap; import org.apache.iceberg.util.StructLikeSet; import org.apache.iceberg.util.StructProjection; import org.slf4j.Logger; @@ -66,7 +69,8 @@ public abstract class DeleteFilter { private final String filePath; private final List posDeletes; private final List eqDeletes; - private final Schema requiredSchema; + private final List partialDeletes; + private final Schema requiredAllSchema; private final Accessor posAccessor; private final boolean hasIsDeletedColumn; private final int isDeletedColumnPosition; @@ -74,8 +78,62 @@ public abstract class DeleteFilter { private PositionDeleteIndex deleteRowPositions = null; private List> isInDeleteSets = 
null; + private Set> partialDataSet = null; private Predicate eqDeleteRows = null; + protected DeleteFilter( + String filePath, + List deletes, + Schema tableSchema, + Schema requestedSchema, + DeleteCounter counter, + List partialIds) { + this.setFilterThreshold = DEFAULT_SET_FILTER_THRESHOLD; + this.filePath = filePath; + this.counter = counter; + + ImmutableList.Builder posDeleteBuilder = ImmutableList.builder(); + ImmutableList.Builder eqDeleteBuilder = ImmutableList.builder(); + ImmutableList.Builder partialBuilder = ImmutableList.builder(); + for (DeleteFile delete : deletes) { + switch (delete.content()) { + case POSITION_DELETES: + LOG.debug("Adding position delete file {} to filter", delete.path()); + posDeleteBuilder.add(delete); + break; + case EQUALITY_DELETES: + LOG.debug("Adding equality delete file {} to filter", delete.path()); + eqDeleteBuilder.add(delete); + break; + case PARTIAL_UPDATE: + boolean flag = false; + for (int id : partialIds) { + if (delete.partialFieldIds().contains(id)) { + flag = true; + } + } + if (flag) { + LOG.debug("Adding partial file {} to filter", delete.path()); + partialBuilder.add(delete); + } + break; + default: + throw new UnsupportedOperationException( + "Unknown delete file content: " + delete.content()); + } + } + + this.posDeletes = posDeleteBuilder.build(); + this.eqDeletes = eqDeleteBuilder.build(); + this.partialDeletes = partialBuilder.build(); + this.requiredAllSchema = + fileProjection(tableSchema, requestedSchema, posDeletes, eqDeletes, partialDeletes); + this.posAccessor = requiredAllSchema.accessorForField(MetadataColumns.ROW_POSITION.fieldId()); + this.hasIsDeletedColumn = + requiredAllSchema.findField(MetadataColumns.IS_DELETED.fieldId()) != null; + this.isDeletedColumnPosition = requiredAllSchema.columns().indexOf(MetadataColumns.IS_DELETED); + } + protected DeleteFilter( String filePath, List deletes, @@ -88,6 +146,7 @@ protected DeleteFilter( ImmutableList.Builder posDeleteBuilder = ImmutableList.builder(); ImmutableList.Builder eqDeleteBuilder = ImmutableList.builder(); + ImmutableList.Builder partialBuilder = ImmutableList.builder(); for (DeleteFile delete : deletes) { switch (delete.content()) { case POSITION_DELETES: @@ -98,6 +157,10 @@ protected DeleteFilter( LOG.debug("Adding equality delete file {} to filter", delete.path()); eqDeleteBuilder.add(delete); break; + case PARTIAL_UPDATE: + LOG.debug("Adding partial file {} to filter", delete.path()); + partialBuilder.add(delete); + break; default: throw new UnsupportedOperationException( "Unknown delete file content: " + delete.content()); @@ -106,11 +169,13 @@ protected DeleteFilter( this.posDeletes = posDeleteBuilder.build(); this.eqDeletes = eqDeleteBuilder.build(); - this.requiredSchema = fileProjection(tableSchema, requestedSchema, posDeletes, eqDeletes); - this.posAccessor = requiredSchema.accessorForField(MetadataColumns.ROW_POSITION.fieldId()); + this.partialDeletes = partialBuilder.build(); + this.requiredAllSchema = + fileProjection(tableSchema, requestedSchema, posDeletes, eqDeletes, partialDeletes); + this.posAccessor = requiredAllSchema.accessorForField(MetadataColumns.ROW_POSITION.fieldId()); this.hasIsDeletedColumn = - requiredSchema.findField(MetadataColumns.IS_DELETED.fieldId()) != null; - this.isDeletedColumnPosition = requiredSchema.columns().indexOf(MetadataColumns.IS_DELETED); + requiredAllSchema.findField(MetadataColumns.IS_DELETED.fieldId()) != null; + this.isDeletedColumnPosition = requiredAllSchema.columns().indexOf(MetadataColumns.IS_DELETED); } 
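+ // Reader-side summary of how partial updates are applied (see applyPartialMap() and applyPartialDeletes() below):
+ // a PARTIAL_UPDATE file stores rows that carry the equality key columns (equalityFieldIds) plus the columns to be
+ // rewritten (partialFieldIds). applyPartialMap() loads those rows into a StructLikeMap keyed by the equality key,
+ // and applyPartialDeletes() looks up each data row by that key; on a match, combineRecord(row, partialRow,
+ // partialSchema, requiredSchema) overlays the partial columns onto the data row. For example, mirroring
+ // PartialReadTests.testPartialData(): a data row (dt=2021-09-01, data="a", data2="a2", id=1) combined with a
+ // partial row (dt=2021-09-01, data="a3", id=1) keyed on (dt, id) reads back as (dt=2021-09-01, data="a3",
+ // data2="a2", id=1).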
protected DeleteFilter( @@ -118,12 +183,21 @@ protected DeleteFilter( this(filePath, deletes, tableSchema, requestedSchema, new DeleteCounter()); } + protected DeleteFilter( + String filePath, + List deletes, + Schema tableSchema, + Schema requestedSchema, + List partialIds) { + this(filePath, deletes, tableSchema, requestedSchema, new DeleteCounter(), partialIds); + } + protected int columnIsDeletedPosition() { return isDeletedColumnPosition; } public Schema requiredSchema() { - return requiredSchema; + return requiredAllSchema; } public boolean hasPosDeletes() { @@ -134,6 +208,10 @@ public boolean hasEqDeletes() { return !eqDeletes.isEmpty(); } + public boolean hasPartialDeletes() { + return !partialDeletes.isEmpty(); + } + public void incrementDeleteCount() { counter.increment(); } @@ -144,6 +222,9 @@ Accessor posAccessor() { protected abstract StructLike asStructLike(T record); + protected abstract T combineRecord( + T record, StructLike partialRecord, Schema partialSchema, Schema requiredSchema); + protected abstract InputFile getInputFile(String location); protected long pos(T record) { @@ -151,7 +232,7 @@ protected long pos(T record) { } public CloseableIterable filter(CloseableIterable records) { - return applyEqDeletes(applyPosDeletes(records)); + return applyPartialDeletes(applyEqDeletes(applyPosDeletes(records))); } private List> applyEqDeletes() { @@ -175,11 +256,11 @@ private List> applyEqDeletes() { Set ids = entry.getKey(); Iterable deletes = entry.getValue(); - Schema deleteSchema = TypeUtil.select(requiredSchema, ids); + Schema deleteSchema = TypeUtil.select(requiredAllSchema, ids); InternalRecordWrapper wrapper = new InternalRecordWrapper(deleteSchema.asStruct()); // a projection to select and reorder fields of the file schema to match the delete rows - StructProjection projectRow = StructProjection.create(requiredSchema, deleteSchema); + StructProjection projectRow = StructProjection.create(requiredAllSchema, deleteSchema); Iterable> deleteRecords = Iterables.transform(deletes, delete -> openDeletes(delete, deleteSchema)); @@ -200,6 +281,71 @@ record -> deleteSet.contains(projectRow.wrap(asStructLike(record))); return isInDeleteSets; } + public Set> applyPartialMap() { + if (partialDataSet != null) { + return partialDataSet; + } + + partialDataSet = Sets.newHashSet(); + if (partialDeletes.isEmpty()) { + return partialDataSet; + } + + Multimap, Set>, DeleteFile> filesByPartialIds = + Multimaps.newMultimap(Maps.newHashMap(), Lists::newArrayList); + for (DeleteFile delete : partialDeletes) { + Set eqIds = Sets.newHashSet(delete.equalityFieldIds()); + Set partialIds = Sets.newHashSet(delete.partialFieldIds()); + filesByPartialIds.put(Pair.of(eqIds, partialIds), delete); + } + + for (Map.Entry, Set>, Collection> entry : + filesByPartialIds.asMap().entrySet()) { + Pair, Set> pairIds = entry.getKey(); + Iterable deletes = entry.getValue(); + Set eqIds = pairIds.first(); + Set partialIds = pairIds.second(); + Set partialUpdateIds = Sets.newHashSet(eqIds); + partialUpdateIds.addAll(partialIds); + + Schema partialUpdateSchema = TypeUtil.select(requiredAllSchema, partialUpdateIds); + Schema eqSchema = TypeUtil.select(requiredAllSchema, eqIds); + Schema partialSchema = TypeUtil.select(requiredAllSchema, partialIds); + + InternalRecordWrapper wrapper = new InternalRecordWrapper(partialUpdateSchema.asStruct()); + StructProjection projectRow = StructProjection.create(partialUpdateSchema, eqSchema); + + Iterable> deleteRecords = + Iterables.transform(deletes, delete -> 
openDeletes(delete, partialUpdateSchema)); + + CloseableIterable records = + CloseableIterable.transform(CloseableIterable.concat(deleteRecords), Record::copy); + + StructLikeMap deleteSet = + Deletes.toPartialMap( + CloseableIterable.transform(records, wrapper::copyFor), + eqSchema.asStruct(), + projectRow); + + StructProjection eqProjectFromRequired = StructProjection.create(requiredAllSchema, eqSchema); + StructProjection partialProject = StructProjection.create(partialUpdateSchema, partialSchema); + + partialDataSet.add( + record -> { + StructProjection wrap = eqProjectFromRequired.wrap(asStructLike(record)); + StructLike structLike = deleteSet.get(wrap); + + if (structLike != null) { + StructLike partialRecord = partialProject.wrap(structLike); + return combineRecord(record, partialRecord, partialSchema, requiredAllSchema); + } + return null; + }); + } + + return partialDataSet; + } + public CloseableIterable findEqualityDeleteRows(CloseableIterable records) { // Predicate to test whether a row has been deleted by equality deletions. Predicate deletedRows = applyEqDeletes().stream().reduce(Predicate::or).orElse(t -> false); @@ -213,6 +359,23 @@ private CloseableIterable applyEqDeletes(CloseableIterable records) { return createDeleteIterable(records, isEqDeleted); } + public CloseableIterable applyPartialDeletes(CloseableIterable records) { + Set> partialCombines = applyPartialMap(); + + return CloseableIterable.transform( + records, + record -> { + T result = record; + for (Function partialCombine : partialCombines) { + T temp = partialCombine.apply(record); + if (temp != null) { + result = temp; + } + } + return result; + }); + } + protected void markRowDeleted(T item) { throw new UnsupportedOperationException( this.getClass().getName() + " does not implement markRowDeleted"); @@ -320,8 +483,9 @@ private static Schema fileProjection( Schema tableSchema, Schema requestedSchema, List posDeletes, - List eqDeletes) { - if (posDeletes.isEmpty() && eqDeletes.isEmpty()) { + List eqDeletes, + List partialDeletes) { + if (posDeletes.isEmpty() && eqDeletes.isEmpty() && partialDeletes.isEmpty()) { return requestedSchema; } @@ -334,6 +498,10 @@ private static Schema fileProjection( requiredIds.addAll(eqDelete.equalityFieldIds()); } + for (DeleteFile partial : partialDeletes) { + requiredIds.addAll(partial.equalityFieldIds()); + } + Set missingIds = Sets.newLinkedHashSet( Sets.difference(requiredIds, TypeUtil.getProjectedIds(requestedSchema))); diff --git a/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java b/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java index 23a94ebc9944..c2900cc4ac9d 100644 --- a/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java +++ b/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java @@ -31,6 +31,7 @@ import org.apache.iceberg.data.orc.GenericOrcWriter; import org.apache.iceberg.data.parquet.GenericParquetWriter; import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PartialDeleteWriter; import org.apache.iceberg.deletes.PositionDeleteWriter; import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.io.FileAppender; @@ -47,8 +48,12 @@ public class GenericAppenderFactory implements FileAppenderFactory { private final Schema schema; private final PartitionSpec spec; private final int[] equalityFieldIds; + private final int[] partialFieldIds; + private final int[] fullFieldIds; private final Schema eqDeleteRowSchema; private final 
Schema posDeleteRowSchema; + private final Schema partialRowSchema; + private final Schema fullRowSchema; private final Map config = Maps.newHashMap(); public GenericAppenderFactory(Schema schema) { @@ -65,11 +70,37 @@ public GenericAppenderFactory( int[] equalityFieldIds, Schema eqDeleteRowSchema, Schema posDeleteRowSchema) { + this( + schema, + spec, + equalityFieldIds, + null, + null, + eqDeleteRowSchema, + posDeleteRowSchema, + null, + null); + } + + public GenericAppenderFactory( + Schema schema, + PartitionSpec spec, + int[] equalityFieldIds, + int[] partialFieldIds, + int[] fullFieldIds, + Schema eqDeleteRowSchema, + Schema posDeleteRowSchema, + Schema partialRowSchema, + Schema fullRowSchema) { this.schema = schema; this.spec = spec; this.equalityFieldIds = equalityFieldIds; + this.partialFieldIds = partialFieldIds; + this.fullFieldIds = fullFieldIds; this.eqDeleteRowSchema = eqDeleteRowSchema; this.posDeleteRowSchema = posDeleteRowSchema; + this.partialRowSchema = partialRowSchema; + this.fullRowSchema = fullRowSchema; } public GenericAppenderFactory set(String property, String value) { @@ -135,6 +166,42 @@ public org.apache.iceberg.io.DataWriter newDataWriter( file.keyMetadata()); } + @Override + public PartialDeleteWriter newPartialWriter( + EncryptedOutputFile file, FileFormat format, StructLike partition) { + Preconditions.checkState( + equalityFieldIds != null && equalityFieldIds.length > 0, + "Equality field ids shouldn't be null or empty when creating partial-delete writer"); + Preconditions.checkNotNull( + eqDeleteRowSchema, + "Equality delete row schema shouldn't be null when creating partial-delete writer"); + + MetricsConfig metricsConfig = MetricsConfig.fromProperties(config); + try { + switch (format) { + case AVRO: + return Avro.writeDeletes(file.encryptingOutputFile()) + .createWriterFunc(DataWriter::create) + .withPartition(partition) + .overwrite() + .setAll(config) + .rowSchema(fullRowSchema) + .withSpec(spec) + .metricsConfig(metricsConfig) + .withKeyMetadata(file.keyMetadata()) + .equalityFieldIds(equalityFieldIds) + .partialFieldIds(partialFieldIds) + .buildPartialWriter(); + + default: + throw new UnsupportedOperationException( + "Cannot write partial deletes for unsupported file format: " + format); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + @Override public EqualityDeleteWriter newEqDeleteWriter( EncryptedOutputFile file, FileFormat format, StructLike partition) { diff --git a/data/src/main/java/org/apache/iceberg/data/GenericDeleteFilter.java b/data/src/main/java/org/apache/iceberg/data/GenericDeleteFilter.java index 0779ed09ce1e..18d9b204d49f 100644 --- a/data/src/main/java/org/apache/iceberg/data/GenericDeleteFilter.java +++ b/data/src/main/java/org/apache/iceberg/data/GenericDeleteFilter.java @@ -18,11 +18,15 @@ */ package org.apache.iceberg.data; +import java.util.List; +import java.util.Map; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.Schema; import org.apache.iceberg.StructLike; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Types; public class GenericDeleteFilter extends DeleteFilter { private final FileIO io; @@ -45,6 +49,20 @@ protected StructLike asStructLike(Record record) { return asStructLike.wrap(record); } + @Override + protected Record combineRecord( + Record record, StructLike partialRecord, Schema partialSchema, Schema requiredSchema) { + Map
overwriteValues = + Maps.newHashMapWithExpectedSize(partialSchema.columns().size()); + + List columns = partialSchema.columns(); + for (int i = 0; i < columns.size(); i++) { + overwriteValues.put(columns.get(i).name(), partialRecord.get(i, Object.class)); + } + + return record.copy(overwriteValues); + } + @Override protected InputFile getInputFile(String location) { return io.newInputFile(location); diff --git a/data/src/test/java/org/apache/iceberg/data/FileHelpers.java b/data/src/test/java/org/apache/iceberg/data/FileHelpers.java index 6531441fa57f..e1f872736f72 100644 --- a/data/src/test/java/org/apache/iceberg/data/FileHelpers.java +++ b/data/src/test/java/org/apache/iceberg/data/FileHelpers.java @@ -33,6 +33,7 @@ import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PartialDeleteWriter; import org.apache.iceberg.deletes.PositionDelete; import org.apache.iceberg.deletes.PositionDeleteWriter; import org.apache.iceberg.encryption.EncryptedFiles; @@ -100,6 +101,45 @@ public static DeleteFile writeDeleteFile( return writer.toDeleteFile(); } + public static DeleteFile writePartialFile( + Table table, + OutputFile out, + StructLike partition, + List deletes, + Schema equalitySchema, + Schema partialDataSchema, + Schema partialFullSchema) + throws IOException { + FileFormat format = defaultFormat(table.properties()); + int[] equalityFieldIds = + equalitySchema.columns().stream().mapToInt(Types.NestedField::fieldId).toArray(); + + int[] partialFieldIds = + partialDataSchema.columns().stream().mapToInt(Types.NestedField::fieldId).toArray(); + + int[] partialFullFieldIds = + partialFullSchema.columns().stream().mapToInt(Types.NestedField::fieldId).toArray(); + + FileAppenderFactory factory = + new GenericAppenderFactory( + table.schema(), + table.spec(), + equalityFieldIds, + partialFieldIds, + partialFullFieldIds, + equalitySchema, + null, + partialDataSchema, + partialFullSchema); + + PartialDeleteWriter writer = factory.newPartialWriter(encrypt(out), format, partition); + try (Closeable toClose = writer) { + writer.write(deletes); + } + + return writer.toDeleteFile(); + } + public static DataFile writeDataFile(Table table, OutputFile out, List rows) throws IOException { FileFormat format = defaultFormat(table.properties()); diff --git a/data/src/test/java/org/apache/iceberg/data/PartialReadTests.java b/data/src/test/java/org/apache/iceberg/data/PartialReadTests.java new file mode 100644 index 000000000000..ded84e6cd66d --- /dev/null +++ b/data/src/test/java/org/apache/iceberg/data/PartialReadTests.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.data; + +import java.io.IOException; +import java.time.LocalDate; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.Files; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TestHelpers.Row; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.ArrayUtil; +import org.apache.iceberg.util.DateTimeUtil; +import org.apache.iceberg.util.StructLikeSet; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public abstract class PartialReadTests { + // Schema passed to create tables + public static final Schema DATE_SCHEMA = + new Schema( + Types.NestedField.required(1, "dt", Types.DateType.get()), + Types.NestedField.required(2, "data", Types.StringType.get()), + Types.NestedField.required(3, "data2", Types.StringType.get()), + Types.NestedField.required(4, "id", Types.IntegerType.get())); + + // Partition spec used to create tables + public static final PartitionSpec DATE_SPEC = + PartitionSpec.builderFor(DATE_SCHEMA).day("dt").build(); + + @Rule public TemporaryFolder temp = new TemporaryFolder(); + + protected String tableName = null; + protected String dateTableName = null; + protected Table table = null; + protected Table dateTable = null; + protected List records = null; + private List dateRecords = null; + protected DataFile dataFile = null; + + @Before + public void writeTestDataFile() throws IOException { + this.dateTableName = "test2"; + this.dateTable = createTable(dateTableName, DATE_SCHEMA, DATE_SPEC); + + GenericRecord record = GenericRecord.create(dateTable.schema()); + this.dateRecords = Lists.newArrayList(); + + Map overwriteValues = Maps.newHashMapWithExpectedSize(4); + overwriteValues.put("dt", LocalDate.parse("2021-09-01")); + overwriteValues.put("data", "a"); + overwriteValues.put("data2", "a2"); + overwriteValues.put("id", 1); + dateRecords.add(record.copy(overwriteValues)); + + overwriteValues.put("dt", LocalDate.parse("2021-09-02")); + overwriteValues.put("data", "b"); + overwriteValues.put("data2", "b2"); + overwriteValues.put("id", 2); + dateRecords.add(record.copy(overwriteValues)); + + DataFile dataFile1 = + FileHelpers.writeDataFile( + dateTable, + Files.localOutput(temp.newFile()), + Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-01"))), + dateRecords); + + dateTable.newAppend().appendFile(dataFile1).commit(); + } + + @After + public void cleanup() throws IOException { + dropTable("test"); + dropTable("test2"); + } + + protected abstract Table createTable(String name, Schema schema, PartitionSpec spec) + throws IOException; + + protected abstract void dropTable(String name) throws IOException; + + protected abstract StructLikeSet rowSet(String name, Table testTable, String... columns) + throws IOException; + + protected boolean countPartial() { + return false; + } + + /** + * This will only be called after calling rowSet(String, Table, String...), and only if + * countDeletes() is true. 
+ */ + protected long deleteCount() { + return 0L; + } + + protected void checkPartialCount(long expectedPartials) { + if (countPartial()) { + long actualPartials = deleteCount(); + Assert.assertEquals( + "Table should contain expected number of deletes", expectedPartials, actualPartials); + } + } + + // todo need more tests for partial update + + @Test + public void testPartialData() throws IOException { + Schema partialFullSchema = dateTable.schema().select("dt", "data", "id"); + Schema eqDeleteSchema = dateTable.schema().select("dt", "id"); + Schema partialDataSchema = dateTable.schema().select("data"); + + Record dataPartial = GenericRecord.create(partialFullSchema); + List dataDeletes = + Lists.newArrayList( + dataPartial.copy("dt", LocalDate.parse("2021-09-01"), "data", "a3", "id", 1)); + + DeleteFile partialFile = + FileHelpers.writePartialFile( + dateTable, + Files.localOutput(temp.newFile()), + Row.of(DateTimeUtil.daysFromDate(LocalDate.parse("2021-09-01"))), + dataDeletes, + eqDeleteSchema, + partialDataSchema, + partialFullSchema); + + dateTable.newRowDelta().addDeletes(partialFile).commit(); + + List expectedDateRecords = Lists.newArrayList(); + GenericRecord record = GenericRecord.create(dateTable.schema()); + + Map overwriteValues = Maps.newHashMapWithExpectedSize(4); + overwriteValues.put("dt", LocalDate.parse("2021-09-01")); + overwriteValues.put("data", "a3"); + overwriteValues.put("data2", "a2"); + overwriteValues.put("id", 1); + expectedDateRecords.add(record.copy(overwriteValues)); + + overwriteValues.put("dt", LocalDate.parse("2021-09-02")); + overwriteValues.put("data", "b"); + overwriteValues.put("data2", "b2"); + overwriteValues.put("id", 2); + expectedDateRecords.add(record.copy(overwriteValues)); + + StructLikeSet expected = rowSetWithoutIds(dateTable, expectedDateRecords); + + StructLikeSet actual = rowSet(dateTableName, dateTable, "*"); + + Assert.assertEquals("Table should contain expected rows", expected, actual); + } + + protected static StructLikeSet rowSetWithoutIds( + Table table, List recordList, int... idsToRemove) { + Set deletedIds = Sets.newHashSet(ArrayUtil.toIntList(idsToRemove)); + StructLikeSet set = StructLikeSet.create(table.schema().asStruct()); + recordList.stream() + .filter(row -> !deletedIds.contains(row.getField("id"))) + .map(record -> new InternalRecordWrapper(table.schema().asStruct()).wrap(record)) + .forEach(set::add); + return set; + } +} diff --git a/deploy.gradle b/deploy.gradle index 8e0d43fe02e4..4273af28c57e 100644 --- a/deploy.gradle +++ b/deploy.gradle @@ -81,7 +81,7 @@ subprojects { } } - groupId = 'org.apache.iceberg' + groupId = 'org.apache.iceberg-kn' pom { name = 'Apache Iceberg' description = 'A table format for huge analytic datasets' @@ -109,18 +109,7 @@ subprojects { } repositories { - maven { - credentials { - username project.hasProperty('mavenUser') ? "$mavenUser" : "" - password project.hasProperty('mavenPassword') ? "$mavenPassword" : "" - } - // upload to the releases repository using ./gradlew -Prelease publish - def apacheSnapshotsRepoUrl = 'https://repository.apache.org/content/repositories/snapshots' - def apacheReleasesRepoUrl = 'https://repository.apache.org/service/local/staging/deploy/maven2' - def snapshotsRepoUrl = project.hasProperty('mavenSnapshotsRepo') ? "$mavenSnapshotsRepo" : "$apacheSnapshotsRepoUrl" - def releasesRepoUrl = project.hasProperty('mavenReleasesRepo') ? "$mavenReleasesRepo" : "$apacheReleasesRepoUrl" - url = project.hasProperty('release') ? 
releasesRepoUrl : snapshotsRepoUrl - } + mavenLocal() } } diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java index 5fada27d5471..b935d85d6b44 100644 --- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java +++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java @@ -214,6 +214,12 @@ protected StructLike asStructLike(RowData row) { return asStructLike.wrap(row); } + @Override + protected RowData combineRecord( + RowData record, StructLike partialRecord, Schema partialSchema) { + throw new UnsupportedOperationException("Partial updates are not yet supported."); + } + @Override protected InputFile getInputFile(String location) { return inputFilesDecryptor.getInputFile(location); diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java index 5fada27d5471..ed25d74c76bd 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/RowDataFileScanTaskReader.java @@ -214,6 +214,12 @@ protected StructLike asStructLike(RowData row) { return asStructLike.wrap(row); } + @Override + protected RowData combineRecord( + RowData record, StructLike partialRecord, Schema partialSchema, Schema requiredSchema) { + throw new UnsupportedOperationException("Partial updates are not yet supported."); + } + @Override protected InputFile getInputFile(String location) { return inputFilesDecryptor.getInputFile(location); diff --git a/mr/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes2.java b/mr/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes2.java new file mode 100644 index 000000000000..97b9a02303ff --- /dev/null +++ b/mr/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes2.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.mr; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.data.InternalRecordWrapper; +import org.apache.iceberg.data.PartialReadTests; +import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.util.StructLikeSet; +import org.junit.Assert; +import org.junit.Before; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class TestInputFormatReaderDeletes2 extends PartialReadTests { + private final Configuration conf = new Configuration(); + private final HadoopTables tables = new HadoopTables(conf); + private TestHelper helper; + + // parametrized variables + private final String inputFormat; + private final FileFormat fileFormat; + + @Parameterized.Parameters(name = "inputFormat = {0}, fileFormat={1}") + public static Object[][] parameters() { + return new Object[][] { + {"IcebergInputFormat", FileFormat.AVRO}, + }; + } + + @Before + @Override + public void writeTestDataFile() throws IOException { + conf.set(CatalogUtil.ICEBERG_CATALOG_TYPE, Catalogs.LOCATION); + super.writeTestDataFile(); + } + + public TestInputFormatReaderDeletes2(String inputFormat, FileFormat fileFormat) { + this.inputFormat = inputFormat; + this.fileFormat = fileFormat; + } + + @Override + protected Table createTable(String name, Schema schema, PartitionSpec spec) throws IOException { + Table table; + + File location = temp.newFolder(inputFormat, fileFormat.name()); + Assert.assertTrue(location.delete()); + helper = new TestHelper(conf, tables, location.toString(), schema, spec, fileFormat, temp); + table = helper.createTable(); + + TableOperations ops = ((BaseTable) table).operations(); + TableMetadata meta = ops.current(); + ops.commit(meta, meta.upgradeToFormatVersion(2)); + + return table; + } + + @Override + protected void dropTable(String name) { + tables.dropTable(helper.table().location()); + } + + @Override + public StructLikeSet rowSet(String name, Table table, String... 
columns) { + InputFormatConfig.ConfigBuilder builder = + new InputFormatConfig.ConfigBuilder(conf).readFrom(table.location()); + Schema projected = table.schema().select(columns); + StructLikeSet set = StructLikeSet.create(projected.asStruct()); + + set.addAll( + TestIcebergInputFormats.TESTED_INPUT_FORMATS.stream() + .filter(recordFactory -> recordFactory.name().equals(inputFormat)) + .map( + recordFactory -> + recordFactory.create(builder.project(projected).conf()).getRecords()) + .flatMap(List::stream) + .map(record -> new InternalRecordWrapper(projected.asStruct()).wrap(record)) + .collect(Collectors.toList())); + + return set; + } +} diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORC.java b/orc/src/main/java/org/apache/iceberg/orc/ORC.java index 5b2b877a97d4..7991a194ece2 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/ORC.java +++ b/orc/src/main/java/org/apache/iceberg/orc/ORC.java @@ -61,6 +61,7 @@ import org.apache.iceberg.data.orc.GenericOrcWriter; import org.apache.iceberg.data.orc.GenericOrcWriters; import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PartialDeleteWriter; import org.apache.iceberg.deletes.PositionDeleteWriter; import org.apache.iceberg.encryption.EncryptionKeyMetadata; import org.apache.iceberg.exceptions.RuntimeIOException; @@ -499,6 +500,8 @@ public static class DeleteWriteBuilder { private StructLike partition = null; private EncryptionKeyMetadata keyMetadata = null; private int[] equalityFieldIds = null; + private int[] partialFieldIds = null; + private SortOrder sortOrder; private Function pathTransformFunc = Function.identity(); @@ -575,11 +578,21 @@ public DeleteWriteBuilder equalityFieldIds(List fieldIds) { return this; } + public DeleteWriteBuilder partialFieldIds(List fieldIds) { + this.partialFieldIds = ArrayUtil.toIntArray(fieldIds); + return this; + } + public DeleteWriteBuilder equalityFieldIds(int... fieldIds) { this.equalityFieldIds = fieldIds; return this; } + public DeleteWriteBuilder partialFieldIds(int... 
fieldIds) { + this.partialFieldIds = fieldIds; + return this; + } + public DeleteWriteBuilder transformPaths(Function newPathTransformFunc) { this.pathTransformFunc = newPathTransformFunc; return this; @@ -590,6 +603,51 @@ public DeleteWriteBuilder withSortOrder(SortOrder newSortOrder) { return this; } + public PartialDeleteWriter buildPartialWriter() throws IOException { + Preconditions.checkState( + rowSchema != null, "Cannot create equality delete file without a schema"); + Preconditions.checkState( + equalityFieldIds != null, "Cannot create partial file without delete field ids"); + Preconditions.checkState( + partialFieldIds != null, "Cannot create partial file without partial field ids"); + Preconditions.checkState( + createWriterFunc != null, "Cannot create partial file unless createWriterFunc is set"); + Preconditions.checkArgument( + spec != null, "Spec must not be null when creating partial writer"); + Preconditions.checkArgument( + spec.isUnpartitioned() || partition != null, + "Partition must not be null for partitioned writes"); + + meta("delete-type", "partial"); + meta( + "delete-field-ids", + IntStream.of(equalityFieldIds) + .mapToObj(Objects::toString) + .collect(Collectors.joining(", "))); + + meta( + "partial-field-ids", + IntStream.of(partialFieldIds) + .mapToObj(Objects::toString) + .collect(Collectors.joining(", "))); + + // the appender uses the row schema without extra columns + appenderBuilder.schema(rowSchema); + appenderBuilder.createWriterFunc(createWriterFunc); + appenderBuilder.createContextFunc(WriteBuilder.Context::deleteContext); + + return new PartialDeleteWriter<>( + appenderBuilder.build(), + FileFormat.ORC, + location, + spec, + partition, + keyMetadata, + sortOrder, + equalityFieldIds, + partialFieldIds); + } + public EqualityDeleteWriter buildEqualityWriter() { Preconditions.checkState( rowSchema != null, "Cannot create equality delete file without a schema"); diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java index 8b1e6c056403..f24989c14f59 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java @@ -69,6 +69,7 @@ import org.apache.iceberg.avro.AvroSchemaUtil; import org.apache.iceberg.data.parquet.GenericParquetWriter; import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.deletes.PartialDeleteWriter; import org.apache.iceberg.deletes.PositionDeleteWriter; import org.apache.iceberg.encryption.EncryptionKeyMetadata; import org.apache.iceberg.exceptions.RuntimeIOException; @@ -669,6 +670,7 @@ public static class DeleteWriteBuilder { private StructLike partition = null; private EncryptionKeyMetadata keyMetadata = null; private int[] equalityFieldIds = null; + private int[] partialFieldIds = null; private SortOrder sortOrder; private Function pathTransformFunc = Function.identity(); @@ -745,11 +747,21 @@ public DeleteWriteBuilder equalityFieldIds(List fieldIds) { return this; } + public DeleteWriteBuilder partialFieldIds(List fieldIds) { + this.partialFieldIds = ArrayUtil.toIntArray(fieldIds); + return this; + } + public DeleteWriteBuilder equalityFieldIds(int... fieldIds) { this.equalityFieldIds = fieldIds; return this; } + public DeleteWriteBuilder partialFieldIds(int... 
fieldIds) { + this.partialFieldIds = fieldIds; + return this; + } + public DeleteWriteBuilder transformPaths(Function newPathTransformFunc) { this.pathTransformFunc = newPathTransformFunc; return this; @@ -760,6 +772,51 @@ public DeleteWriteBuilder withSortOrder(SortOrder newSortOrder) { return this; } + public PartialDeleteWriter buildPartialWriter() throws IOException { + Preconditions.checkState( + rowSchema != null, "Cannot create equality delete file without a schema"); + Preconditions.checkState( + equalityFieldIds != null, "Cannot create partial file without delete field ids"); + Preconditions.checkState( + partialFieldIds != null, "Cannot create partial file without partial field ids"); + Preconditions.checkState( + createWriterFunc != null, "Cannot create partial file unless createWriterFunc is set"); + Preconditions.checkArgument( + spec != null, "Spec must not be null when creating partial writer"); + Preconditions.checkArgument( + spec.isUnpartitioned() || partition != null, + "Partition must not be null for partitioned writes"); + + meta("delete-type", "partial"); + meta( + "delete-field-ids", + IntStream.of(equalityFieldIds) + .mapToObj(Objects::toString) + .collect(Collectors.joining(", "))); + + meta( + "partial-field-ids", + IntStream.of(partialFieldIds) + .mapToObj(Objects::toString) + .collect(Collectors.joining(", "))); + + // the appender uses the row schema without extra columns + appenderBuilder.schema(rowSchema); + appenderBuilder.createWriterFunc(createWriterFunc); + appenderBuilder.createContextFunc(WriteBuilder.Context::deleteContext); + + return new PartialDeleteWriter<>( + appenderBuilder.build(), + FileFormat.PARQUET, + location, + spec, + partition, + keyMetadata, + sortOrder, + equalityFieldIds, + partialFieldIds); + } + public EqualityDeleteWriter buildEqualityWriter() throws IOException { Preconditions.checkState( rowSchema != null, "Cannot create equality delete file without a schema"); diff --git a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java index f206149da30e..1507b79d7102 100644 --- a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java +++ b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java @@ -189,6 +189,12 @@ protected StructLike asStructLike(InternalRow row) { return asStructLike.wrap(row); } + @Override + protected InternalRow combineRecord( + InternalRow record, StructLike partialRecord, Schema partialSchema) { + throw new UnsupportedOperationException("Partial updates are not yet supported."); + } + @Override protected InputFile getInputFile(String location) { return RowDataReader.this.getInputFile(location); diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java new file mode 100644 index 000000000000..1507b79d7102 --- /dev/null +++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import java.util.Map; +import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataTask; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.data.DeleteFilter; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.mapping.NameMappingParser; +import org.apache.iceberg.orc.ORC; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.spark.data.SparkAvroReader; +import org.apache.iceberg.spark.data.SparkOrcReader; +import org.apache.iceberg.spark.data.SparkParquetReaders; +import org.apache.iceberg.types.TypeUtil; +import org.apache.spark.rdd.InputFileBlockHolder; +import org.apache.spark.sql.catalyst.InternalRow; + +class RowDataReader extends BaseDataReader { + + private final Schema tableSchema; + private final Schema expectedSchema; + private final String nameMapping; + private final boolean caseSensitive; + + RowDataReader(CombinedScanTask task, Table table, Schema expectedSchema, boolean caseSensitive) { + super(table, task); + this.tableSchema = table.schema(); + this.expectedSchema = expectedSchema; + this.nameMapping = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING); + this.caseSensitive = caseSensitive; + } + + @Override + CloseableIterator open(FileScanTask task) { + SparkDeleteFilter deletes = new SparkDeleteFilter(task, tableSchema, expectedSchema); + + // schema or rows returned by readers + Schema requiredSchema = deletes.requiredSchema(); + Map idToConstant = constantsMap(task, expectedSchema); + DataFile file = task.file(); + + // update the current file for Spark's filename() function + InputFileBlockHolder.set(file.path().toString(), task.start(), task.length()); + + return deletes.filter(open(task, requiredSchema, idToConstant)).iterator(); + } + + protected Schema tableSchema() { + return tableSchema; + } + + protected CloseableIterable open( + FileScanTask task, Schema readSchema, Map idToConstant) { + CloseableIterable iter; + if (task.isDataTask()) { + iter = newDataIterable(task.asDataTask(), readSchema); + } else { + InputFile location = getInputFile(task); + Preconditions.checkNotNull(location, "Could not find InputFile associated with FileScanTask"); + + switch (task.file().format()) { + case PARQUET: + iter = newParquetIterable(location, task, readSchema, idToConstant); + break; + + case AVRO: + iter = newAvroIterable(location, task, readSchema, idToConstant); + break; + + case ORC: + iter = newOrcIterable(location, task, readSchema, idToConstant); + break; + + default: + 
throw new UnsupportedOperationException( + "Cannot read unknown format: " + task.file().format()); + } + } + + return iter; + } + + private CloseableIterable newAvroIterable( + InputFile location, FileScanTask task, Schema projection, Map idToConstant) { + Avro.ReadBuilder builder = + Avro.read(location) + .reuseContainers() + .project(projection) + .split(task.start(), task.length()) + .createReaderFunc( + readSchema -> new SparkAvroReader(projection, readSchema, idToConstant)); + + if (nameMapping != null) { + builder.withNameMapping(NameMappingParser.fromJson(nameMapping)); + } + + return builder.build(); + } + + private CloseableIterable newParquetIterable( + InputFile location, FileScanTask task, Schema readSchema, Map idToConstant) { + Parquet.ReadBuilder builder = + Parquet.read(location) + .reuseContainers() + .split(task.start(), task.length()) + .project(readSchema) + .createReaderFunc( + fileSchema -> SparkParquetReaders.buildReader(readSchema, fileSchema, idToConstant)) + .filter(task.residual()) + .caseSensitive(caseSensitive); + + if (nameMapping != null) { + builder.withNameMapping(NameMappingParser.fromJson(nameMapping)); + } + + return builder.build(); + } + + private CloseableIterable newOrcIterable( + InputFile location, FileScanTask task, Schema readSchema, Map idToConstant) { + Schema readSchemaWithoutConstantAndMetadataFields = + TypeUtil.selectNot( + readSchema, Sets.union(idToConstant.keySet(), MetadataColumns.metadataFieldIds())); + + ORC.ReadBuilder builder = + ORC.read(location) + .project(readSchemaWithoutConstantAndMetadataFields) + .split(task.start(), task.length()) + .createReaderFunc( + readOrcSchema -> new SparkOrcReader(readSchema, readOrcSchema, idToConstant)) + .filter(task.residual()) + .caseSensitive(caseSensitive); + + if (nameMapping != null) { + builder.withNameMapping(NameMappingParser.fromJson(nameMapping)); + } + + return builder.build(); + } + + private CloseableIterable newDataIterable(DataTask task, Schema readSchema) { + StructInternalRow row = new StructInternalRow(readSchema.asStruct()); + CloseableIterable asSparkRows = + CloseableIterable.transform(task.asDataTask().rows(), row::setStruct); + return asSparkRows; + } + + protected class SparkDeleteFilter extends DeleteFilter { + private final InternalRowWrapper asStructLike; + + SparkDeleteFilter(FileScanTask task, Schema tableSchema, Schema requestedSchema) { + super(task.file().path().toString(), task.deletes(), tableSchema, requestedSchema); + this.asStructLike = new InternalRowWrapper(SparkSchemaUtil.convert(requiredSchema())); + } + + @Override + protected StructLike asStructLike(InternalRow row) { + return asStructLike.wrap(row); + } + + @Override + protected InternalRow combineRecord( + InternalRow record, StructLike partialRecord, Schema partialSchema) { + throw new UnsupportedOperationException("Partial updates are not yet supported."); + } + + @Override + protected InputFile getInputFile(String location) { + return RowDataReader.this.getInputFile(location); + } + } +} diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java index 68e98ba913b7..46d8697c1e99 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java @@ -157,6 +157,12 @@ protected StructLike asStructLike(InternalRow row) { return asStructLike.wrap(row); } + 
@Override + protected InternalRow combineRecord( + InternalRow record, StructLike partialRecord, Schema partialSchema) { + throw new UnsupportedOperationException("Partial updates are not yet supported."); + } + @Override protected InputFile getInputFile(String location) { return BatchDataReader.this.getInputFile(location); diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java index f206149da30e..1507b79d7102 100644 --- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java +++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java @@ -189,6 +189,12 @@ protected StructLike asStructLike(InternalRow row) { return asStructLike.wrap(row); } + @Override + protected InternalRow combineRecord( + InternalRow record, StructLike partialRecord, Schema partialSchema) { + throw new UnsupportedOperationException("Partial updates are not yet supported."); + } + @Override protected InputFile getInputFile(String location) { return RowDataReader.this.getInputFile(location); diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java index 2333cd734bbe..c87506f97f3c 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java @@ -261,6 +261,12 @@ protected StructLike asStructLike(InternalRow row) { return asStructLike.wrap(row); } + @Override + protected InternalRow combineRecord( + InternalRow record, StructLike partialRecord, Schema partialSchema) { + throw new UnsupportedOperationException("Partial updates are not yet supported."); + } + @Override protected InputFile getInputFile(String location) { return BaseReader.this.getInputFile(location); diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java index 2333cd734bbe..07c41c68df05 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java @@ -261,6 +261,12 @@ protected StructLike asStructLike(InternalRow row) { return asStructLike.wrap(row); } + @Override + protected InternalRow combineRecord( + InternalRow record, StructLike partialRecord, Schema partialSchema, Schema requiredSchema) { + throw new UnsupportedOperationException("Partial updates are not yet supported."); + } + @Override protected InputFile getInputFile(String location) { return BaseReader.this.getInputFile(location);
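
For reviewers who want to exercise the new write path end to end, the sketch below drives the Parquet DeleteWriteBuilder additions above directly; it is roughly what the FileHelpers.writePartialFile helper used by PartialReadTests does. The PartialWriteExample class and the "id"/"data" column names are hypothetical, and PartialDeleteWriter is assumed to be Closeable and to expose write(...) and toDeleteFile() in the same way as EqualityDeleteWriter, since that class is added by this patch but its body is not among the hunks shown here.

    import java.io.IOException;
    import java.util.List;
    import org.apache.iceberg.DeleteFile;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.data.Record;
    import org.apache.iceberg.data.parquet.GenericParquetWriter;
    import org.apache.iceberg.deletes.PartialDeleteWriter;
    import org.apache.iceberg.io.OutputFile;
    import org.apache.iceberg.parquet.Parquet;
    import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;

    public class PartialWriteExample {

      // Writes a partial-update file that matches rows by "id" and rewrites only "data".
      // For a partitioned table, .withPartition(...) must also be set; buildPartialWriter()
      // enforces that with its preconditions.
      static DeleteFile writePartial(Table table, OutputFile out, List<Record> rows)
          throws IOException {
        Schema rowSchema = table.schema().select("id", "data");

        PartialDeleteWriter<Record> writer =
            Parquet.writeDeletes(out)
                .forTable(table)
                .rowSchema(rowSchema)
                .createWriterFunc(GenericParquetWriter::buildWriter)
                .equalityFieldIds(ImmutableList.of(table.schema().findField("id").fieldId()))
                .partialFieldIds(ImmutableList.of(table.schema().findField("data").fieldId()))
                .buildPartialWriter();

        try (PartialDeleteWriter<Record> closing = writer) {
          for (Record row : rows) {
            closing.write(row); // assumed API, mirroring EqualityDeleteWriter
          }
        }

        return writer.toDeleteFile(); // assumed API, mirroring EqualityDeleteWriter
      }
    }

The resulting DeleteFile is committed the same way PartialReadTests does it: table.newRowDelta().addDeletes(partialFile).commit().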
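
On the read side, every Spark and Flink reader touched by this patch stubs out the new DeleteFilter.combineRecord hook with UnsupportedOperationException (and the hunks disagree on its arity: the flink v1.16 and spark v3.3 overrides take a fourth requiredSchema parameter while the others take three). As a reference for what an engine-side implementation might eventually look like, here is a hypothetical sketch in terms of the generic Record API, using the four-argument form. The merge semantics, columns present in the partial schema overwrite the matching columns of the data row and everything else passes through, are an assumption about the partial-update design, not code taken from the GenericDeleteFilter added by this patch, which is not shown in these hunks.

    import java.util.List;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.StructLike;
    import org.apache.iceberg.data.GenericRecord;
    import org.apache.iceberg.data.Record;
    import org.apache.iceberg.types.Types;

    final class PartialRecordCombiner {
      private PartialRecordCombiner() {}

      // Overlays the columns carried by the partial record onto a copy of the data record.
      static Record combine(
          Record record, StructLike partial, Schema partialSchema, Schema requiredSchema) {
        List<Types.NestedField> requiredColumns = requiredSchema.columns();
        GenericRecord combined = GenericRecord.create(requiredSchema);

        // start from the original row, projected onto the required schema
        for (int pos = 0; pos < requiredColumns.size(); pos++) {
          combined.set(pos, record.getField(requiredColumns.get(pos).name()));
        }

        // then overwrite the fields the partial record carries, matching by field id
        List<Types.NestedField> partialColumns = partialSchema.columns();
        for (int partialPos = 0; partialPos < partialColumns.size(); partialPos++) {
          int fieldId = partialColumns.get(partialPos).fieldId();
          for (int pos = 0; pos < requiredColumns.size(); pos++) {
            if (requiredColumns.get(pos).fieldId() == fieldId) {
              combined.set(pos, partial.get(partialPos, Object.class));
              break;
            }
          }
        }

        return combined;
      }
    }

A concrete engine implementation would operate on InternalRow or RowData rather than Record, but the shape of the merge would be the same.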