Parquet: Add readers and writers for the internal object model (#11904)
Co-authored-by: Ryan Blue <[email protected]>
ajantha-bhat and rdblue authored Jan 24, 2025
1 parent c0c1b15 commit 67c52b5
Showing 12 changed files with 747 additions and 323 deletions.
15 changes: 9 additions & 6 deletions api/src/test/java/org/apache/iceberg/util/RandomUtil.java
@@ -237,7 +237,7 @@ private static BigInteger randomUnscaled(int precision, Random random) {
}

public static List<Object> generateList(
Random random, Types.ListType list, Supplier<Object> elementResult) {
Random random, Types.ListType list, Supplier<Object> elementSupplier) {
int numElements = random.nextInt(20);

List<Object> result = Lists.newArrayListWithExpectedSize(numElements);
@@ -246,23 +246,26 @@ public static List<Object> generateList(
if (list.isElementOptional() && random.nextInt(20) == 1) {
result.add(null);
} else {
result.add(elementResult.get());
result.add(elementSupplier.get());
}
}

return result;
}

public static Map<Object, Object> generateMap(
Random random, Types.MapType map, Supplier<Object> keyResult, Supplier<Object> valueResult) {
Random random,
Types.MapType map,
Supplier<Object> keySupplier,
Supplier<Object> valueSupplier) {
int numEntries = random.nextInt(20);

Map<Object, Object> result = Maps.newLinkedHashMap();
Supplier<Object> keyFunc;
if (map.keyType() == Types.StringType.get()) {
keyFunc = () -> keyResult.get().toString();
keyFunc = () -> keySupplier.get().toString();
} else {
keyFunc = keyResult;
keyFunc = keySupplier;
}

Set<Object> keySet = Sets.newHashSet();
@@ -279,7 +282,7 @@ public static Map<Object, Object> generateMap(
if (map.isValueOptional() && random.nextInt(20) == 1) {
result.put(key, null);
} else {
result.put(key, valueResult.get());
result.put(key, valueSupplier.get());
}
}

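For context, a minimal usage sketch of the renamed supplier parameters. This is not part of the commit; the field IDs are arbitrary and it assumes RandomUtil.generatePrimitive(Type.PrimitiveType, Random) as the element, key, and value source.

import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.RandomUtil;

class RandomUtilUsageSketch {
  static void generate() {
    Random random = new Random(42);

    // list<int>: elements come from the elementSupplier argument
    Types.ListType listType = Types.ListType.ofOptional(1, Types.IntegerType.get());
    List<Object> list =
        RandomUtil.generateList(
            random, listType, () -> RandomUtil.generatePrimitive(Types.IntegerType.get(), random));

    // map<string, long>: generateMap converts keys with toString() when the key type is string
    Types.MapType mapType =
        Types.MapType.ofOptional(2, 3, Types.StringType.get(), Types.LongType.get());
    Map<Object, Object> map =
        RandomUtil.generateMap(
            random,
            mapType,
            () -> RandomUtil.generatePrimitive(Types.StringType.get(), random),
            () -> RandomUtil.generatePrimitive(Types.LongType.get(), random));

    System.out.println(list.size() + " elements, " + map.size() + " entries");
  }
}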
(next changed file; path not shown in this extract)
@@ -583,7 +583,7 @@ private void checkColumnarBatch(
columnSet,
"uuid",
(records, i) -> records.get(i).getField("uuid"),
ColumnVector::getBinary);
(array, i) -> UUIDUtil.convert(array.getBinary(i)));

checkColumnarArrayValues(
expectedNumRows,
@@ -593,7 +593,7 @@ private void checkColumnarBatch(
columnSet,
"uuid_nullable",
(records, i) -> records.get(i).getField("uuid_nullable"),
ColumnVector::getBinary);
(array, i) -> UUIDUtil.convert(array.getBinary(i)));

checkColumnarArrayValues(
expectedNumRows,
@@ -820,8 +820,7 @@ private List<GenericRecord> createIncrementalRecordsForDate(
rec.setField("int_promotion", i);
rec.setField("time", LocalTime.of(11, i));
rec.setField("time_nullable", LocalTime.of(11, i));
ByteBuffer bb = UUIDUtil.convertToByteBuffer(UUID.randomUUID());
byte[] uuid = bb.array();
UUID uuid = UUID.randomUUID();
rec.setField("uuid", uuid);
rec.setField("uuid_nullable", uuid);
rec.setField("decimal", new BigDecimal("14.0" + i % 10));
@@ -858,9 +857,7 @@ private List<GenericRecord> createConstantRecordsForDate(Schema schema, LocalDat
rec.setField("int_promotion", 1);
rec.setField("time", LocalTime.of(11, 30));
rec.setField("time_nullable", LocalTime.of(11, 30));
ByteBuffer bb =
UUIDUtil.convertToByteBuffer(UUID.fromString("abcd91cf-08d0-4223-b145-f64030b3077f"));
byte[] uuid = bb.array();
UUID uuid = UUID.fromString("abcd91cf-08d0-4223-b145-f64030b3077f");
rec.setField("uuid", uuid);
rec.setField("uuid_nullable", uuid);
rec.setField("decimal", new BigDecimal("14.20"));
@@ -1140,7 +1137,7 @@ private void checkAllVectorValues(
columnSet,
"uuid",
(records, i) -> records.get(i).getField("uuid"),
(vector, i) -> ((FixedSizeBinaryVector) vector).get(i));
(vector, i) -> UUIDUtil.convert(((FixedSizeBinaryVector) vector).get(i)));

checkVectorValues(
expectedNumRows,
@@ -1149,7 +1146,7 @@ private void checkAllVectorValues(
columnSet,
"uuid_nullable",
(records, i) -> records.get(i).getField("uuid_nullable"),
(vector, i) -> ((FixedSizeBinaryVector) vector).get(i));
(vector, i) -> UUIDUtil.convert(((FixedSizeBinaryVector) vector).get(i)));

checkVectorValues(
expectedNumRows,
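The assertions above now compare java.util.UUID values rather than raw bytes. A short round-trip sketch of the UUIDUtil helpers involved (illustrative only; the literal value matches the constant used in createConstantRecordsForDate):

import java.util.UUID;
import org.apache.iceberg.util.UUIDUtil;

class UuidRoundTripSketch {
  static void roundTrip() {
    UUID expected = UUID.fromString("abcd91cf-08d0-4223-b145-f64030b3077f");

    // UUID -> 16-byte representation, as stored in Parquet fixed[16] / Arrow FixedSizeBinary
    byte[] bytes = UUIDUtil.convertToByteBuffer(expected).array();

    // 16 bytes -> UUID, mirroring the updated vector checks above
    UUID actual = UUIDUtil.convert(bytes);

    if (!expected.equals(actual)) {
      throw new AssertionError("UUID round trip failed");
    }
  }
}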
(next changed file; path not shown in this extract)
@@ -18,19 +18,9 @@
*/
package org.apache.iceberg.data.parquet;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
@@ -50,6 +40,10 @@
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;

/**
* @deprecated since 1.8.0, will be made package-private in 1.9.0
*/
@Deprecated
public abstract class BaseParquetReaders<T> {
protected BaseParquetReaders() {}

@@ -76,6 +70,46 @@ protected ParquetValueReader<T> createReader(
protected abstract ParquetValueReader<T> createStructReader(
List<Type> types, List<ParquetValueReader<?>> fieldReaders, Types.StructType structType);

protected ParquetValueReader<?> fixedReader(ColumnDescriptor desc) {
return new GenericParquetReaders.FixedReader(desc);
}

protected ParquetValueReader<?> dateReader(ColumnDescriptor desc) {
return new GenericParquetReaders.DateReader(desc);
}

protected ParquetValueReader<?> timeReader(
ColumnDescriptor desc, LogicalTypeAnnotation.TimeUnit unit) {
switch (unit) {
case MICROS:
return new GenericParquetReaders.TimeReader(desc);
case MILLIS:
return new GenericParquetReaders.TimeMillisReader(desc);
default:
throw new UnsupportedOperationException("Unsupported unit for time: " + unit);
}
}

protected ParquetValueReader<?> timestampReader(
ColumnDescriptor desc, LogicalTypeAnnotation.TimeUnit unit, boolean isAdjustedToUTC) {
if (desc.getPrimitiveType().getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT96) {
return new GenericParquetReaders.TimestampInt96Reader(desc);
}

switch (unit) {
case MICROS:
return isAdjustedToUTC
? new GenericParquetReaders.TimestamptzReader(desc)
: new GenericParquetReaders.TimestampReader(desc);
case MILLIS:
return isAdjustedToUTC
? new GenericParquetReaders.TimestamptzMillisReader(desc)
: new GenericParquetReaders.TimestampMillisReader(desc);
default:
throw new UnsupportedOperationException("Unsupported unit for timestamp: " + unit);
}
}

protected Object convertConstant(org.apache.iceberg.types.Type type, Object value) {
return value;
}
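For orientation, a minimal sketch of how a reader built by the generic subclass is normally wired into a Parquet scan through the public API (standard Iceberg usage; the InputFile and Schema arguments are placeholders):

import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.parquet.Parquet;

class ParquetReadSketch {
  static CloseableIterable<Record> records(InputFile file, Schema schema) {
    return Parquet.read(file)
        .project(schema)
        // GenericParquetReaders extends BaseParquetReaders<Record> and supplies the struct readers
        .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema))
        .build();
  }
}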
@@ -164,37 +198,23 @@ public Optional<ParquetValueReader<?>> visit(DecimalLogicalTypeAnnotation decima
@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) {
return Optional.of(new DateReader(desc));
return Optional.of(dateReader(desc));
}

@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) {
if (timeLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MICROS) {
return Optional.of(new TimeReader(desc));
} else if (timeLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MILLIS) {
return Optional.of(new TimeMillisReader(desc));
}

return Optional.empty();
return Optional.of(timeReader(desc, timeLogicalType.getUnit()));
}

@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) {
if (timestampLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MICROS) {
Types.TimestampType tsMicrosType = (Types.TimestampType) expected;
return tsMicrosType.shouldAdjustToUTC()
? Optional.of(new TimestamptzReader(desc))
: Optional.of(new TimestampReader(desc));
} else if (timestampLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MILLIS) {
Types.TimestampType tsMillisType = (Types.TimestampType) expected;
return tsMillisType.shouldAdjustToUTC()
? Optional.of(new TimestamptzMillisReader(desc))
: Optional.of(new TimestampMillisReader(desc));
}

return LogicalTypeAnnotation.LogicalTypeAnnotationVisitor.super.visit(timestampLogicalType);
return Optional.of(
timestampReader(
desc,
timestampLogicalType.getUnit(),
((Types.TimestampType) expected).shouldAdjustToUTC()));
}

@Override
@@ -219,6 +239,12 @@ public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonLogicalType) {
return Optional.of(new ParquetValueReaders.BytesReader(desc));
}

@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.UUIDLogicalTypeAnnotation uuidLogicalType) {
return Optional.of(ParquetValueReaders.uuids(desc));
}
}

private class ReadBuilder extends TypeWithSchemaVisitor<ParquetValueReader<?>> {
@@ -359,7 +385,7 @@ public ParquetValueReader<?> primitive(

ColumnDescriptor desc = type.getColumnDescription(currentPath());

if (primitive.getOriginalType() != null) {
if (primitive.getLogicalTypeAnnotation() != null) {
return primitive
.getLogicalTypeAnnotation()
.accept(new LogicalTypeAnnotationParquetValueReaderVisitor(desc, expected, primitive))
@@ -371,7 +397,7 @@ public ParquetValueReader<?> primitive(

switch (primitive.getPrimitiveTypeName()) {
case FIXED_LEN_BYTE_ARRAY:
return new FixedReader(desc);
return fixedReader(desc);
case BINARY:
if (expected.typeId() == org.apache.iceberg.types.Type.TypeID.STRING) {
return new ParquetValueReaders.StringReader(desc);
@@ -397,7 +423,7 @@ public ParquetValueReader<?> primitive(
case INT96:
// Impala & Spark used to write timestamps as INT96 without a logical type. For backwards
// compatibility we try to read INT96 as timestamps.
return new TimestampInt96Reader(desc);
return timestampReader(desc, LogicalTypeAnnotation.TimeUnit.NANOS, true);
default:
throw new UnsupportedOperationException("Unsupported type: " + primitive);
}
@@ -407,124 +433,4 @@ MessageType type() {
return type;
}
}

private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();

private static class DateReader extends ParquetValueReaders.PrimitiveReader<LocalDate> {
private DateReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public LocalDate read(LocalDate reuse) {
return EPOCH_DAY.plusDays(column.nextInteger());
}
}

private static class TimestampReader extends ParquetValueReaders.PrimitiveReader<LocalDateTime> {
private TimestampReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public LocalDateTime read(LocalDateTime reuse) {
return EPOCH.plus(column.nextLong(), ChronoUnit.MICROS).toLocalDateTime();
}
}

private static class TimestampMillisReader
extends ParquetValueReaders.PrimitiveReader<LocalDateTime> {
private TimestampMillisReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public LocalDateTime read(LocalDateTime reuse) {
return EPOCH.plus(column.nextLong() * 1000, ChronoUnit.MICROS).toLocalDateTime();
}
}

private static class TimestampInt96Reader
extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> {
private static final long UNIX_EPOCH_JULIAN = 2_440_588L;

private TimestampInt96Reader(ColumnDescriptor desc) {
super(desc);
}

@Override
public OffsetDateTime read(OffsetDateTime reuse) {
final ByteBuffer byteBuffer =
column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
final long timeOfDayNanos = byteBuffer.getLong();
final int julianDay = byteBuffer.getInt();

return Instant.ofEpochMilli(TimeUnit.DAYS.toMillis(julianDay - UNIX_EPOCH_JULIAN))
.plusNanos(timeOfDayNanos)
.atOffset(ZoneOffset.UTC);
}
}

private static class TimestamptzReader
extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> {
private TimestamptzReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public OffsetDateTime read(OffsetDateTime reuse) {
return EPOCH.plus(column.nextLong(), ChronoUnit.MICROS);
}
}

private static class TimestamptzMillisReader
extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> {
private TimestamptzMillisReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public OffsetDateTime read(OffsetDateTime reuse) {
return EPOCH.plus(column.nextLong() * 1000, ChronoUnit.MICROS);
}
}

private static class TimeMillisReader extends ParquetValueReaders.PrimitiveReader<LocalTime> {
private TimeMillisReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public LocalTime read(LocalTime reuse) {
return LocalTime.ofNanoOfDay(column.nextLong() * 1000000L);
}
}

private static class TimeReader extends ParquetValueReaders.PrimitiveReader<LocalTime> {
private TimeReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public LocalTime read(LocalTime reuse) {
return LocalTime.ofNanoOfDay(column.nextLong() * 1000L);
}
}

private static class FixedReader extends ParquetValueReaders.PrimitiveReader<byte[]> {
private FixedReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public byte[] read(byte[] reuse) {
if (reuse != null) {
column.nextBinary().toByteBuffer().duplicate().get(reuse);
return reuse;
} else {
return column.nextBinary().getBytes();
}
}
}
}
(remaining changed files not rendered in this extract)
