Spark 3.5: Refactor delete logic in batch reading #11933

Merged · 8 commits · Jan 22, 2025

Changes from 1 commit
@@ -26,13 +26,6 @@

class ColumnVectorBuilder {
private boolean[] isDeleted;
Contributor: Do we need to keep isDeleted here?

Contributor (author): Removed. I have also changed the DeletedColumnVector constructor accordingly.

private int[] rowIdMapping;

public ColumnVectorBuilder withDeletedRows(int[] rowIdMappingArray, boolean[] isDeletedArray) {
this.rowIdMapping = rowIdMappingArray;
this.isDeleted = isDeletedArray;
return this;
}
Contributor (author): At the time of building the vectors, isDeleted and rowIdMapping are always null, so deleting this method for now.


public ColumnVector build(VectorHolder holder, int numRows) {
if (holder.isDummy()) {
@@ -46,8 +39,6 @@ public ColumnVector build(VectorHolder holder, int numRows) {
} else {
throw new IllegalStateException("Unknown dummy vector holder: " + holder);
}
} else if (rowIdMapping != null) {
return new ColumnVectorWithFilter(holder, rowIdMapping);
} else {
return new IcebergArrowColumnVector(holder);
}
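With withDeletedRows gone, the builder always produces a plain vector; when a row-id mapping exists, the caller now wraps that vector in ColumnVectorWithFilter so reads are redirected through the mapping (see the new loadDataToColumnBatch below). A minimal stand-alone sketch of the read-through-a-mapping idea, using an IntUnaryOperator as a hypothetical stand-in for the real per-type vector accessors:

import java.util.Arrays;
import java.util.function.IntUnaryOperator;

public class WrapAfterBuildSketch {
  public static void main(String[] args) {
    int[] built = {10, 11, 12, 13, 14, 15, 16, 17}; // values of a plain, fully built vector
    int[] rowIdMapping = {0, 1, 3, 4, 5, 7}; // rows 2 and 6 were position-deleted

    // the filtered view redirects every read through the mapping
    IntUnaryOperator filtered = rowId -> built[rowIdMapping[rowId]];

    int[] liveValues = new int[rowIdMapping.length];
    Arrays.setAll(liveValues, filtered);
    System.out.println(Arrays.toString(liveValues)); // [10, 11, 13, 14, 15, 17]
  }
}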
@@ -18,15 +18,12 @@
*/
package org.apache.iceberg.spark.data.vectorized;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.arrow.vectorized.BaseBatchReader;
import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader;
import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader.DeletedVectorReader;
import org.apache.iceberg.data.DeleteFilter;
import org.apache.iceberg.deletes.PositionDeleteIndex;
import org.apache.iceberg.parquet.VectorizedReader;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.Pair;
@@ -88,7 +85,7 @@ public final ColumnarBatch read(ColumnarBatch reuse, int numRowsToRead) {
}

private class ColumnBatchLoader {
private final int numRowsToRead;
Contributor: Hm, shouldn't numRowsToRead be the same and equal to the original batch size? I would probably use a separate variable like batchSize or numLiveRows to track the number of live entries in the batch.

Contributor (author): Renamed to batchSize. Also added a local variable numLiveRows to track the number of live entries in the batch.

private int numRowsToRead;
Contributor: I don't think we need these instance variables anymore; they can become local variables now.

Contributor (author): Removed. Thanks.

// the rowId mapping to skip deleted rows for all column vectors inside a batch; it is null when
// there are no deletes
private int[] rowIdMapping;
@@ -106,27 +103,43 @@ private class ColumnBatchLoader {
}

ColumnarBatch loadDataToColumnBatch() {
int numRowsUndeleted = initRowIdMapping();

ColumnVector[] arrowColumnVectors = readDataToColumnVectors();
if (!hasIsDeletedColumn) {
Contributor: Instead of having ! in the condition, I'd flip the branches for readability. Interpreting ! inside if conditions is always harder.

Contributor (author): Flipped. Thanks.

Pair<int[], Integer> pair =
ColumnarBatchUtil.buildRowIdMapping(
arrowColumnVectors, deletes, rowStartPosInBatch, numRowsToRead);
if (pair != null) {
rowIdMapping = pair.first();
numRowsToRead = pair.second();
for (int i = 0; i < arrowColumnVectors.length; i++) {
ColumnVector vector = arrowColumnVectors[i];
if (vector instanceof IcebergArrowColumnVector) {
arrowColumnVectors[i] =
new ColumnVectorWithFilter(
((IcebergArrowColumnVector) vector).vector(), rowIdMapping);
}
}
}
} else {
isDeleted =
ColumnarBatchUtil.buildIsDeleted(
arrowColumnVectors, deletes, rowStartPosInBatch, numRowsToRead);
for (int i = 0; i < arrowColumnVectors.length; i++) {
ColumnVector vector = arrowColumnVectors[i];
if (vector instanceof DeletedColumnVector) {
((DeletedColumnVector) vector).setIsDeleted(isDeleted);
}
}
}

ColumnarBatch newColumnarBatch = new ColumnarBatch(arrowColumnVectors);
newColumnarBatch.setNumRows(numRowsUndeleted);
newColumnarBatch.setNumRows(numRowsToRead);

if (hasEqDeletes()) {
applyEqDelete(newColumnarBatch);
newColumnarBatch = removeExtraColumns(arrowColumnVectors, newColumnarBatch);
}

if (hasIsDeletedColumn && rowIdMapping != null) {
// reset the row id mapping array, so that it doesn't filter out the deleted rows
for (int i = 0; i < numRowsToRead; i++) {
rowIdMapping[i] = i;
}
newColumnarBatch.setNumRows(numRowsToRead);
if (deletes != null && deletes.hasEqDeletes()) {
return ColumnarBatchUtil.removeExtraColumns(deletes, arrowColumnVectors, newColumnarBatch);
} else {
return newColumnarBatch;
}

return newColumnarBatch;
}
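When the _deleted metadata column is requested, the batch keeps every row and only flags the deleted positions. A rough sketch of the boolean vector this path needs, with a plain Set as a stand-in for the real PositionDeleteIndex (illustrative only, not the actual ColumnarBatchUtil.buildIsDeleted code):

import java.util.Arrays;
import java.util.Set;

public class IsDeletedSketch {
  public static void main(String[] args) {
    int batchSize = 8;
    long rowStartPosInBatch = 0L;
    Set<Long> deletedPositions = Set.of(2L, 6L); // stand-in for PositionDeleteIndex

    boolean[] isDeleted = new boolean[batchSize];
    for (int i = 0; i < batchSize; i++) {
      isDeleted[i] = deletedPositions.contains(rowStartPosInBatch + i);
    }

    // all 8 rows stay in the batch; _deleted is true at positions 2 and 6
    System.out.println(Arrays.toString(isDeleted));
  }
}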

ColumnVector[] readDataToColumnVectors() {
@@ -142,151 +155,9 @@ ColumnVector[] readDataToColumnVectors() {
numRowsInVector,
numRowsToRead);

arrowColumnVectors[i] =
columnVectorBuilder
.withDeletedRows(rowIdMapping, isDeleted)
.build(vectorHolders[i], numRowsInVector);
arrowColumnVectors[i] = columnVectorBuilder.build(vectorHolders[i], numRowsInVector);
}
return arrowColumnVectors;
}

boolean hasEqDeletes() {
return deletes != null && deletes.hasEqDeletes();
}

int initRowIdMapping() {
Pair<int[], Integer> posDeleteRowIdMapping = posDelRowIdMapping();
if (posDeleteRowIdMapping != null) {
rowIdMapping = posDeleteRowIdMapping.first();
return posDeleteRowIdMapping.second();
} else {
rowIdMapping = initEqDeleteRowIdMapping();
return numRowsToRead;
}
}

Pair<int[], Integer> posDelRowIdMapping() {
if (deletes != null && deletes.hasPosDeletes()) {
return buildPosDelRowIdMapping(deletes.deletedRowPositions());
} else {
return null;
}
}

/**
 * Build a row id mapping inside a batch, which skips deleted rows. Here is an example of how we
 * delete 2 rows in a batch with 8 rows in total:
 *
 * [0,1,2,3,4,5,6,7] -- Original status of the row id mapping array
 * [F,F,F,F,F,F,F,F] -- Original status of the isDeleted array
 * Position delete 2, 6
 * [0,1,3,4,5,7,-,-] -- After applying position deletes [Set Num records to 6]
 * [F,F,T,F,F,F,T,F] -- After applying position deletes
 *
 * @param deletedRowPositions a set of deleted row positions
 * @return the mapping array and the new num of rows in a batch, null if no row is deleted
 */
Pair<int[], Integer> buildPosDelRowIdMapping(PositionDeleteIndex deletedRowPositions) {
if (deletedRowPositions == null) {
return null;
}

int[] posDelRowIdMapping = new int[numRowsToRead];
int originalRowId = 0;
int currentRowId = 0;
while (originalRowId < numRowsToRead) {
if (!deletedRowPositions.isDeleted(originalRowId + rowStartPosInBatch)) {
posDelRowIdMapping[currentRowId] = originalRowId;
currentRowId++;
} else {
if (hasIsDeletedColumn) {
isDeleted[originalRowId] = true;
}

deletes.incrementDeleteCount();
}
originalRowId++;
}

if (currentRowId == numRowsToRead) {
// there is no delete in this batch
return null;
} else {
return Pair.of(posDelRowIdMapping, currentRowId);
}
}
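The Javadoc example above runs stand-alone with the same loop shape; a plain java.util.Set stands in for PositionDeleteIndex:

import java.util.Arrays;
import java.util.Set;

public class PosDelMappingSketch {
  public static void main(String[] args) {
    int numRowsToRead = 8;
    long rowStartPosInBatch = 0L;
    Set<Long> deletedRowPositions = Set.of(2L, 6L); // position deletes 2 and 6

    int[] posDelRowIdMapping = new int[numRowsToRead];
    int currentRowId = 0;
    for (int originalRowId = 0; originalRowId < numRowsToRead; originalRowId++) {
      if (!deletedRowPositions.contains(rowStartPosInBatch + originalRowId)) {
        posDelRowIdMapping[currentRowId++] = originalRowId; // keep live rows only
      }
    }

    // prints 6 and [0, 1, 3, 4, 5, 7], matching the example above
    System.out.println(currentRowId);
    System.out.println(Arrays.toString(Arrays.copyOf(posDelRowIdMapping, currentRowId)));
  }
}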

int[] initEqDeleteRowIdMapping() {
int[] eqDeleteRowIdMapping = null;
if (hasEqDeletes()) {
eqDeleteRowIdMapping = new int[numRowsToRead];
for (int i = 0; i < numRowsToRead; i++) {
eqDeleteRowIdMapping[i] = i;
}
}

return eqDeleteRowIdMapping;
}

/**
 * Filter out the equality deleted rows. Here is an example:
 *
 * [0,1,2,3,4,5,6,7] -- Original status of the row id mapping array
 * [F,F,F,F,F,F,F,F] -- Original status of the isDeleted array
 * Position delete 2, 6
 * [0,1,3,4,5,7,-,-] -- After applying position deletes [Set Num records to 6]
 * [F,F,T,F,F,F,T,F] -- After applying position deletes
 * Equality delete 1 <= x <= 3
 * [0,4,5,7,-,-,-,-] -- After applying equality deletes [Set Num records to 4]
 * [F,T,T,T,F,F,T,F] -- After applying equality deletes
 *
 * @param columnarBatch the {@link ColumnarBatch} to apply the equality delete
 */
void applyEqDelete(ColumnarBatch columnarBatch) {
Iterator<InternalRow> it = columnarBatch.rowIterator();
int rowId = 0;
int currentRowId = 0;
while (it.hasNext()) {
InternalRow row = it.next();
if (deletes.eqDeletedRowFilter().test(row)) {
// the row is NOT deleted
// skip deleted rows by pointing to the next undeleted row Id
rowIdMapping[currentRowId] = rowIdMapping[rowId];
currentRowId++;
} else {
if (hasIsDeletedColumn) {
isDeleted[rowIdMapping[rowId]] = true;
}

deletes.incrementDeleteCount();
}

rowId++;
}

columnarBatch.setNumRows(currentRowId);
}
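Continuing that example stand-alone: applying the equality delete 1 <= x <= 3 on top of the position-delete mapping compacts it in place, exactly as the loop above does. The sketch assumes the filtered column's value equals the original row id, as a stand-in for deletes.eqDeletedRowFilter():

import java.util.Arrays;

public class EqDeleteMappingSketch {
  public static void main(String[] args) {
    // state after the position deletes: 6 live rows, mapping [0,1,3,4,5,7,-,-]
    int[] rowIdMapping = {0, 1, 3, 4, 5, 7, -1, -1};
    int numLiveRows = 6;

    int currentRowId = 0;
    for (int rowId = 0; rowId < numLiveRows; rowId++) {
      int originalRow = rowIdMapping[rowId];
      boolean deleted = originalRow >= 1 && originalRow <= 3; // equality delete 1 <= x <= 3
      if (!deleted) {
        rowIdMapping[currentRowId++] = originalRow; // point at the next undeleted row
      }
    }

    // prints 4 and [0, 4, 5, 7], matching the example above
    System.out.println(currentRowId);
    System.out.println(Arrays.toString(Arrays.copyOf(rowIdMapping, currentRowId)));
  }
}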

/**
* Removes extra columns added for processing equality delete filters that are not part of the
* final query output.
*
* <p>During query execution, additional columns may be included in the schema to evaluate
* equality delete filters. For example, if the table schema contains columns C1, C2, C3, C4,
* and C5, and the query is 'SELECT C5 FROM table' while equality delete filters are applied on
* C3 and C4, the processing schema includes C5, C3, and C4. These extra columns (C3 and C4) are
* needed to identify rows to delete but are not included in the final result.
*
* <p>This method removes these extra columns from the end of {@code arrowColumnVectors},
* ensuring only the expected columns remain.
*
* @param arrowColumnVectors the array of column vectors representing query result data
* @param columnarBatch the original {@code ColumnarBatch} containing query results
* @return a new {@code ColumnarBatch} with extra columns removed, or the original batch if no
* extra columns were found
*/
ColumnarBatch removeExtraColumns(
ColumnVector[] arrowColumnVectors, ColumnarBatch columnarBatch) {
int expectedColumnSize = deletes.expectedSchema().columns().size();
if (arrowColumnVectors.length > expectedColumnSize) {
ColumnVector[] newColumns = Arrays.copyOf(arrowColumnVectors, expectedColumnSize);
return new ColumnarBatch(newColumns, columnarBatch.numRows());
} else {
return columnarBatch;
}
}
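The Javadoc's C5/C3/C4 example also runs stand-alone; plain strings stand in for the Spark ColumnVector objects:

import java.util.Arrays;

public class RemoveExtraColumnsSketch {
  public static void main(String[] args) {
    // processing schema: C5 (query output) plus C3 and C4, read only to
    // evaluate the equality delete filters
    String[] processedColumns = {"C5", "C3", "C4"};
    int expectedColumnSize = 1; // the query is SELECT C5 FROM table

    String[] outputColumns =
        processedColumns.length > expectedColumnSize
            ? Arrays.copyOf(processedColumns, expectedColumnSize)
            : processedColumns;

    System.out.println(Arrays.toString(outputColumns)); // prints [C5]
  }
}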
}
}