diff --git a/api/src/test/java/org/apache/iceberg/util/RandomUtil.java b/api/src/test/java/org/apache/iceberg/util/RandomUtil.java
index 7414a457c520..5f3fc370e4c0 100644
--- a/api/src/test/java/org/apache/iceberg/util/RandomUtil.java
+++ b/api/src/test/java/org/apache/iceberg/util/RandomUtil.java
@@ -237,7 +237,7 @@ private static BigInteger randomUnscaled(int precision, Random random) {
   }
 
   public static List<Object> generateList(
-      Random random, Types.ListType list, Supplier<Object> elementResult) {
+      Random random, Types.ListType list, Supplier<Object> elementSupplier) {
     int numElements = random.nextInt(20);
 
     List<Object> result = Lists.newArrayListWithExpectedSize(numElements);
@@ -246,7 +246,7 @@ public static List<Object> generateList(
       if (list.isElementOptional() && random.nextInt(20) == 1) {
         result.add(null);
       } else {
-        result.add(elementResult.get());
+        result.add(elementSupplier.get());
       }
     }
 
@@ -254,15 +254,18 @@ public static List<Object> generateList(
   }
 
   public static Map<Object, Object> generateMap(
-      Random random, Types.MapType map, Supplier<Object> keyResult, Supplier<Object> valueResult) {
+      Random random,
+      Types.MapType map,
+      Supplier<Object> keySupplier,
+      Supplier<Object> valueSupplier) {
     int numEntries = random.nextInt(20);
 
     Map<Object, Object> result = Maps.newLinkedHashMap();
     Supplier<Object> keyFunc;
     if (map.keyType() == Types.StringType.get()) {
-      keyFunc = () -> keyResult.get().toString();
+      keyFunc = () -> keySupplier.get().toString();
     } else {
-      keyFunc = keyResult;
+      keyFunc = keySupplier;
     }
 
     Set<Object> keySet = Sets.newHashSet();
@@ -279,7 +282,7 @@ public static Map<Object, Object> generateMap(
       if (map.isValueOptional() && random.nextInt(20) == 1) {
         result.put(key, null);
       } else {
-        result.put(key, valueResult.get());
+        result.put(key, valueSupplier.get());
       }
     }
 
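The rename above is cosmetic: the Supplier parameters produce one new element per call rather than holding a precomputed result, so elementSupplier reads better than elementResult. A hypothetical call site for illustration only (not part of this change):

    Random random = new Random(42);
    Types.ListType list = Types.ListType.ofOptional(1, Types.IntegerType.get());
    // the supplier is invoked once per generated element
    List<Object> values = RandomUtil.generateList(random, list, random::nextInt);
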
diff --git a/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/ArrowReaderTest.java b/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/ArrowReaderTest.java
index 9364affe3915..3786a6e0ef17 100644
--- a/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/ArrowReaderTest.java
+++ b/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/ArrowReaderTest.java
@@ -583,7 +583,7 @@ private void checkColumnarBatch(
         columnSet,
         "uuid",
         (records, i) -> records.get(i).getField("uuid"),
-        ColumnVector::getBinary);
+        (array, i) -> UUIDUtil.convert(array.getBinary(i)));
 
     checkColumnarArrayValues(
         expectedNumRows,
@@ -593,7 +593,7 @@ private void checkColumnarBatch(
         columnSet,
         "uuid_nullable",
         (records, i) -> records.get(i).getField("uuid_nullable"),
-        ColumnVector::getBinary);
+        (array, i) -> UUIDUtil.convert(array.getBinary(i)));
 
     checkColumnarArrayValues(
         expectedNumRows,
@@ -820,8 +820,7 @@ private List<GenericRecord> createIncrementalRecordsForDate(
       rec.setField("int_promotion", i);
       rec.setField("time", LocalTime.of(11, i));
       rec.setField("time_nullable", LocalTime.of(11, i));
-      ByteBuffer bb = UUIDUtil.convertToByteBuffer(UUID.randomUUID());
-      byte[] uuid = bb.array();
+      UUID uuid = UUID.randomUUID();
       rec.setField("uuid", uuid);
       rec.setField("uuid_nullable", uuid);
       rec.setField("decimal", new BigDecimal("14.0" + i % 10));
@@ -858,9 +857,7 @@ private List<GenericRecord> createConstantRecordsForDate(Schema schema, LocalDat
       rec.setField("int_promotion", 1);
       rec.setField("time", LocalTime.of(11, 30));
       rec.setField("time_nullable", LocalTime.of(11, 30));
-      ByteBuffer bb =
-          UUIDUtil.convertToByteBuffer(UUID.fromString("abcd91cf-08d0-4223-b145-f64030b3077f"));
-      byte[] uuid = bb.array();
+      UUID uuid = UUID.fromString("abcd91cf-08d0-4223-b145-f64030b3077f");
       rec.setField("uuid", uuid);
       rec.setField("uuid_nullable", uuid);
       rec.setField("decimal", new BigDecimal("14.20"));
@@ -1140,7 +1137,7 @@ private void checkAllVectorValues(
         columnSet,
         "uuid",
         (records, i) -> records.get(i).getField("uuid"),
-        (vector, i) -> ((FixedSizeBinaryVector) vector).get(i));
+        (vector, i) -> UUIDUtil.convert(((FixedSizeBinaryVector) vector).get(i)));
 
     checkVectorValues(
         expectedNumRows,
@@ -1149,7 +1146,7 @@ private void checkAllVectorValues(
         columnSet,
         "uuid_nullable",
         (records, i) -> records.get(i).getField("uuid_nullable"),
-        (vector, i) -> ((FixedSizeBinaryVector) vector).get(i));
+        (vector, i) -> UUIDUtil.convert(((FixedSizeBinaryVector) vector).get(i)));
 
     checkVectorValues(
         expectedNumRows,
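The ArrowReaderTest changes above follow from UUID columns now being exposed as java.util.UUID instead of raw 16-byte arrays. A minimal round-trip sketch, assuming only the UUIDUtil helpers already used in this diff (convert and convertToByteBuffer):

    import java.nio.ByteBuffer;
    import java.util.UUID;
    import org.apache.iceberg.util.UUIDUtil;

    public class UuidRoundTrip {
      public static void main(String[] args) {
        UUID original = UUID.fromString("abcd91cf-08d0-4223-b145-f64030b3077f");
        // serialize to the 16 big-endian bytes stored in a Parquet fixed[16] column
        ByteBuffer bytes = UUIDUtil.convertToByteBuffer(original);
        // deserialize back to java.util.UUID, as the updated readers now do
        UUID restored = UUIDUtil.convert(bytes);
        System.out.println(original.equals(restored)); // true
      }
    }
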
diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java
index efdf9cc9b01d..6fbcbb750bdf 100644
--- a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java
+++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java
@@ -18,19 +18,9 @@
  */
 package org.apache.iceberg.data.parquet;
 
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.time.Instant;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.LocalTime;
-import java.time.OffsetDateTime;
-import java.time.ZoneOffset;
-import java.time.temporal.ChronoUnit;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.TimeUnit;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.parquet.ParquetSchemaUtil;
@@ -50,6 +40,10 @@
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
 
+/**
+ * @deprecated since 1.8.0, will be made package-private in 1.9.0
+ */
+@Deprecated
 public abstract class BaseParquetReaders<T> {
   protected BaseParquetReaders() {}
 
@@ -76,6 +70,46 @@ protected ParquetValueReader<T> createReader(
   protected abstract ParquetValueReader<T> createStructReader(
       List<Type> types, List<ParquetValueReader<?>> fieldReaders, Types.StructType structType);
 
+  protected ParquetValueReader<?> fixedReader(ColumnDescriptor desc) {
+    return new GenericParquetReaders.FixedReader(desc);
+  }
+
+  protected ParquetValueReader<?> dateReader(ColumnDescriptor desc) {
+    return new GenericParquetReaders.DateReader(desc);
+  }
+
+  protected ParquetValueReader<?> timeReader(
+      ColumnDescriptor desc, LogicalTypeAnnotation.TimeUnit unit) {
+    switch (unit) {
+      case MICROS:
+        return new GenericParquetReaders.TimeReader(desc);
+      case MILLIS:
+        return new GenericParquetReaders.TimeMillisReader(desc);
+      default:
+        throw new UnsupportedOperationException("Unsupported unit for time: " + unit);
+    }
+  }
+
+  protected ParquetValueReader<?> timestampReader(
+      ColumnDescriptor desc, LogicalTypeAnnotation.TimeUnit unit, boolean isAdjustedToUTC) {
+    if (desc.getPrimitiveType().getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT96) {
+      return new GenericParquetReaders.TimestampInt96Reader(desc);
+    }
+
+    switch (unit) {
+      case MICROS:
+        return isAdjustedToUTC
+            ? new GenericParquetReaders.TimestamptzReader(desc)
+            : new GenericParquetReaders.TimestampReader(desc);
+      case MILLIS:
+        return isAdjustedToUTC
+            ? new GenericParquetReaders.TimestamptzMillisReader(desc)
+            : new GenericParquetReaders.TimestampMillisReader(desc);
+      default:
+        throw new UnsupportedOperationException("Unsupported unit for timestamp: " + unit);
+    }
+  }
+
   protected Object convertConstant(org.apache.iceberg.types.Type type, Object value) {
     return value;
   }
@@ -164,37 +198,23 @@ public Optional<ParquetValueReader<?>> visit(DecimalLogicalTypeAnnotation decima
     @Override
     public Optional<ParquetValueReader<?>> visit(
         LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) {
-      return Optional.of(new DateReader(desc));
+      return Optional.of(dateReader(desc));
     }
 
     @Override
     public Optional<ParquetValueReader<?>> visit(
         LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) {
-      if (timeLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MICROS) {
-        return Optional.of(new TimeReader(desc));
-      } else if (timeLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MILLIS) {
-        return Optional.of(new TimeMillisReader(desc));
-      }
-
-      return Optional.empty();
+      return Optional.of(timeReader(desc, timeLogicalType.getUnit()));
     }
 
     @Override
     public Optional<ParquetValueReader<?>> visit(
         LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) {
-      if (timestampLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MICROS) {
-        Types.TimestampType tsMicrosType = (Types.TimestampType) expected;
-        return tsMicrosType.shouldAdjustToUTC()
-            ? Optional.of(new TimestamptzReader(desc))
-            : Optional.of(new TimestampReader(desc));
-      } else if (timestampLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MILLIS) {
-        Types.TimestampType tsMillisType = (Types.TimestampType) expected;
-        return tsMillisType.shouldAdjustToUTC()
-            ? Optional.of(new TimestamptzMillisReader(desc))
-            : Optional.of(new TimestampMillisReader(desc));
-      }
-
-      return LogicalTypeAnnotation.LogicalTypeAnnotationVisitor.super.visit(timestampLogicalType);
+      return Optional.of(
+          timestampReader(
+              desc,
+              timestampLogicalType.getUnit(),
+              ((Types.TimestampType) expected).shouldAdjustToUTC()));
     }
 
     @Override
@@ -219,6 +239,12 @@ public Optional<ParquetValueReader<?>> visit(
         LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonLogicalType) {
       return Optional.of(new ParquetValueReaders.BytesReader(desc));
     }
+
+    @Override
+    public Optional<ParquetValueReader<?>> visit(
+        LogicalTypeAnnotation.UUIDLogicalTypeAnnotation uuidLogicalType) {
+      return Optional.of(ParquetValueReaders.uuids(desc));
+    }
   }
 
   private class ReadBuilder extends TypeWithSchemaVisitor<ParquetValueReader<?>> {
@@ -359,7 +385,7 @@ public ParquetValueReader<?> primitive(
 
     ColumnDescriptor desc = type.getColumnDescription(currentPath());
 
-    if (primitive.getOriginalType() != null) {
+    if (primitive.getLogicalTypeAnnotation() != null) {
       return primitive
           .getLogicalTypeAnnotation()
           .accept(new LogicalTypeAnnotationParquetValueReaderVisitor(desc, expected, primitive))
@@ -371,7 +397,7 @@ public ParquetValueReader<?> primitive(
 
     switch (primitive.getPrimitiveTypeName()) {
       case FIXED_LEN_BYTE_ARRAY:
-        return new FixedReader(desc);
+        return fixedReader(desc);
       case BINARY:
         if (expected.typeId() == org.apache.iceberg.types.Type.TypeID.STRING) {
           return new ParquetValueReaders.StringReader(desc);
@@ -397,7 +423,7 @@ public ParquetValueReader<?> primitive(
       case INT96:
         // Impala & Spark used to write timestamps as INT96 without a logical type. For backwards
        // compatibility we try to read INT96 as timestamps.
-        return new TimestampInt96Reader(desc);
+        return timestampReader(desc, LogicalTypeAnnotation.TimeUnit.NANOS, true);
       default:
         throw new UnsupportedOperationException("Unsupported type: " + primitive);
     }
@@ -407,124 +433,4 @@ MessageType type() {
       return type;
     }
   }
-
-  private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
-  private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
-
-  private static class DateReader extends ParquetValueReaders.PrimitiveReader<LocalDate> {
-    private DateReader(ColumnDescriptor desc) {
-      super(desc);
-    }
-
-    @Override
-    public LocalDate read(LocalDate reuse) {
-      return EPOCH_DAY.plusDays(column.nextInteger());
-    }
-  }
-
-  private static class TimestampReader extends ParquetValueReaders.PrimitiveReader<LocalDateTime> {
-    private TimestampReader(ColumnDescriptor desc) {
-      super(desc);
-    }
-
-    @Override
-    public LocalDateTime read(LocalDateTime reuse) {
-      return EPOCH.plus(column.nextLong(), ChronoUnit.MICROS).toLocalDateTime();
-    }
-  }
-
-  private static class TimestampMillisReader
-      extends ParquetValueReaders.PrimitiveReader<LocalDateTime> {
-    private TimestampMillisReader(ColumnDescriptor desc) {
-      super(desc);
-    }
-
-    @Override
-    public LocalDateTime read(LocalDateTime reuse) {
-      return EPOCH.plus(column.nextLong() * 1000, ChronoUnit.MICROS).toLocalDateTime();
-    }
-  }
-
-  private static class TimestampInt96Reader
-      extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> {
-    private static final long UNIX_EPOCH_JULIAN = 2_440_588L;
-
-    private TimestampInt96Reader(ColumnDescriptor desc) {
-      super(desc);
-    }
-
-    @Override
-    public OffsetDateTime read(OffsetDateTime reuse) {
-      final ByteBuffer byteBuffer =
-          column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
-      final long timeOfDayNanos = byteBuffer.getLong();
-      final int julianDay = byteBuffer.getInt();
-
-      return Instant.ofEpochMilli(TimeUnit.DAYS.toMillis(julianDay - UNIX_EPOCH_JULIAN))
-          .plusNanos(timeOfDayNanos)
-          .atOffset(ZoneOffset.UTC);
-    }
-  }
-
-  private static class TimestamptzReader
-      extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> {
-    private TimestamptzReader(ColumnDescriptor desc) {
-      super(desc);
-    }
-
-    @Override
-    public OffsetDateTime read(OffsetDateTime reuse) {
-      return EPOCH.plus(column.nextLong(), ChronoUnit.MICROS);
-    }
-  }
-
-  private static class TimestamptzMillisReader
-      extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> {
-    private TimestamptzMillisReader(ColumnDescriptor desc) {
-      super(desc);
-    }
-
-    @Override
-    public OffsetDateTime read(OffsetDateTime reuse) {
-      return EPOCH.plus(column.nextLong() * 1000, ChronoUnit.MICROS);
-    }
-  }
-
-  private static class TimeMillisReader extends ParquetValueReaders.PrimitiveReader<LocalTime> {
-    private TimeMillisReader(ColumnDescriptor desc) {
-      super(desc);
-    }
-
-    @Override
-    public LocalTime read(LocalTime reuse) {
-      return LocalTime.ofNanoOfDay(column.nextLong() * 1000000L);
-    }
-  }
-
-  private static class TimeReader extends ParquetValueReaders.PrimitiveReader<LocalTime> {
-    private TimeReader(ColumnDescriptor desc) {
-      super(desc);
-    }
-
-    @Override
-    public LocalTime read(LocalTime reuse) {
-      return LocalTime.ofNanoOfDay(column.nextLong() * 1000L);
-    }
-  }
-
-  private static class FixedReader extends ParquetValueReaders.PrimitiveReader<byte[]> {
-    private FixedReader(ColumnDescriptor desc) {
-      super(desc);
-    }
-
-    @Override
-    public byte[] read(byte[] reuse) {
-      if (reuse != null) {
-        column.nextBinary().toByteBuffer().duplicate().get(reuse);
-        return reuse;
-      } else {
-        return column.nextBinary().getBytes();
-      }
-    }
-  }
 }
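The new protected fixedReader/dateReader/timeReader/timestampReader hooks turn BaseParquetReaders into an extension point: a subclass can swap in its own primitive readers without re-implementing the schema visitor. A hypothetical sketch under that assumption (the class name and reader choices are illustrative, not part of this change; InternalReader below is the real subclass):

    import java.util.List;
    import org.apache.iceberg.data.Record;
    import org.apache.iceberg.parquet.ParquetValueReader;
    import org.apache.iceberg.parquet.ParquetValueReaders;
    import org.apache.iceberg.types.Types;
    import org.apache.parquet.column.ColumnDescriptor;
    import org.apache.parquet.schema.Type;

    // Hypothetical: keep dates as epoch-day ints instead of LocalDate by
    // overriding the dateReader hook; everything else stays inherited.
    public class EpochDayReaders extends BaseParquetReaders<Record> {
      @Override
      protected ParquetValueReader<Record> createStructReader(
          List<Type> types, List<ParquetValueReader<?>> fieldReaders, Types.StructType structType) {
        return ParquetValueReaders.recordReader(types, fieldReaders, structType);
      }

      @Override
      protected ParquetValueReader<?> dateReader(ColumnDescriptor desc) {
        // DATE is physically INT32, so the unboxed reader returns the raw day count
        return new ParquetValueReaders.UnboxedReader<>(desc);
      }
    }
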
diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java
index 470f95e8bc99..1bfa90b26303 100644
--- a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java
+++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetWriter.java
@@ -18,13 +18,6 @@
  */
 package org.apache.iceberg.data.parquet;
 
-import java.time.Instant;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.LocalTime;
-import java.time.OffsetDateTime;
-import java.time.ZoneOffset;
-import java.time.temporal.ChronoUnit;
 import java.util.List;
 import java.util.Optional;
 import org.apache.iceberg.parquet.ParquetTypeVisitor;
@@ -33,13 +26,16 @@
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
 
+/**
+ * @deprecated since 1.8.0, will be made package-private in 1.9.0
+ */
+@Deprecated
 public abstract class BaseParquetWriter<T> {
 
   @SuppressWarnings("unchecked")
@@ -50,6 +46,26 @@ protected ParquetValueWriter<T> createWriter(MessageType type) {
   protected abstract ParquetValueWriters.StructWriter<T> createStructWriter(
       List<ParquetValueWriter<?>> writers);
 
+  protected ParquetValueWriter<?> fixedWriter(ColumnDescriptor desc) {
+    return new GenericParquetWriter.FixedWriter(desc);
+  }
+
+  protected ParquetValueWriter<?> dateWriter(ColumnDescriptor desc) {
+    return new GenericParquetWriter.DateWriter(desc);
+  }
+
+  protected ParquetValueWriter<?> timeWriter(ColumnDescriptor desc) {
+    return new GenericParquetWriter.TimeWriter(desc);
+  }
+
+  protected ParquetValueWriter<?> timestampWriter(ColumnDescriptor desc, boolean isAdjustedToUTC) {
+    if (isAdjustedToUTC) {
+      return new GenericParquetWriter.TimestamptzWriter(desc);
+    } else {
+      return new GenericParquetWriter.TimestampWriter(desc);
+    }
+  }
+
   private class WriteBuilder extends ParquetTypeVisitor<ParquetValueWriter<?>> {
     private final MessageType type;
@@ -119,7 +135,7 @@ public ParquetValueWriter<?> primitive(PrimitiveType primitive) {
       ColumnDescriptor desc = type.getColumnDescription(currentPath());
       LogicalTypeAnnotation logicalType = primitive.getLogicalTypeAnnotation();
       if (logicalType != null) {
-        Optional<ParquetValueWriters.PrimitiveWriter<?>> writer =
+        Optional<ParquetValueWriter<?>> writer =
             logicalType.accept(new LogicalTypeWriterVisitor(desc));
         if (writer.isPresent()) {
           return writer.get();
@@ -128,7 +144,7 @@ public ParquetValueWriter<?> primitive(PrimitiveType primitive) {
 
       switch (primitive.getPrimitiveTypeName()) {
         case FIXED_LEN_BYTE_ARRAY:
-          return new FixedWriter(desc);
+          return fixedWriter(desc);
         case BINARY:
           return ParquetValueWriters.byteBuffers(desc);
         case BOOLEAN:
@@ -147,9 +163,8 @@ public ParquetValueWriter<?> primitive(PrimitiveType primitive) {
     }
   }
 
-  private static class LogicalTypeWriterVisitor
-      implements LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<
-          ParquetValueWriters.PrimitiveWriter<?>> {
+  private class LogicalTypeWriterVisitor
+      implements LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<ParquetValueWriter<?>> {
     private final ColumnDescriptor desc;
 
     private LogicalTypeWriterVisitor(ColumnDescriptor desc) {
@@ -157,19 +172,19 @@ private LogicalTypeWriterVisitor(ColumnDescriptor desc) {
     }
 
     @Override
-    public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
+    public Optional<ParquetValueWriter<?>> visit(
         LogicalTypeAnnotation.StringLogicalTypeAnnotation stringType) {
       return Optional.of(ParquetValueWriters.strings(desc));
     }
 
     @Override
-    public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
+    public Optional<ParquetValueWriter<?>> visit(
         LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumType) {
       return Optional.of(ParquetValueWriters.strings(desc));
     }
 
     @Override
-    public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
+    public Optional<ParquetValueWriter<?>> visit(
         LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType) {
       switch (desc.getPrimitiveType().getPrimitiveTypeName()) {
         case INT32:
@@ -190,33 +205,33 @@ public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
     }
 
     @Override
-    public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
+    public Optional<ParquetValueWriter<?>> visit(
         LogicalTypeAnnotation.DateLogicalTypeAnnotation dateType) {
-      return Optional.of(new DateWriter(desc));
+      return Optional.of(dateWriter(desc));
     }
 
     @Override
-    public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
+    public Optional<ParquetValueWriter<?>> visit(
         LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeType) {
-      return Optional.of(new TimeWriter(desc));
+      Preconditions.checkArgument(
+          LogicalTypeAnnotation.TimeUnit.MICROS.equals(timeType.getUnit()),
+          "Cannot write time in %s, only MICROS is supported",
+          timeType.getUnit());
+      return Optional.of(timeWriter(desc));
     }
 
     @Override
-    public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
+    public Optional<ParquetValueWriter<?>> visit(
         LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampType) {
       Preconditions.checkArgument(
           LogicalTypeAnnotation.TimeUnit.MICROS.equals(timestampType.getUnit()),
           "Cannot write timestamp in %s, only MICROS is supported",
           timestampType.getUnit());
-      if (timestampType.isAdjustedToUTC()) {
-        return Optional.of(new TimestamptzWriter(desc));
-      } else {
-        return Optional.of(new TimestampWriter(desc));
-      }
+      return Optional.of(timestampWriter(desc, timestampType.isAdjustedToUTC()));
     }
 
     @Override
-    public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
+    public Optional<ParquetValueWriter<?>> visit(
         LogicalTypeAnnotation.IntLogicalTypeAnnotation intType) {
       Preconditions.checkArgument(
           intType.isSigned() || intType.getBitWidth() < 64,
@@ -229,75 +244,21 @@ public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
     }
 
     @Override
-    public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
+    public Optional<ParquetValueWriter<?>> visit(
         LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) {
       return Optional.of(ParquetValueWriters.strings(desc));
     }
 
     @Override
-    public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
+    public Optional<ParquetValueWriter<?>> visit(
         LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonType) {
       return Optional.of(ParquetValueWriters.byteBuffers(desc));
     }
-  }
-
-  private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
-  private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
-
-  private static class DateWriter extends ParquetValueWriters.PrimitiveWriter<LocalDate> {
-    private DateWriter(ColumnDescriptor desc) {
-      super(desc);
-    }
-
-    @Override
-    public void write(int repetitionLevel, LocalDate value) {
-      column.writeInteger(repetitionLevel, (int) ChronoUnit.DAYS.between(EPOCH_DAY, value));
-    }
-  }
-
-  private static class TimeWriter extends ParquetValueWriters.PrimitiveWriter<LocalTime> {
-    private TimeWriter(ColumnDescriptor desc) {
-      super(desc);
-    }
-
-    @Override
-    public void write(int repetitionLevel, LocalTime value) {
-      column.writeLong(repetitionLevel, value.toNanoOfDay() / 1000);
-    }
-  }
-
-  private static class TimestampWriter extends ParquetValueWriters.PrimitiveWriter<LocalDateTime> {
-    private TimestampWriter(ColumnDescriptor desc) {
-      super(desc);
-    }
-
-    @Override
-    public void write(int repetitionLevel, LocalDateTime value) {
-      column.writeLong(
-          repetitionLevel, ChronoUnit.MICROS.between(EPOCH, value.atOffset(ZoneOffset.UTC)));
-    }
-  }
-
-  private static class TimestamptzWriter
-      extends ParquetValueWriters.PrimitiveWriter<OffsetDateTime> {
-    private
-    private TimestamptzWriter(ColumnDescriptor desc) {
-      super(desc);
-    }
-
-    @Override
-    public void write(int repetitionLevel, OffsetDateTime value) {
-      column.writeLong(repetitionLevel, ChronoUnit.MICROS.between(EPOCH, value));
-    }
-  }
-
-  private static class FixedWriter extends ParquetValueWriters.PrimitiveWriter<byte[]> {
-    private FixedWriter(ColumnDescriptor desc) {
-      super(desc);
-    }
 
     @Override
-    public void write(int repetitionLevel, byte[] value) {
-      column.writeBinary(repetitionLevel, Binary.fromReusedByteArray(value));
+    public Optional<ParquetValueWriter<?>> visit(
+        LogicalTypeAnnotation.UUIDLogicalTypeAnnotation uuidLogicalType) {
+      return Optional.of(ParquetValueWriters.uuids(desc));
     }
   }
 }
diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java
index 8023cef71dae..3bf924081ed7 100644
--- a/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java
+++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetReaders.java
@@ -18,15 +18,25 @@
  */
 package org.apache.iceberg.data.parquet;
 
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.data.GenericDataUtil;
-import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.parquet.ParquetValueReader;
-import org.apache.iceberg.parquet.ParquetValueReaders.StructReader;
+import org.apache.iceberg.parquet.ParquetValueReaders;
 import org.apache.iceberg.types.Types.StructType;
+import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.Type;
 
@@ -49,7 +59,7 @@ public static ParquetValueReader<Record> buildReader(
   @Override
   protected ParquetValueReader<Record> createStructReader(
       List<Type> types, List<ParquetValueReader<?>> fieldReaders, StructType structType) {
-    return new RecordReader(types, fieldReaders, structType);
+    return ParquetValueReaders.recordReader(types, fieldReaders, structType);
   }
 
   @Override
@@ -57,39 +67,119 @@ protected Object convertConstant(org.apache.iceberg.types.Type type, Object valu
     return GenericDataUtil.internalToGeneric(type, value);
   }
 
-  private static class RecordReader extends StructReader<Record, Record> {
-    private final GenericRecord template;
+  private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
+  private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
 
-    RecordReader(List<Type> types, List<ParquetValueReader<?>> readers, StructType struct) {
-      super(types, readers);
-      this.template = struct != null ? GenericRecord.create(struct) : null;
+  static class DateReader extends ParquetValueReaders.PrimitiveReader<LocalDate> {
+    DateReader(ColumnDescriptor desc) {
+      super(desc);
     }
 
     @Override
-    protected Record newStructData(Record reuse) {
-      if (reuse != null) {
-        return reuse;
-      } else {
-        // GenericRecord.copy() is more performant then GenericRecord.create(StructType) since
-        // NAME_MAP_CACHE access
-        // is eliminated. Using copy here to gain performance.
-        return template.copy();
-      }
+    public LocalDate read(LocalDate reuse) {
+      return EPOCH_DAY.plusDays(column.nextInteger());
+    }
+  }
+
+  static class TimestampReader extends ParquetValueReaders.PrimitiveReader<LocalDateTime> {
+    TimestampReader(ColumnDescriptor desc) {
+      super(desc);
     }
 
     @Override
-    protected Object getField(Record intermediate, int pos) {
-      return intermediate.get(pos);
+    public LocalDateTime read(LocalDateTime reuse) {
+      return EPOCH.plus(column.nextLong(), ChronoUnit.MICROS).toLocalDateTime();
+    }
+  }
+
+  static class TimestampMillisReader extends ParquetValueReaders.PrimitiveReader<LocalDateTime> {
+    TimestampMillisReader(ColumnDescriptor desc) {
+      super(desc);
     }
 
     @Override
-    protected Record buildStruct(Record struct) {
-      return struct;
+    public LocalDateTime read(LocalDateTime reuse) {
+      return EPOCH.plus(column.nextLong() * 1000, ChronoUnit.MICROS).toLocalDateTime();
+    }
+  }
+
+  static class TimestampInt96Reader extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> {
+    private static final long UNIX_EPOCH_JULIAN = 2_440_588L;
+
+    TimestampInt96Reader(ColumnDescriptor desc) {
+      super(desc);
+    }
+
+    @Override
+    public OffsetDateTime read(OffsetDateTime reuse) {
+      final ByteBuffer byteBuffer =
+          column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
+      final long timeOfDayNanos = byteBuffer.getLong();
+      final int julianDay = byteBuffer.getInt();
+
+      return Instant.ofEpochMilli(TimeUnit.DAYS.toMillis(julianDay - UNIX_EPOCH_JULIAN))
+          .plusNanos(timeOfDayNanos)
+          .atOffset(ZoneOffset.UTC);
+    }
+  }
+
+  static class TimestamptzReader extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> {
+    TimestamptzReader(ColumnDescriptor desc) {
+      super(desc);
     }
 
     @Override
-    protected void set(Record struct, int pos, Object value) {
-      struct.set(pos, value);
+    public OffsetDateTime read(OffsetDateTime reuse) {
+      return EPOCH.plus(column.nextLong(), ChronoUnit.MICROS);
+    }
+  }
+
+  static class TimestamptzMillisReader extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> {
+    TimestamptzMillisReader(ColumnDescriptor desc) {
+      super(desc);
+    }
+
+    @Override
+    public OffsetDateTime read(OffsetDateTime reuse) {
+      return EPOCH.plus(column.nextLong() * 1000, ChronoUnit.MICROS);
+    }
+  }
+
+  static class TimeMillisReader extends ParquetValueReaders.PrimitiveReader<LocalTime> {
+    TimeMillisReader(ColumnDescriptor desc) {
+      super(desc);
+    }
+
+    @Override
+    public LocalTime read(LocalTime reuse) {
+      return LocalTime.ofNanoOfDay(column.nextInteger() * 1000000L);
+    }
+  }
+
+  static class TimeReader extends ParquetValueReaders.PrimitiveReader<LocalTime> {
+    TimeReader(ColumnDescriptor desc) {
+      super(desc);
+    }
+
+    @Override
+    public LocalTime read(LocalTime reuse) {
+      return LocalTime.ofNanoOfDay(column.nextLong() * 1000L);
+    }
+  }
+
+  static class FixedReader extends ParquetValueReaders.PrimitiveReader<byte[]> {
+    FixedReader(ColumnDescriptor desc) {
+      super(desc);
+    }
+
+    @Override
+    public byte[] read(byte[] reuse) {
+      if (reuse != null) {
+        column.nextBinary().toByteBuffer().duplicate().get(reuse);
+        return reuse;
+      } else {
+        return column.nextBinary().getBytes();
+      }
     }
   }
 }
diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java
index 7f2c107b8dc8..b4727cc52bf6 100644
--- a/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java
+++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/GenericParquetWriter.java
@@ -18,10 +18,21 @@
  */
 package org.apache.iceberg.data.parquet;
 
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
 import java.util.List;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.parquet.ParquetValueWriter;
+import org.apache.iceberg.parquet.ParquetValueWriters;
 import org.apache.iceberg.parquet.ParquetValueWriters.StructWriter;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.MessageType;
 
 public class GenericParquetWriter extends BaseParquetWriter<Record> {
@@ -35,17 +46,73 @@ public static ParquetValueWriter<Record> buildWriter(MessageType type) {
 
   @Override
   protected StructWriter<Record> createStructWriter(List<ParquetValueWriter<?>> writers) {
-    return new RecordWriter(writers);
+    return ParquetValueWriters.recordWriter(writers);
   }
 
-  private static class RecordWriter extends StructWriter<Record> {
-    private RecordWriter(List<ParquetValueWriter<?>> writers) {
-      super(writers);
+  private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
+  private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
+
+  static class DateWriter extends ParquetValueWriters.PrimitiveWriter<LocalDate> {
+    DateWriter(ColumnDescriptor desc) {
+      super(desc);
+    }
+
+    @Override
+    public void write(int repetitionLevel, LocalDate value) {
+      column.writeInteger(repetitionLevel, (int) ChronoUnit.DAYS.between(EPOCH_DAY, value));
+    }
+  }
+
+  static class TimeWriter extends ParquetValueWriters.PrimitiveWriter<LocalTime> {
+    TimeWriter(ColumnDescriptor desc) {
+      super(desc);
+    }
+
+    @Override
+    public void write(int repetitionLevel, LocalTime value) {
+      column.writeLong(repetitionLevel, value.toNanoOfDay() / 1000);
+    }
+  }
+
+  static class TimestampWriter extends ParquetValueWriters.PrimitiveWriter<LocalDateTime> {
+    TimestampWriter(ColumnDescriptor desc) {
+      super(desc);
+    }
+
+    @Override
+    public void write(int repetitionLevel, LocalDateTime value) {
+      column.writeLong(
+          repetitionLevel, ChronoUnit.MICROS.between(EPOCH, value.atOffset(ZoneOffset.UTC)));
+    }
+  }
+
+  static class TimestamptzWriter extends ParquetValueWriters.PrimitiveWriter<OffsetDateTime> {
+    TimestamptzWriter(ColumnDescriptor desc) {
+      super(desc);
+    }
+
+    @Override
+    public void write(int repetitionLevel, OffsetDateTime value) {
+      column.writeLong(repetitionLevel, ChronoUnit.MICROS.between(EPOCH, value));
+    }
+  }
+
+  static class FixedWriter extends ParquetValueWriters.PrimitiveWriter<byte[]> {
+    private final int length;
+
+    FixedWriter(ColumnDescriptor desc) {
+      super(desc);
+      this.length = desc.getPrimitiveType().getTypeLength();
     }
 
     @Override
-    protected Object get(Record struct, int index) {
-      return struct.get(index);
+    public void write(int repetitionLevel, byte[] value) {
+      Preconditions.checkArgument(
+          value.length == length,
+          "Cannot write byte buffer of length %s as fixed[%s]",
+          value.length,
+          length);
+      column.writeBinary(repetitionLevel, Binary.fromReusedByteArray(value));
     }
   }
 }
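The writer classes moved here all encode temporal values as offsets from the Unix epoch: dates as days, times and timestamps as microseconds. A small self-contained illustration of that arithmetic, using only plain java.time:

    import java.time.Instant;
    import java.time.LocalDate;
    import java.time.LocalDateTime;
    import java.time.OffsetDateTime;
    import java.time.ZoneOffset;
    import java.time.temporal.ChronoUnit;

    public class EpochEncoding {
      private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
      private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();

      public static void main(String[] args) {
        // dates become an int count of days since 1970-01-01 (what DateWriter writes)
        int days = (int) ChronoUnit.DAYS.between(EPOCH_DAY, LocalDate.of(2024, 1, 1));
        System.out.println(days); // 19723

        // timestamps become a long count of microseconds since the epoch,
        // treating the local value as UTC (what TimestampWriter writes)
        LocalDateTime ts = LocalDateTime.of(2024, 1, 1, 0, 0);
        long micros = ChronoUnit.MICROS.between(EPOCH, ts.atOffset(ZoneOffset.UTC));
        System.out.println(micros); // 1704067200000000
      }
    }
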
diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalReader.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalReader.java
new file mode 100644
index 000000000000..3bf0a4e80130
--- /dev/null
+++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalReader.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.data.parquet;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.parquet.ParquetValueReader;
+import org.apache.iceberg.parquet.ParquetValueReaders;
+import org.apache.iceberg.types.Types.StructType;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+
+public class InternalReader<T extends StructLike> extends BaseParquetReaders<T> {
+
+  private static final InternalReader<?> INSTANCE = new InternalReader<>();
+
+  private InternalReader() {}
+
+  @SuppressWarnings("unchecked")
+  public static <T extends StructLike> ParquetValueReader<T> create(
+      Schema expectedSchema, MessageType fileSchema) {
+    return (ParquetValueReader<T>) INSTANCE.createReader(expectedSchema, fileSchema);
+  }
+
+  @SuppressWarnings("unchecked")
+  public static <T extends StructLike> ParquetValueReader<T> create(
+      Schema expectedSchema, MessageType fileSchema, Map<Integer, ?> idToConstant) {
+    return (ParquetValueReader<T>) INSTANCE.createReader(expectedSchema, fileSchema, idToConstant);
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  protected ParquetValueReader<T> createStructReader(
+      List<Type> types, List<ParquetValueReader<?>> fieldReaders, StructType structType) {
+    return (ParquetValueReader<T>)
+        ParquetValueReaders.recordReader(types, fieldReaders, structType);
+  }
+
+  @Override
+  protected ParquetValueReader<?> fixedReader(ColumnDescriptor desc) {
+    return new ParquetValueReaders.BytesReader(desc);
+  }
+
+  @Override
+  protected ParquetValueReader<?> dateReader(ColumnDescriptor desc) {
+    return new ParquetValueReaders.UnboxedReader<>(desc);
+  }
+
+  @Override
+  protected ParquetValueReader<?> timeReader(
+      ColumnDescriptor desc, LogicalTypeAnnotation.TimeUnit unit) {
+    if (unit == LogicalTypeAnnotation.TimeUnit.MILLIS) {
+      return ParquetValueReaders.millisAsTimes(desc);
+    }
+
+    return new ParquetValueReaders.UnboxedReader<>(desc);
+  }
+
+  @Override
+  protected ParquetValueReader<?> timestampReader(
+      ColumnDescriptor desc, LogicalTypeAnnotation.TimeUnit unit, boolean isAdjustedToUTC) {
+    if (desc.getPrimitiveType().getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT96) {
+      return ParquetValueReaders.int96Timestamps(desc);
+    }
+
+    if (unit == LogicalTypeAnnotation.TimeUnit.MILLIS) {
+      return ParquetValueReaders.millisAsTimestamps(desc);
+    }
+
+    return new ParquetValueReaders.UnboxedReader<>(desc);
+  }
+}
diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalWriter.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalWriter.java
new file mode 100644
index 000000000000..b42f07ce18ce
--- /dev/null
+++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/InternalWriter.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.data.parquet;
+
+import java.util.List;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.parquet.ParquetValueWriter;
+import org.apache.iceberg.parquet.ParquetValueWriters;
+import org.apache.iceberg.parquet.ParquetValueWriters.StructWriter;
+import org.apache.iceberg.types.Type;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.schema.MessageType;
+
+/**
+ * A Writer that consumes Iceberg's internal in-memory object model.
+ *
+ * <p>Iceberg's internal in-memory object model produces the types defined in {@link
+ * Type.TypeID#javaClass()}.
+ */
+public class InternalWriter<T extends StructLike> extends BaseParquetWriter<T> {
+  private static final InternalWriter<?> INSTANCE = new InternalWriter<>();
+
+  private InternalWriter() {}
+
+  @SuppressWarnings("unchecked")
+  public static <T extends StructLike> ParquetValueWriter<T> create(MessageType type) {
+    return (ParquetValueWriter<T>) INSTANCE.createWriter(type);
+  }
+
+  @Override
+  protected StructWriter<T> createStructWriter(List<ParquetValueWriter<?>> writers) {
+    return ParquetValueWriters.recordWriter(writers);
+  }
+
+  @Override
+  protected ParquetValueWriter<?> fixedWriter(ColumnDescriptor desc) {
+    return ParquetValueWriters.fixedBuffers(desc);
+  }
+
+  @Override
+  protected ParquetValueWriter<?> dateWriter(ColumnDescriptor desc) {
+    return ParquetValueWriters.ints(desc);
+  }
+
+  @Override
+  protected ParquetValueWriter<?> timeWriter(ColumnDescriptor desc) {
+    return ParquetValueWriters.longs(desc);
+  }
+
+  @Override
+  protected ParquetValueWriter<?> timestampWriter(ColumnDescriptor desc, boolean isAdjustedToUTC) {
+    return ParquetValueWriters.longs(desc);
+  }
+}
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java
index b055a139fa59..31f73b3bce74 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java
@@ -24,12 +24,18 @@
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.UUIDUtil;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.io.api.Binary;
@@ -63,6 +69,27 @@ public static ParquetValueReader<Long> position() {
     return new PositionReader();
   }
 
+  public static ParquetValueReader<UUID> uuids(ColumnDescriptor desc) {
+    return new ParquetValueReaders.UUIDReader(desc);
+  }
+
+  public static ParquetValueReader<Long> int96Timestamps(ColumnDescriptor desc) {
+    return new ParquetValueReaders.TimestampInt96Reader(desc);
+  }
+
+  public static ParquetValueReader<Long> millisAsTimes(ColumnDescriptor desc) {
+    return new ParquetValueReaders.TimeMillisReader(desc);
+  }
+
+  public static ParquetValueReader<Long> millisAsTimestamps(ColumnDescriptor desc) {
+    return new ParquetValueReaders.TimestampMillisReader(desc);
+  }
+
+  public static ParquetValueReader<Record> recordReader(
+      List<Type> types, List<ParquetValueReader<?>> readers, Types.StructType struct) {
+    return new RecordReader(types, readers, struct);
+  }
+
   private static class NullReader<T> implements ParquetValueReader<T> {
     private static final NullReader<Void> INSTANCE = new NullReader<>();
     private static final ImmutableList<TripleIterator<?>> COLUMNS = ImmutableList.of();
@@ -401,6 +428,17 @@ public ByteBuffer read(ByteBuffer reuse) {
     }
   }
 
+  private static class UUIDReader extends PrimitiveReader<UUID> {
+    private UUIDReader(ColumnDescriptor desc) {
+      super(desc);
+    }
+
+    @Override
+    public UUID read(UUID reuse) {
+      return UUIDUtil.convert(column.nextBinary().toByteBuffer());
+    }
+  }
+
 public static class ByteArrayReader extends ParquetValueReaders.PrimitiveReader<byte[]> {
    public ByteArrayReader(ColumnDescriptor desc) {
      super(desc);
@@ -412,6 +450,57 @@ public byte[] read(byte[] ignored) {
     }
   }
 
+  private static class TimestampInt96Reader extends UnboxedReader<Long> {
+
+    private TimestampInt96Reader(ColumnDescriptor desc) {
+      super(desc);
+    }
+
+    @Override
+    public Long read(Long ignored) {
+      return readLong();
+    }
+
+    @Override
+    public long readLong() {
+      final ByteBuffer byteBuffer =
+          column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
+      return ParquetUtil.extractTimestampInt96(byteBuffer);
+    }
+  }
+
+  private static class TimeMillisReader extends UnboxedReader<Long> {
+    private TimeMillisReader(ColumnDescriptor desc) {
+      super(desc);
+    }
+
+    @Override
+    public Long read(Long ignored) {
+      return readLong();
+    }
+
+    @Override
+    public long readLong() {
+      return 1000L * column.nextInteger();
+    }
+  }
+
+  private static class TimestampMillisReader extends UnboxedReader<Long> {
+    private TimestampMillisReader(ColumnDescriptor desc) {
+      super(desc);
+    }
+
+    @Override
+    public Long read(Long ignored) {
+      return readLong();
+    }
+
+    @Override
+    public long readLong() {
+      return 1000L * column.nextLong();
+    }
+  }
+
   private static class OptionReader<T> implements ParquetValueReader<T> {
     private final int definitionLevel;
     private final ParquetValueReader<T> reader;
@@ -850,4 +939,39 @@ private TripleIterator<?> firstNonNullColumn(List<TripleIterator<?>> columns) {
       return NullReader.NULL_COLUMN;
     }
   }
+
+  private static class RecordReader extends StructReader<Record, Record> {
+    private final GenericRecord template;
+
+    RecordReader(List<Type> types, List<ParquetValueReader<?>> readers, Types.StructType struct) {
+      super(types, readers);
+      this.template = struct != null ? GenericRecord.create(struct) : null;
+    }
+
+    @Override
+    protected Record newStructData(Record reuse) {
+      if (reuse != null) {
+        return reuse;
+      } else {
+        // GenericRecord.copy() is more performant than GenericRecord.create(StructType) since
+        // NAME_MAP_CACHE access is eliminated. Using copy here to gain performance.
+        return template.copy();
+      }
+    }
+
+    @Override
+    protected Object getField(Record intermediate, int pos) {
+      return intermediate.get(pos);
+    }
+
+    @Override
+    protected Record buildStruct(Record struct) {
+      return struct;
+    }
+
+    @Override
+    protected void set(Record struct, int pos, Object value) {
+      struct.set(pos, value);
+    }
+  }
 }
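TimestampInt96Reader above relies on the legacy INT96 layout Impala and Spark wrote: 8 bytes of nanos-within-day followed by 4 bytes of Julian day, little-endian. A standalone sketch of the decoding arithmetic, which ParquetUtil.extractTimestampInt96 is assumed to encapsulate (the constant and formula mirror the GenericParquetReaders.TimestampInt96Reader added earlier in this diff):

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;

    public class Int96Decode {
      // Julian day number of 1970-01-01
      private static final long UNIX_EPOCH_JULIAN = 2_440_588L;

      // decodes a 12-byte INT96 timestamp into microseconds since the Unix epoch
      static long toEpochMicros(byte[] int96) {
        ByteBuffer buf = ByteBuffer.wrap(int96).order(ByteOrder.LITTLE_ENDIAN);
        long timeOfDayNanos = buf.getLong(); // first 8 bytes
        long julianDay = buf.getInt();       // last 4 bytes
        return (julianDay - UNIX_EPOCH_JULIAN) * 86_400_000_000L + timeOfDayNanos / 1_000;
      }
    }
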
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java
index 90766983d8c8..1a7ebe0767d8 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriters.java
@@ -21,11 +21,13 @@
 import java.lang.reflect.Array;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -33,11 +35,13 @@
 import org.apache.iceberg.DoubleFieldMetrics;
 import org.apache.iceberg.FieldMetrics;
 import org.apache.iceberg.FloatFieldMetrics;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.deletes.PositionDelete;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.DecimalUtil;
+import org.apache.iceberg.util.UUIDUtil;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.ColumnWriteStore;
 import org.apache.parquet.io.api.Binary;
@@ -87,6 +91,10 @@ public static PrimitiveWriter<CharSequence> strings(ColumnDescriptor desc) {
     return new StringWriter(desc);
   }
 
+  public static PrimitiveWriter<UUID> uuids(ColumnDescriptor desc) {
+    return new UUIDWriter(desc);
+  }
+
   public static PrimitiveWriter<BigDecimal> decimalAsInteger(
       ColumnDescriptor desc, int precision, int scale) {
     return new IntegerDecimalWriter(desc, precision, scale);
@@ -106,6 +114,10 @@ public static PrimitiveWriter<ByteBuffer> byteBuffers(ColumnDescriptor desc) {
     return new BytesWriter(desc);
   }
 
+  public static PrimitiveWriter<ByteBuffer> fixedBuffers(ColumnDescriptor desc) {
+    return new FixedBufferWriter(desc);
+  }
+
   public static <E> CollectionWriter<E> collections(int dl, int rl, ParquetValueWriter<E> writer) {
     return new CollectionWriter<>(dl, rl, writer);
   }
@@ -115,6 +127,11 @@ public static <K, V> MapWriter<K, V> maps(
     return new MapWriter<>(dl, rl, keyWriter, valueWriter);
   }
 
+  public static <T extends StructLike> StructWriter<T> recordWriter(
+      List<ParquetValueWriter<?>> writers) {
+    return new RecordWriter<>(writers);
+  }
+
   public abstract static class PrimitiveWriter<T> implements ParquetValueWriter<T> {
     @SuppressWarnings("checkstyle:VisibilityModifier")
     protected final ColumnWriter<T> column;
@@ -313,6 +330,25 @@ public void write(int repetitionLevel, ByteBuffer buffer) {
     }
   }
 
+  private static class FixedBufferWriter extends PrimitiveWriter<ByteBuffer> {
+    private final int length;
+
+    private FixedBufferWriter(ColumnDescriptor desc) {
+      super(desc);
+      this.length = desc.getPrimitiveType().getTypeLength();
+    }
+
+    @Override
+    public void write(int repetitionLevel, ByteBuffer buffer) {
+      Preconditions.checkArgument(
+          buffer.remaining() == length,
+          "Cannot write byte buffer of length %s as fixed[%s]",
+          buffer.remaining(),
+          length);
+      column.writeBinary(repetitionLevel, Binary.fromReusedByteBuffer(buffer));
+    }
+  }
+
   private static class StringWriter extends PrimitiveWriter<CharSequence> {
     private StringWriter(ColumnDescriptor desc) {
       super(desc);
@@ -330,6 +366,37 @@ public void write(int repetitionLevel, CharSequence value) {
     }
   }
 
+  private static class UUIDWriter extends PrimitiveWriter<UUID> {
+    private static final ThreadLocal<ByteBuffer> BUFFER =
+        ThreadLocal.withInitial(
+            () -> {
+              ByteBuffer buffer = ByteBuffer.allocate(16);
+              buffer.order(ByteOrder.BIG_ENDIAN);
+              return buffer;
+            });
+
+    private UUIDWriter(ColumnDescriptor desc) {
+      super(desc);
+    }
+
+    @Override
+    public void write(int repetitionLevel, UUID value) {
+      ByteBuffer buffer = UUIDUtil.convertToByteBuffer(value, BUFFER.get());
+      column.writeBinary(repetitionLevel, Binary.fromReusedByteBuffer(buffer));
+    }
+  }
+
+  private static class RecordWriter<T extends StructLike> extends StructWriter<T> {
+    private RecordWriter(List<ParquetValueWriter<?>> writers) {
+      super(writers);
+    }
+
+    @Override
+    protected Object get(T struct, int index) {
+      return struct.get(index, Object.class);
+    }
+  }
+
   static class OptionWriter<T> implements ParquetValueWriter<T> {
     private final int definitionLevel;
     private final ParquetValueWriter<T> writer;
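UUIDWriter reuses a thread-local 16-byte buffer, presumably to avoid a fresh allocation per value, and writes the UUID big-endian (most significant bits first). Without UUIDUtil the same encoding is just two putLong calls; a sketch for illustration:

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;
    import java.util.UUID;

    public class UuidBytes {
      // encodes a UUID as the 16 big-endian bytes a Parquet fixed[16] UUID column expects
      static ByteBuffer encode(UUID uuid, ByteBuffer reuse) {
        reuse.rewind();
        reuse.order(ByteOrder.BIG_ENDIAN);
        reuse.putLong(uuid.getMostSignificantBits());
        reuse.putLong(uuid.getLeastSignificantBits());
        reuse.rewind();
        return reuse;
      }

      public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(16);
        ByteBuffer encoded = encode(UUID.randomUUID(), buffer);
        System.out.println(encoded.remaining()); // 16
      }
    }
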
+ */ +package org.apache.iceberg.parquet; + +import java.io.IOException; +import java.util.List; +import org.apache.iceberg.InternalTestHelpers; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.RandomInternalData; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.avro.AvroDataTest; +import org.apache.iceberg.data.parquet.InternalReader; +import org.apache.iceberg.data.parquet.InternalWriter; +import org.apache.iceberg.inmemory.InMemoryOutputFile; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +public class TestInternalParquet extends AvroDataTest { + @Override + protected void writeAndValidate(Schema schema) throws IOException { + List expected = RandomInternalData.generate(schema, 100, 1376L); + + OutputFile outputFile = new InMemoryOutputFile(); + + try (DataWriter dataWriter = + Parquet.writeData(outputFile) + .schema(schema) + .createWriterFunc(InternalWriter::create) + .overwrite() + .withSpec(PartitionSpec.unpartitioned()) + .build()) { + for (StructLike record : expected) { + dataWriter.write(record); + } + } + + List rows; + try (CloseableIterable reader = + Parquet.read(outputFile.toInputFile()) + .project(schema) + .createReaderFunc(fileSchema -> InternalReader.create(schema, fileSchema)) + .build()) { + rows = Lists.newArrayList(reader); + } + + for (int i = 0; i < expected.size(); i += 1) { + InternalTestHelpers.assertEquals(schema.asStruct(), expected.get(i), rows.get(i)); + } + + // test reuseContainers + try (CloseableIterable reader = + Parquet.read(outputFile.toInputFile()) + .project(schema) + .reuseContainers() + .createReaderFunc(fileSchema -> InternalReader.create(schema, fileSchema)) + .build()) { + int index = 0; + for (StructLike actualRecord : reader) { + InternalTestHelpers.assertEquals(schema.asStruct(), expected.get(index), actualRecord); + index += 1; + } + } + } +} diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java index 3ce54d2d9ffa..4fec047dc987 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java @@ -21,14 +21,12 @@ import java.math.BigDecimal; import java.math.BigInteger; import java.nio.ByteBuffer; -import java.nio.ByteOrder; import java.util.Arrays; import java.util.List; import java.util.Map; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; import org.apache.iceberg.parquet.ParquetSchemaUtil; -import org.apache.iceberg.parquet.ParquetUtil; import org.apache.iceberg.parquet.ParquetValueReader; import org.apache.iceberg.parquet.ParquetValueReaders; import org.apache.iceberg.parquet.ParquetValueReaders.FloatAsDoubleReader; @@ -265,7 +263,7 @@ public ParquetValueReader primitive( case TIMESTAMP_MICROS: return new UnboxedReader<>(desc); case TIMESTAMP_MILLIS: - return new TimestampMillisReader(desc); + return ParquetValueReaders.millisAsTimestamps(desc); case DECIMAL: DecimalLogicalTypeAnnotation decimal = (DecimalLogicalTypeAnnotation) primitive.getLogicalTypeAnnotation(); @@ -315,7 +313,7 @@ public ParquetValueReader primitive( case INT96: // Impala & Spark used to write timestamps as INT96 without a logical type. 
         // Impala & Spark used to write timestamps as INT96 without a logical type. For backwards
         // compatibility we try to read INT96 as timestamps.
-        return new TimestampInt96Reader(desc);
+        return ParquetValueReaders.int96Timestamps(desc);
       default:
         throw new UnsupportedOperationException("Unsupported type: " + primitive);
     }
@@ -373,41 +371,6 @@ public Decimal read(Decimal ignored) {
     }
   }
 
-  private static class TimestampMillisReader extends UnboxedReader<Long> {
-    TimestampMillisReader(ColumnDescriptor desc) {
-      super(desc);
-    }
-
-    @Override
-    public Long read(Long ignored) {
-      return readLong();
-    }
-
-    @Override
-    public long readLong() {
-      return 1000 * column.nextLong();
-    }
-  }
-
-  private static class TimestampInt96Reader extends UnboxedReader<Long> {
-
-    TimestampInt96Reader(ColumnDescriptor desc) {
-      super(desc);
-    }
-
-    @Override
-    public Long read(Long ignored) {
-      return readLong();
-    }
-
-    @Override
-    public long readLong() {
-      final ByteBuffer byteBuffer =
-          column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
-      return ParquetUtil.extractTimestampInt96(byteBuffer);
-    }
-  }
-
   private static class StringReader extends PrimitiveReader<UTF8String> {
     StringReader(ColumnDescriptor desc) {
       super(desc);