Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
Expand Down Expand Up @@ -193,14 +195,36 @@ public void close() throws IOException {
}
}

@Override
protected int getNext() throws IOException {
int numRows = 0;
for (; numRows < getBatchSize(); numRows++) {
private List<GenericRecord> readNextBatch() throws IOException {
List<GenericRecord> records = new ArrayList<>();

for (int numRows = 0; numRows < getBatchSize(); numRows++) {
if (!avroReader.hasNext(inputPair, ignore)) {
break;
}
GenericRecord rowRecord = (GenericRecord) avroReader.getNext();
records.add(rowRecord);

// for (int i = 0; i < requiredFields.length; i++) {
// Object fieldData = rowRecord.get(requiredFields[i]);
// if (fieldData == null) {
// appendData(i, null);
// } else {
// AvroColumnValue fieldValue = new AvroColumnValue(fieldInspectors[i], fieldData);
// appendData(i, fieldValue);
// }
// }
Comment on lines +208 to +216
Copy link

Copilot AI Nov 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove commented-out code. This code block appears to be old implementation that has been replaced by the refactored version in the getNext() method. Commented code should be removed rather than left in the codebase.

Suggested change
// for (int i = 0; i < requiredFields.length; i++) {
// Object fieldData = rowRecord.get(requiredFields[i]);
// if (fieldData == null) {
// appendData(i, null);
// } else {
// AvroColumnValue fieldValue = new AvroColumnValue(fieldInspectors[i], fieldData);
// appendData(i, fieldValue);
// }
// }

Copilot uses AI. Check for mistakes.
}
return records;
}


@Override
protected int getNext() throws IOException {
List<GenericRecord> records = readNextBatch();

long startTime = System.nanoTime();
for (GenericRecord rowRecord : records) {
for (int i = 0; i < requiredFields.length; i++) {
Object fieldData = rowRecord.get(requiredFields[i]);
if (fieldData == null) {
Expand All @@ -211,7 +235,9 @@ protected int getNext() throws IOException {
}
}
}
return numRows;
appendDataTime += System.nanoTime() - startTime;

return records.size();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@

import java.io.IOException;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -166,13 +167,24 @@ public int getNext() throws IOException {
return preExecutionAuthenticator.execute(() -> {
NullWritable key = reader.createKey();
ArrayWritable value = reader.createValue();
List<Object> records = new ArrayList<>();
int numRows = 0;
Comment on lines +170 to 171
Copy link

Copilot AI Nov 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The records list is created but remains empty when fields.length == 0 (virtual table case). While this doesn't cause incorrect behavior, it's inefficient to create an unused list. Consider moving the list creation inside the if (fields.length > 0) block to avoid unnecessary allocation.

Suggested change
List<Object> records = new ArrayList<>();
int numRows = 0;
int numRows = 0;
List<Object> records = null;
if (fields.length > 0) {
records = new ArrayList<>();
}

Copilot uses AI. Check for mistakes.
for (; numRows < fetchSize; numRows++) {
if (!reader.next(key, value)) {
break;
}
if (fields.length > 0) {
Object rowData = deserializer.deserialize(value);
records.add(rowData);
}
}

long startTime = System.nanoTime();
// vectorTable is virtual
if (fields.length == 0) {
vectorTable.appendVirtualData(numRows);
} else {
for (Object rowData : records) {
for (int i = 0; i < fields.length; i++) {
Object fieldData = rowInspector.getStructFieldData(rowData, structFields[i]);
columnValue.setRow(fieldData);
Expand All @@ -181,10 +193,8 @@ public int getNext() throws IOException {
}
}
}
// vectorTable is virtual
if (fields.length == 0) {
vectorTable.appendVirtualData(numRows);
}
appendDataTime += System.nanoTime() - startTime;

return numRows;
});
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,24 +105,29 @@ private void nextScanTask() throws IOException {
@Override
protected int getNext() throws IOException {
try (ThreadClassLoaderContext ignored = new ThreadClassLoaderContext(classLoader)) {
int rows = 0;
while (rows < getBatchSize()) {

List<StructLike> records = new ArrayList<>();
while (records.size() < getBatchSize()) {
while (!reader.hasNext() && scanTasks.hasNext()) {
nextScanTask();
}
if (!reader.hasNext()) {
break;
}
StructLike row = reader.next();
records.add(reader.next());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this consume more memory? Because we have to save all records before doing the append?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don’t think it will consume much memory, because at most there will only be one batch of data at a time. The reason for storing this data before calling append is that I think some data-retrieval methods might be lazily loaded, so I separated the reader.next() call from the append method.

}
long startTime = System.nanoTime();
for (StructLike row : records) {
for (int i = 0; i < fields.size(); i++) {
NestedField field = fields.get(i);
Object value = row.get(i, field.type().typeId().javaClass());
ColumnValue columnValue = new IcebergSysTableColumnValue(value, timezone);
appendData(i, columnValue);
}
rows++;
}
return rows;
appendDataTime += System.nanoTime() - startTime;

return records.size();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ protected int getNext() throws IOException {
return 0;
}
int rows = Math.min(batchSize, mockRows - readRows);
long startTime = System.nanoTime();
for (int i = 0; i < rows; ++i) {
for (int j = 0; j < types.length; ++j) {
if ((i + j) % 16 == 0) {
Expand All @@ -215,6 +216,7 @@ protected int getNext() throws IOException {
}
}
}
appendDataTime += System.nanoTime() - startTime;
readRows += rows;
return rows;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ private int readVectors(int expectedRows) throws IOException {

List<FieldVector> fieldVectors = data.getFieldVectors();
int batchRows = 0;
long startTime = System.nanoTime();
for (FieldVector column : fieldVectors) {
Integer readColumnId = readColumnsToId.get(column.getName());
batchRows = column.getValueCount();
Expand All @@ -275,6 +276,8 @@ private int readVectors(int expectedRows) throws IOException {
appendData(readColumnId, columnValue);
}
}
appendDataTime += System.nanoTime() - startTime;

curReadRows += batchRows;
} catch (Exception e) {
String errorMsg = String.format("MaxComputeJniScanner Fail to read arrow data. "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -164,8 +165,9 @@ public void close() throws IOException {
}
}

private int readAndProcessNextBatch() throws IOException {
int rows = 0;
private List<InternalRow> readNextBatch() throws IOException {
List<InternalRow> records = new ArrayList<>();

try {
if (recordIterator == null) {
recordIterator = reader.readBatch();
Expand All @@ -174,16 +176,9 @@ private int readAndProcessNextBatch() throws IOException {
while (recordIterator != null) {
InternalRow record;
while ((record = recordIterator.next()) != null) {
columnValue.setOffsetRow(record);
for (int i = 0; i < fields.length; i++) {
columnValue.setIdx(i, types[i], paimonDataTypeList.get(i));
long l = System.nanoTime();
appendData(i, columnValue);
appendDataTime += System.nanoTime() - l;
}
rows++;
if (rows >= batchSize) {
return rows;
records.add(record);
if (records.size() >= batchSize) {
return records;
}
}
recordIterator.releaseBatch();
Expand All @@ -196,7 +191,24 @@ private int readAndProcessNextBatch() throws IOException {
getSplit(), params.get("required_fields"), paimonAllFieldNames, paimonDataTypeList, e);
throw new IOException(e);
}
return rows;
return records;
}

private int readAndProcessNextBatch() throws IOException {

List<InternalRow> records = readNextBatch();

long startTime = System.nanoTime();
for (InternalRow record : records) {
columnValue.setOffsetRow(record);
for (int i = 0; i < fields.length; i++) {
columnValue.setIdx(i, types[i], paimonDataTypeList.get(i));
appendData(i, columnValue);
}
}
appendDataTime += System.nanoTime() - startTime;

return records.size();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -159,21 +160,15 @@ private void resetDatetimeV2Precision() {
}
}

private int readAndProcessNextBatch() throws IOException {
int rows = 0;
private List<InternalRow> readNextBatch() throws IOException {
List<InternalRow> records = new ArrayList<>();
// int rows = 0;
Copy link

Copilot AI Nov 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove commented-out code. The comment // int rows = 0; serves no purpose and should be deleted.

Suggested change
// int rows = 0;

Copilot uses AI. Check for mistakes.
while (recordIterator != null) {
InternalRow record;
while ((record = recordIterator.next()) != null) {
columnValue.setOffsetRow(record);
for (int i = 0; i < fields.length; i++) {
columnValue.setIdx(i, types[i], paimonDataTypeList.get(i));
long l = System.nanoTime();
appendData(i, columnValue);
appendDataTime += System.nanoTime() - l;
}
rows++;
if (rows >= batchSize) {
return rows;
records.add(record);
if (records.size() >= batchSize) {
return records;
}
}
recordIterator.releaseBatch();
Expand All @@ -183,6 +178,23 @@ private int readAndProcessNextBatch() throws IOException {
nextReader();
}
}
return rows;
return records;

}

private int readAndProcessNextBatch() throws IOException {

List<InternalRow> records = readNextBatch();

long startTime = System.nanoTime();
for (InternalRow record : records) {
columnValue.setOffsetRow(record);
for (int i = 0; i < fields.length; i++) {
columnValue.setIdx(i, types[i], paimonDataTypeList.get(i));
appendData(i, columnValue);
}
}
appendDataTime += System.nanoTime() - startTime;
return records.size();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,9 @@ public void open() throws IOException {

@Override
public void close() throws IOException {
for (long appendDataTimeN : appendDataTimeNs) {
appendDataTime += appendDataTimeN;
}
if (source != null) {
source.close();
}
Expand Down
Loading