Spark 3.5: Refactor delete logic in batch reading #11933
Changes from all commits
```diff
@@ -18,15 +18,12 @@
  */
 package org.apache.iceberg.spark.data.vectorized;
 
-import java.util.Arrays;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.arrow.vectorized.BaseBatchReader;
 import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader;
 import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader.DeletedVectorReader;
 import org.apache.iceberg.data.DeleteFilter;
-import org.apache.iceberg.deletes.PositionDeleteIndex;
 import org.apache.iceberg.parquet.VectorizedReader;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.util.Pair;
```
```diff
@@ -88,44 +85,51 @@ public final ColumnarBatch read(ColumnarBatch reuse, int numRowsToRead) {
   }
 
   private class ColumnBatchLoader {
-    private final int numRowsToRead;
-    // the rowId mapping to skip deleted rows for all column vectors inside a batch, it is null when
-    // there is no deletes
-    private int[] rowIdMapping;
-    // the array to indicate if a row is deleted or not, it is null when there is no "_deleted"
-    // metadata column
-    private boolean[] isDeleted;
+    private final int batchSize;
 
     ColumnBatchLoader(int numRowsToRead) {
       Preconditions.checkArgument(
           numRowsToRead > 0, "Invalid number of rows to read: %s", numRowsToRead);
-      this.numRowsToRead = numRowsToRead;
-      if (hasIsDeletedColumn) {
-        isDeleted = new boolean[numRowsToRead];
-      }
+      this.batchSize = numRowsToRead;
     }
 
     ColumnarBatch loadDataToColumnBatch() {
-      int numRowsUndeleted = initRowIdMapping();
-
       ColumnVector[] arrowColumnVectors = readDataToColumnVectors();
-
-      ColumnarBatch newColumnarBatch = new ColumnarBatch(arrowColumnVectors);
-      newColumnarBatch.setNumRows(numRowsUndeleted);
-
-      if (hasEqDeletes()) {
-        applyEqDelete(newColumnarBatch);
-        newColumnarBatch = removeExtraColumns(arrowColumnVectors, newColumnarBatch);
-      }
-
-      if (hasIsDeletedColumn && rowIdMapping != null) {
-        // reset the row id mapping array, so that it doesn't filter out the deleted rows
-        for (int i = 0; i < numRowsToRead; i++) {
-          rowIdMapping[i] = i;
-        }
-        newColumnarBatch.setNumRows(numRowsToRead);
-      }
-
+      int numLiveRows = batchSize;
+
+      if (hasIsDeletedColumn) {
+        boolean[] isDeleted =
+            ColumnarBatchUtil.buildIsDeleted(
+                arrowColumnVectors, deletes, rowStartPosInBatch, batchSize);
+        for (int i = 0; i < arrowColumnVectors.length; i++) {
+          ColumnVector vector = arrowColumnVectors[i];
+          if (vector instanceof DeletedColumnVector) {
+            ((DeletedColumnVector) vector).setValue(isDeleted);
+          }
+        }
+      } else {
+        Pair<int[], Integer> pair =
+            ColumnarBatchUtil.buildRowIdMapping(
+                arrowColumnVectors, deletes, rowStartPosInBatch, batchSize);
+        if (pair != null) {
+          int[] rowIdMapping = pair.first();
+          numLiveRows = pair.second();
+          for (int i = 0; i < arrowColumnVectors.length; i++) {
+            ColumnVector vector = arrowColumnVectors[i];
+            if (vector instanceof IcebergArrowColumnVector) {
+              arrowColumnVectors[i] =
+                  new ColumnVectorWithFilter(
+                      ((IcebergArrowColumnVector) vector).vector(), rowIdMapping);
+            }
+          }
+        }
+      }
+
+      if (deletes != null && deletes.hasEqDeletes()) {
+        arrowColumnVectors = ColumnarBatchUtil.removeExtraColumns(deletes, arrowColumnVectors);
+      }
+
+      ColumnarBatch newColumnarBatch = new ColumnarBatch(arrowColumnVectors);
+      newColumnarBatch.setNumRows(numLiveRows);
       return newColumnarBatch;
     }
```
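The refactored path delegates all delete bookkeeping to `ColumnarBatchUtil` and, when there is no `_deleted` metadata column, wraps each data vector in a `ColumnVectorWithFilter` built from the row-id mapping. Below is a toy sketch of the mapping idea; the `FilteredIntColumn` class is hypothetical, a simplified stand-in for `ColumnVectorWithFilter`, not Iceberg code.

```java
// Toy stand-in for ColumnVectorWithFilter (hypothetical class, not Iceberg code):
// reads are redirected through a row-id mapping, so deleted rows vanish from the
// batch without copying or compacting the underlying vector.
class FilteredIntColumn {
  private final int[] data; // underlying column values, including deleted rows
  private final int[] rowIdMapping; // live row id -> original position in data

  FilteredIntColumn(int[] data, int[] rowIdMapping) {
    this.data = data;
    this.rowIdMapping = rowIdMapping;
  }

  int getInt(int rowId) {
    return data[rowIdMapping[rowId]]; // redirect the read to the original position
  }

  public static void main(String[] args) {
    int[] values = {10, 11, 12, 13, 14, 15, 16, 17};
    int[] mapping = {0, 1, 3, 4, 5, 7}; // positions 2 and 6 are deleted
    FilteredIntColumn column = new FilteredIntColumn(values, mapping);
    StringBuilder live = new StringBuilder();
    for (int i = 0; i < mapping.length; i++) {
      live.append(column.getInt(i)).append(' ');
    }
    System.out.println(live); // 10 11 13 14 15 17
  }
}
```

The rename from `numRowsToRead` to `batchSize` carries through `readDataToColumnVectors`, and the old per-batch helpers are removed: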
```diff
@@ -134,159 +138,17 @@ ColumnVector[] readDataToColumnVectors() {
 
       ColumnVectorBuilder columnVectorBuilder = new ColumnVectorBuilder();
       for (int i = 0; i < readers.length; i += 1) {
-        vectorHolders[i] = readers[i].read(vectorHolders[i], numRowsToRead);
+        vectorHolders[i] = readers[i].read(vectorHolders[i], batchSize);
         int numRowsInVector = vectorHolders[i].numValues();
         Preconditions.checkState(
-            numRowsInVector == numRowsToRead,
+            numRowsInVector == batchSize,
             "Number of rows in the vector %s didn't match expected %s ",
             numRowsInVector,
-            numRowsToRead);
+            batchSize);
 
-        arrowColumnVectors[i] =
-            columnVectorBuilder
-                .withDeletedRows(rowIdMapping, isDeleted)
-                .build(vectorHolders[i], numRowsInVector);
+        arrowColumnVectors[i] = columnVectorBuilder.build(vectorHolders[i], numRowsInVector);
       }
       return arrowColumnVectors;
     }
-
-    boolean hasEqDeletes() {
-      return deletes != null && deletes.hasEqDeletes();
-    }
-
-    int initRowIdMapping() {
-      Pair<int[], Integer> posDeleteRowIdMapping = posDelRowIdMapping();
-      if (posDeleteRowIdMapping != null) {
-        rowIdMapping = posDeleteRowIdMapping.first();
-        return posDeleteRowIdMapping.second();
-      } else {
-        rowIdMapping = initEqDeleteRowIdMapping();
-        return numRowsToRead;
-      }
-    }
-
-    Pair<int[], Integer> posDelRowIdMapping() {
-      if (deletes != null && deletes.hasPosDeletes()) {
-        return buildPosDelRowIdMapping(deletes.deletedRowPositions());
-      } else {
-        return null;
-      }
-    }
-
-    /**
-     * Build a row id mapping inside a batch, which skips deleted rows. Here is an example of how we
-     * delete 2 rows in a batch with 8 rows in total. [0,1,2,3,4,5,6,7] -- Original status of the
-     * row id mapping array [F,F,F,F,F,F,F,F] -- Original status of the isDeleted array Position
-     * delete 2, 6 [0,1,3,4,5,7,-,-] -- After applying position deletes [Set Num records to 6]
-     * [F,F,T,F,F,F,T,F] -- After applying position deletes
-     *
-     * @param deletedRowPositions a set of deleted row positions
-     * @return the mapping array and the new num of rows in a batch, null if no row is deleted
-     */
-    Pair<int[], Integer> buildPosDelRowIdMapping(PositionDeleteIndex deletedRowPositions) {
-      if (deletedRowPositions == null) {
-        return null;
-      }
-
-      int[] posDelRowIdMapping = new int[numRowsToRead];
-      int originalRowId = 0;
-      int currentRowId = 0;
-      while (originalRowId < numRowsToRead) {
-        if (!deletedRowPositions.isDeleted(originalRowId + rowStartPosInBatch)) {
-          posDelRowIdMapping[currentRowId] = originalRowId;
-          currentRowId++;
-        } else {
-          if (hasIsDeletedColumn) {
-            isDeleted[originalRowId] = true;
-          }
-
-          deletes.incrementDeleteCount();
-        }
-        originalRowId++;
-      }
-
-      if (currentRowId == numRowsToRead) {
-        // there is no delete in this batch
-        return null;
-      } else {
-        return Pair.of(posDelRowIdMapping, currentRowId);
-      }
-    }
```
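The removed javadoc walks through a batch of 8 rows where positions 2 and 6 are deleted. A self-contained sketch of the same mapping construction follows; it is simplified, with a `LongPredicate` standing in for `PositionDeleteIndex` and the live-row count conveyed by truncating the array instead of returning a `Pair`.

```java
import java.util.Arrays;
import java.util.function.LongPredicate;

class PosDeleteMappingSketch {
  // Builds the row-id mapping for one batch; returns null when no row is deleted,
  // mirroring the removed buildPosDelRowIdMapping.
  static int[] buildMapping(LongPredicate deletedPositions, long rowStartPosInBatch, int batchSize) {
    int[] mapping = new int[batchSize];
    int liveRows = 0;
    for (int row = 0; row < batchSize; row++) {
      if (!deletedPositions.test(rowStartPosInBatch + row)) {
        mapping[liveRows++] = row; // the next live slot points at this surviving row
      }
    }
    return liveRows == batchSize ? null : Arrays.copyOf(mapping, liveRows);
  }

  public static void main(String[] args) {
    // 8-row batch starting at file position 0, with positions 2 and 6 deleted
    int[] mapping = buildMapping(pos -> pos == 2 || pos == 6, 0L, 8);
    System.out.println(Arrays.toString(mapping)); // [0, 1, 3, 4, 5, 7] -> 6 live rows
  }
}
```

The diff continues with the removed equality-delete helpers: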
```diff
-
-    int[] initEqDeleteRowIdMapping() {
-      int[] eqDeleteRowIdMapping = null;
-      if (hasEqDeletes()) {
-        eqDeleteRowIdMapping = new int[numRowsToRead];
-        for (int i = 0; i < numRowsToRead; i++) {
-          eqDeleteRowIdMapping[i] = i;
-        }
-      }
-
-      return eqDeleteRowIdMapping;
-    }
-
-    /**
-     * Filter out the equality deleted rows. Here is an example, [0,1,2,3,4,5,6,7] -- Original
-     * status of the row id mapping array [F,F,F,F,F,F,F,F] -- Original status of the isDeleted
-     * array Position delete 2, 6 [0,1,3,4,5,7,-,-] -- After applying position deletes [Set Num
-     * records to 6] [F,F,T,F,F,F,T,F] -- After applying position deletes Equality delete 1 <= x <=
-     * 3 [0,4,5,7,-,-,-,-] -- After applying equality deletes [Set Num records to 4]
-     * [F,T,T,T,F,F,T,F] -- After applying equality deletes
-     *
-     * @param columnarBatch the {@link ColumnarBatch} to apply the equality delete
-     */
-    void applyEqDelete(ColumnarBatch columnarBatch) {
-      Iterator<InternalRow> it = columnarBatch.rowIterator();
-      int rowId = 0;
-      int currentRowId = 0;
-      while (it.hasNext()) {
-        InternalRow row = it.next();
-        if (deletes.eqDeletedRowFilter().test(row)) {
-          // the row is NOT deleted
-          // skip deleted rows by pointing to the next undeleted row Id
-          rowIdMapping[currentRowId] = rowIdMapping[rowId];
-          currentRowId++;
-        } else {
-          if (hasIsDeletedColumn) {
-            isDeleted[rowIdMapping[rowId]] = true;
-          }
-
-          deletes.incrementDeleteCount();
-        }
-
-        rowId++;
-      }
-
-      columnarBatch.setNumRows(currentRowId);
-    }
```
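`applyEqDelete` compacted the existing row-id mapping in place by testing each surviving row against the equality-delete predicate. A simplified sketch of that second pass is below; the hypothetical `IntPredicate` over original row ids stands in for `deletes.eqDeletedRowFilter().test(row)`, which in the real code tests row contents.

```java
import java.util.Arrays;
import java.util.function.IntPredicate;

class EqDeleteCompactionSketch {
  // Compacts mapping[0..liveRows) in place, keeping rows the predicate accepts;
  // returns the new live-row count, like columnarBatch.setNumRows(currentRowId).
  static int compact(int[] mapping, int liveRows, IntPredicate isLive) {
    int kept = 0;
    for (int i = 0; i < liveRows; i++) {
      if (isLive.test(mapping[i])) {
        mapping[kept++] = mapping[i]; // survivors stay in order
      }
    }
    return kept;
  }

  public static void main(String[] args) {
    // Continue the javadoc example: after position deletes 2 and 6 the mapping
    // is [0,1,3,4,5,7]; equality deletes then drop original rows 1 through 3.
    int[] mapping = {0, 1, 3, 4, 5, 7};
    int live = compact(mapping, 6, row -> row < 1 || row > 3);
    System.out.println(live + " live rows: " + Arrays.toString(Arrays.copyOf(mapping, live)));
    // prints: 4 live rows: [0, 4, 5, 7]
  }
}
```

The last removed helper, `removeExtraColumns`, closes out the class: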
```diff
-
-    /**
-     * Removes extra columns added for processing equality delete filters that are not part of the
-     * final query output.
-     *
-     * <p>During query execution, additional columns may be included in the schema to evaluate
-     * equality delete filters. For example, if the table schema contains columns C1, C2, C3, C4,
-     * and C5, and the query is 'SELECT C5 FROM table' while equality delete filters are applied on
-     * C3 and C4, the processing schema includes C5, C3, and C4. These extra columns (C3 and C4) are
-     * needed to identify rows to delete but are not included in the final result.
-     *
-     * <p>This method removes these extra columns from the end of {@code arrowColumnVectors},
-     * ensuring only the expected columns remain.
-     *
-     * @param arrowColumnVectors the array of column vectors representing query result data
-     * @param columnarBatch the original {@code ColumnarBatch} containing query results
-     * @return a new {@code ColumnarBatch} with extra columns removed, or the original batch if no
-     *     extra columns were found
-     */
-    ColumnarBatch removeExtraColumns(
-        ColumnVector[] arrowColumnVectors, ColumnarBatch columnarBatch) {
-      int expectedColumnSize = deletes.expectedSchema().columns().size();
-      if (arrowColumnVectors.length > expectedColumnSize) {
-        ColumnVector[] newColumns = Arrays.copyOf(arrowColumnVectors, expectedColumnSize);
-        return new ColumnarBatch(newColumns, columnarBatch.numRows());
-      } else {
-        return columnarBatch;
-      }
-    }
   }
 }
```
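The removed method relies on the equality-delete filter columns being appended after the projected columns, so a prefix copy drops them. A tiny demonstration of that truncation, using the javadoc's `SELECT C5` example with string labels standing in for column vectors:

```java
import java.util.Arrays;

class RemoveExtraColumnsSketch {
  public static void main(String[] args) {
    // Columns read while filtering: the projected C5 plus eq-delete columns C3, C4.
    String[] readColumns = {"C5", "C3", "C4"};
    int expectedColumnSize = 1; // the expected schema only contains C5
    String[] outputColumns = Arrays.copyOf(readColumns, expectedColumnSize);
    System.out.println(Arrays.toString(outputColumns)); // [C5]
  }
}
```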
Review comment: At the time of building the vectors, `isDeleted` and `rowIdMapping` are always null, so deleting this method for now.