Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,12 @@ public Type primitive(PrimitiveType primitive) {
return field == null ? primitive : primitive.withId(field.id());
}

@Override
public Type variant(GroupType variant) {
MappedField field = nameMapping.find(currentPath());
return field == null ? variant : variant.withId(field.id());
}

@Override
public void beforeField(Type type) {
fieldNames.push(type.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,11 @@ public Type primitive(PrimitiveType primitive) {
throw new UnsupportedOperationException("Cannot convert unknown primitive type: " + primitive);
}

@Override
public Type variant(GroupType variant) {
return Types.VariantType.get();
}

private static class ParquetLogicalTypeVisitor
implements LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<Type> {
private static final ParquetLogicalTypeVisitor INSTANCE = new ParquetLogicalTypeVisitor();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,11 @@ public Boolean map(GroupType map, Boolean keyHasId, Boolean valueHasId) {
public Boolean primitive(PrimitiveType primitive) {
return primitive.getId() != null;
}

@Override
public Boolean variant(GroupType variant) {
return variant.getId() != null;
}
}

public static Type determineListElementType(GroupType array) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ public static <T> T visit(Type type, ParquetTypeVisitor<T> visitor) {
return visitList(group, visitor);
} else if (LogicalTypeAnnotation.mapType().equals(annotation)) {
return visitMap(group, visitor);
} else if (LogicalTypeAnnotation.variantType((byte) 1).equals(annotation)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: use Variant.VARIANT_SPEC_VERSION instead of hardcoded 1.

return visitVariant(group, visitor);
}

return visitor.struct(group, visitFields(group, visitor));
Expand Down Expand Up @@ -168,6 +170,10 @@ private static <T> T visitMap(GroupType map, ParquetTypeVisitor<T> visitor) {
}
}

private static <T> T visitVariant(GroupType variant, ParquetTypeVisitor<T> visitor) {
return visitor.variant(variant);
}

private static <T> List<T> visitFields(GroupType group, ParquetTypeVisitor<T> visitor) {
List<T> results = Lists.newArrayListWithExpectedSize(group.getFieldCount());
for (Type field : group.getFields()) {
Expand Down Expand Up @@ -202,6 +208,10 @@ public T primitive(PrimitiveType primitive) {
return null;
}

public T variant(GroupType variant) {
return null;
}

public void beforeField(Type type) {
fieldNames.push(type.getName());
}
Expand Down
10 changes: 10 additions & 0 deletions parquet/src/main/java/org/apache/iceberg/parquet/RemoveIds.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,16 @@ public Type primitive(PrimitiveType primitive) {
.named(primitive.getName());
}

@Override
public Type variant(GroupType variant) {
Types.GroupBuilder<GroupType> builder =
Types.buildGroup(variant.getRepetition()).as(variant.getLogicalTypeAnnotation());
for (Type field : variant.getFields()) {
builder.addField(field);
}
return builder.named(variant.getName());
}

public static MessageType removeIds(MessageType type) {
return (MessageType) ParquetTypeVisitor.visit(type, new RemoveIds());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,20 @@ public void testAssignIdsByNameMapping() {
27,
"m2",
Types.MapType.ofOptional(
28, 29, Types.StringType.get(), SUPPORTED_PRIMITIVES))))));
28, 29, Types.StringType.get(), SUPPORTED_PRIMITIVES))))),
optional(30, "variant_col", Types.VariantType.get()),
required(
31, "list_of_variants", Types.ListType.ofOptional(32, Types.VariantType.get())),
optional(
33,
"struct_with_variant",
Types.StructType.of(
Types.NestedField.required(34, "id", Types.IntegerType.get()),
Types.NestedField.optional(35, "data", Types.VariantType.get()))),
required(
36,
"map_with_variant_value",
Types.MapType.ofOptional(37, 38, Types.StringType.get(), Types.VariantType.get())));

Schema schema =
new Schema(
Expand Down Expand Up @@ -224,7 +237,16 @@ public void testSchemaConversionWithoutAssigningIds() {
"map_col_5",
Repetition.REQUIRED,
primitive(28, "k", PrimitiveTypeName.INT32, Repetition.REQUIRED),
primitive(29, "v", PrimitiveTypeName.INT32, Repetition.REQUIRED)));
primitive(29, "v", PrimitiveTypeName.INT32, Repetition.REQUIRED)),
variant(30, "variant_col_1", Repetition.OPTIONAL),
variant(null, "variant_col_2", Repetition.REQUIRED),
list(31, "list_col_6", Repetition.OPTIONAL, variant(32, "v", Repetition.OPTIONAL)),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also add a test with a variant in map?

struct(
33,
"struct_col_3",
Repetition.REQUIRED,
primitive(34, "n1", PrimitiveTypeName.INT32, Repetition.REQUIRED),
variant(null, "variant_field", Repetition.OPTIONAL)));

Schema expectedSchema =
new Schema(
Expand Down Expand Up @@ -255,8 +277,13 @@ public void testSchemaConversionWithoutAssigningIds() {
required(
27,
"map_col_5",
Types.MapType.ofRequired(
28, 29, Types.IntegerType.get(), Types.IntegerType.get())));
Types.MapType.ofRequired(28, 29, Types.IntegerType.get(), Types.IntegerType.get())),
optional(30, "variant_col_1", Types.VariantType.get()),
optional(31, "list_col_6", Types.ListType.ofOptional(32, Types.VariantType.get())),
required(
33,
"struct_col_3",
Types.StructType.of(required(34, "n1", Types.IntegerType.get()))));

Schema actualSchema = ParquetSchemaUtil.convertAndPrune(messageType);
assertThat(actualSchema.asStruct())
Expand Down Expand Up @@ -427,6 +454,17 @@ public void testLegacyTwoLevelListGenByParquetThrift1() {
.isEqualTo(expectedSchema.asStruct());
}

@Test
public void testVariantTypeConversion() {
MessageType messageType =
new MessageType("test", variant(1, "variant_field", Repetition.OPTIONAL));
Schema expectedSchema = new Schema(optional(1, "variant_field", Types.VariantType.get()));
Schema actualSchema = ParquetSchemaUtil.convertAndPrune(messageType);
assertThat(actualSchema.asStruct())
.as("Schema must match")
.isEqualTo(expectedSchema.asStruct());
}

private Type primitive(
Integer id, String name, PrimitiveTypeName typeName, Repetition repetition) {
PrimitiveBuilder<PrimitiveType> builder =
Expand Down Expand Up @@ -464,4 +502,18 @@ private Type map(Integer id, String name, Repetition repetition, Type keyType, T
}
return builder.named(name);
}

private Type variant(Integer id, String name, Repetition repetition) {
GroupBuilder<GroupType> builder =
org.apache.parquet.schema.Types.buildGroup(repetition)
.as(org.apache.parquet.schema.LogicalTypeAnnotation.variantType((byte) 1))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here. Use Variant.VARIANT_SPEC_VERSION instead of 1.

.primitive(PrimitiveTypeName.BINARY, Repetition.REQUIRED)
.named("metadata")
.primitive(PrimitiveTypeName.BINARY, Repetition.REQUIRED)
.named("value");
if (id != null) {
builder.id(id);
}
return builder.named(name);
}
}