Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ static Schema visit(StructTypeInfo typeInfo, Schema schema) {

@Override
public Schema struct(StructTypeInfo struct, Schema partner, List<Schema.Field> fieldResults) {
// Check if this is a union-encoded-as-struct by looking for "tag" field in fieldResults
boolean isUnionEncodedAsStruct = fieldResults.stream()
.anyMatch(f -> f.name().equalsIgnoreCase("tag"));

boolean shouldResultBeOptional = partner == null || isNullableType(partner);
Schema result;
if (partner == null || SchemaUtilities.extractIfOption(partner).getType() != Schema.Type.RECORD) {
Expand All @@ -61,18 +65,57 @@ public Schema struct(StructTypeInfo struct, Schema partner, List<Schema.Field> f
}
// While calling `makeNullable`, we should respect the option order of `partner`
// i.e. if the schema of `partner` is [int, null], the resultant schema should also be [int, null] rather than [null, int]
return shouldResultBeOptional ? SchemaUtilities.makeNullable(result, SchemaUtilities.isNullSecond(partner))
// However, if this is a union-encoded-as-struct, don't make it nullable - it represents the union structure itself
return (shouldResultBeOptional && !isUnionEncodedAsStruct)
? SchemaUtilities.makeNullable(result, SchemaUtilities.isNullSecond(partner))
: result;
}

/**
 * Picks a non-null "zero" default for a primitive Avro schema.
 * Called for union-encoded-as-struct fields, which must carry a non-null default.
 *
 * @param schema schema whose type decides the default
 * @return {@code 0}, {@code 0L}, {@code 0.0f}, {@code 0.0}, {@code false} or {@code ""} for
 *         INT, LONG, FLOAT, DOUBLE, BOOLEAN and STRING respectively; {@code null} for any other type
 */
private Object getDefaultValueForSchema(Schema schema) {
  final Schema.Type kind = schema.getType();
  // Plain if-statements (not ternaries) so each literal keeps its own boxed type.
  if (kind == Schema.Type.INT) {
    return 0;
  }
  if (kind == Schema.Type.LONG) {
    return 0L;
  }
  if (kind == Schema.Type.FLOAT) {
    return 0.0f;
  }
  if (kind == Schema.Type.DOUBLE) {
    return 0.0;
  }
  if (kind == Schema.Type.BOOLEAN) {
    return false;
  }
  if (kind == Schema.Type.STRING) {
    return "";
  }
  // No sensible non-null default for the remaining types.
  return null;
}

@Override
public Schema.Field field(String name, TypeInfo field, Schema.Field partner, Schema fieldResult) {
// No need to infer `shouldResultBeOptional`. We expect other visitor methods to return optional schemas
// in their field results if required
if (partner == null) {
// if there was no matching Avro field, use name from the Hive schema and set a null default
return AvroCompatibilityHelper.createSchemaField(SchemaUtilities.makeCompatibleName(name), fieldResult, null,
null);
// if there was no matching Avro field, use name from the Hive schema
// Special case: for union-encoded-as-struct fields (tag, field0, field1, ...), keep them non-nullable
// even though they don't have Avro partners, because they're part of the union encoding structure
boolean isUnionEncodingField = name.equals("tag") || name.matches("field\\d+");
if (isUnionEncodingField && !AvroSerdeUtils.isNullableType(fieldResult)) {
// For union encoding fields, use a non-null default appropriate for the type
Object defaultValue = getDefaultValueForSchema(fieldResult);
return AvroCompatibilityHelper.createSchemaField(SchemaUtilities.makeCompatibleName(name), fieldResult, null,
defaultValue);
} else {
// For regular fields not found in Avro, make them optional with null default
return AvroCompatibilityHelper.createSchemaField(SchemaUtilities.makeCompatibleName(name), fieldResult, null,
null);
}
} else {
// TODO: How to ensure that field default value is compatible with new field type generated from Hive?
// Copy field type from the visitor result, copy everything else from the partner
Expand Down Expand Up @@ -157,9 +200,61 @@ private static class AvroPartnerAccessor implements PartnerAccessor<Schema, Sche

private static final Schema MAP_KEY = Schema.create(Schema.Type.STRING);

/**
 * Unwraps a schema that may be a union down to the type it effectively represents.
 * <ul>
 *   <li>A union with exactly one branch, e.g. {@code [{"type":"record",...}]}, yields that branch.</li>
 *   <li>Anything else is handed to {@code SchemaUtilities.extractIfOption}, which is relied on to
 *       strip the null branch from nullable unions such as {@code ["null", "string"]}.</li>
 * </ul>
 *
 * @param schema the schema to unwrap
 * @return the single branch of a one-element union, otherwise the result of {@code extractIfOption}
 */
private static Schema extractFromUnion(Schema schema) {
  final boolean isSingleBranchUnion =
      schema.getType() == Schema.Type.UNION && schema.getTypes().size() == 1;
  return isSingleBranchUnion ? schema.getTypes().get(0) : SchemaUtilities.extractIfOption(schema);
}

/**
 * Finds the Avro field to pair with the Hive struct field {@code fieldName}.
 *
 * <p>When the raw {@code partner} is a union, the Hive struct is treated as a union encoded as a
 * struct ({@code tag}, {@code field0}, {@code field1}, ...) and a synthetic field is returned:
 * a non-nullable int for {@code tag}, or the N-th non-null union branch for {@code fieldN}.
 * Otherwise the field is looked up case-insensitively in the (option-unwrapped) record.
 *
 * @param partner Avro schema paired with the Hive struct
 * @param fieldName name of the Hive struct field
 * @return the matching or synthetic Avro field, or {@code null} when there is none
 */
@Override
public Schema.Field fieldPartner(Schema partner, String fieldName) {
  final Schema unwrapped = SchemaUtilities.extractIfOption(partner);

  // The check is made on the raw partner, not on `unwrapped`, because extractIfOption may already
  // have collapsed a 2-way union to its non-null branch.
  // NOTE(review): a nullable record partner (["null", record]) is also of type UNION, so a genuine
  // record field named "tag" or "fieldN" would be shadowed by the synthetic field below - confirm
  // this is intended.
  final boolean partnerIsUnion = partner.getType() == Schema.Type.UNION;

  if (partnerIsUnion && fieldName.equalsIgnoreCase("tag")) {
    // Synthetic non-nullable int discriminator, so the tag is given a non-null partner.
    final Schema tagSchema = Schema.create(Schema.Type.INT);
    return AvroCompatibilityHelper.createSchemaField(fieldName, tagSchema, "Union tag discriminator", 0);
  }

  if (partnerIsUnion && fieldName.matches("field\\d+")) {
    try {
      // The digits after the "field" prefix select the union branch.
      final int branchIndex = Integer.parseInt(fieldName.substring("field".length()));
      // Branch positions are counted on the original union with its null branch removed.
      final List<Schema> nonNullBranches = SchemaUtilities.discardNullFromUnionIfExist(partner).getTypes();
      if (branchIndex < nonNullBranches.size()) {
        // Synthetic field wrapping the selected branch schema.
        return AvroCompatibilityHelper.createSchemaField(fieldName, nonNullBranches.get(branchIndex), null, null);
      }
    } catch (NumberFormatException ignored) {
      // Index does not fit in an int: not a usable branch index, fall through to the regular lookup.
    }
  }

  if (unwrapped.getType() != Schema.Type.RECORD) {
    return null;
  }
  return findCaseInsensitive(unwrapped, fieldName);
}

Expand All @@ -177,13 +272,19 @@ public Schema mapKeyPartner(Schema partner) {
/**
 * Returns the Avro schema to pair with the value type of a Hive map.
 *
 * @param partner Avro schema paired with the Hive map; a nullable union is unwrapped first
 * @return the map's value schema with single-element and nullable unions unwrapped by
 *         {@code extractFromUnion}, or {@code null} when the unwrapped partner is not a MAP
 */
@Override
public Schema mapValuePartner(Schema partner) {
  Schema schema = SchemaUtilities.extractIfOption(partner);
  if (schema.getType() != Schema.Type.MAP) {
    return null;
  }
  // The stale direct `return ... schema.getValueType()` that preceded this logic is removed: it made
  // the union unwrapping below unreachable.
  return extractFromUnion(schema.getValueType());
}

/**
 * Returns the Avro schema to pair with the element type of a Hive list.
 *
 * @param partner Avro schema paired with the Hive list; a nullable union is unwrapped first
 * @return the array's element schema with single-element and nullable unions unwrapped by
 *         {@code extractFromUnion}, or {@code null} when the unwrapped partner is not an ARRAY
 */
@Override
public Schema listElementPartner(Schema partner) {
  Schema schema = SchemaUtilities.extractIfOption(partner);
  if (schema.getType() != Schema.Type.ARRAY) {
    return null;
  }
  // The stale direct `return ... schema.getElementType()` that preceded this logic is removed: it
  // made the union unwrapping below unreachable.
  return extractFromUnion(schema.getElementType());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,176 @@ public void shouldHandleUnions() {
assertSchema(expected, merge(hive, avro));
}

/**
 * Merges a Hive struct type with an Avro record whose array items and map values are declared as
 * single-element unions ({@code [{"type":"record",...}]}), then checks that the merged schema exposes
 * plain records in those positions and that their nested string/int fields stay non-nullable.
 */
@Test
public void shouldHandleSingleElementUnionsInArraysAndMaps() {
// This test verifies that single-element unions in array items and map values are properly unwrapped
// and the nested field nullability is preserved during schema merging.
// This reproduces the fix for handling avro.schema.literal with single-element unions like:
// - Array items: "items": [{"type":"record",...}]
// - Map values: "values": [{"type":"record",...}]
// These single-element unions appear in real-world Avro schemas stored as avro.schema.literal

// Hive side: all names are lower case and carry no nullability information of their own
String hive = "struct<id:bigint,items:array<struct<fooconfiguration:struct<name:string,urlvalue:string,source:string>,"
+ "barconfiguration:struct<name:string,domain:string>>>,"
+ "metadata:map<string,struct<category:string,priority:int>>>";

// Define an Avro schema literal similar to what would be stored in avro.schema.literal table property
// Note the single-element unions in array items and map values: [{"type":"record",...}]
// The top-level fields (id, items, metadata) are nullable; the leaf fields of the nested records are required
String avroSchemaLiteral =
"{\"type\":\"record\",\"name\":\"test_complex_array_table\",\"namespace\":\"com.example.test\",\"fields\":["
+ "{\"name\":\"id\",\"type\":[\"null\",\"long\"],\"default\":null},"
+ "{\"name\":\"items\",\"type\":[\"null\",{\"type\":\"array\",\"items\":[{\"type\":\"record\",\"name\":\"ItemConfig\",\"namespace\":\"com.example.data\",\"fields\":["
+ "{\"name\":\"fooConfiguration\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"FooConfiguration\",\"fields\":["
+ "{\"name\":\"name\",\"type\":\"string\"},"
+ "{\"name\":\"urlValue\",\"type\":\"string\"},"
+ "{\"name\":\"source\",\"type\":\"string\"}"
+ "]}],\"default\":null},"
+ "{\"name\":\"barConfiguration\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"BarConfiguration\",\"fields\":["
+ "{\"name\":\"name\",\"type\":\"string\"},"
+ "{\"name\":\"domain\",\"type\":\"string\"}"
+ "]}],\"default\":null}"
+ "]}]}],\"default\":null},"
+ "{\"name\":\"metadata\",\"type\":[\"null\",{\"type\":\"map\",\"values\":[{\"type\":\"record\",\"name\":\"MetadataValue\",\"namespace\":\"com.example.data\",\"fields\":["
+ "{\"name\":\"category\",\"type\":\"string\"},"
+ "{\"name\":\"priority\",\"type\":\"int\"}"
+ "]}]}],\"default\":null}"
+ "]}";

Schema avro = new Schema.Parser().parse(avroSchemaLiteral);
Schema merged = merge(hive, avro);

// Verify that single-element unions were properly handled
// Extract items array ("items" is declared ["null", array] above, so strip the optional wrapper first)
Schema mergedItemsArray = SchemaUtilities.extractIfOption(merged.getField("items").schema());
Schema mergedItemConfig = mergedItemsArray.getElementType();

// The fix ensures that single-element union [ItemConfig] is unwrapped to ItemConfig
// Without the fix, this would fail because the union wouldn't be unwrapped
assertEquals(mergedItemConfig.getType(), Schema.Type.RECORD, "Array element should be a record, not a union");

// Extract fooConfiguration and verify nested field nullability is preserved
// (field names are looked up with the Avro casing, e.g. "fooConfiguration", not the Hive lower-case form)
Schema mergedFooConfig =
SchemaUtilities.extractIfOption(mergedItemConfig.getField("fooConfiguration").schema());

// Nested fields should be non-nullable (required) as defined in the avro.schema.literal
// A nullable field would have type UNION here, so asserting STRING also asserts non-nullability
assertEquals(mergedFooConfig.getField("name").schema().getType(), Schema.Type.STRING,
"name field should be non-nullable string");
assertEquals(mergedFooConfig.getField("urlValue").schema().getType(), Schema.Type.STRING,
"urlValue field should be non-nullable string");
assertEquals(mergedFooConfig.getField("source").schema().getType(), Schema.Type.STRING,
"source field should be non-nullable string");

// Verify barConfiguration nested fields
Schema mergedBarConfig =
SchemaUtilities.extractIfOption(mergedItemConfig.getField("barConfiguration").schema());
assertEquals(mergedBarConfig.getField("name").schema().getType(), Schema.Type.STRING,
"bar name field should be non-nullable string");
assertEquals(mergedBarConfig.getField("domain").schema().getType(), Schema.Type.STRING,
"domain field should be non-nullable string");

// Extract metadata map value and verify
// Ensures that single-element union [MetadataValue] is unwrapped to MetadataValue
Schema mergedMetadataMap = SchemaUtilities.extractIfOption(merged.getField("metadata").schema());
Schema mergedMetadataValue = mergedMetadataMap.getValueType();

assertEquals(mergedMetadataValue.getType(), Schema.Type.RECORD, "Map value should be a record, not a union");

// Fields in MetadataValue should be non-nullable as defined in avro.schema.literal
assertEquals(mergedMetadataValue.getField("category").schema().getType(), Schema.Type.STRING,
"category field should be non-nullable string");
assertEquals(mergedMetadataValue.getField("priority").schema().getType(), Schema.Type.INT,
"priority field should be non-nullable int");
}

/**
 * Merges a Hive type in which unions are encoded as structs ({@code tag}, {@code field0}, ...) with an
 * Avro record that declares the same columns as real unions, for both a 2-way {@code ["null", T]} and
 * a 3-way {@code ["null", T1, T2]} union. Checks that each merged column is a record with a
 * non-nullable int {@code tag} and that nullability inside the array branch follows the Avro schema.
 */
@Test
public void shouldHandleUnionEncodedAsStruct() {
// This test verifies that when Hive has unions encoded as structs with tag/field0/field1/...
// and the Avro partner has the unions in their original form,
// the merged schema correctly preserves nullability from the Avro union branches.
// Tests both 2-way unions ["null", T] and 3-way unions ["null", T1, T2].
// Expected: tag should be non-nullable, array items should be non-nullable,
// and nested field nullability should be preserved from Avro.

// Hive side: fieldN is the N-th union branch; no branch is declared for null
String hive = "struct<"
+ "twowayunion:struct<tag:int,field0:array<struct<name:string,value:bigint>>>,"
+ "threewayunion:struct<tag:int,field0:bigint,field1:array<struct<description:string,metadata:string>>>"
+ ">";

// Avro partner schema with both 2-way and 3-way unions
String avroSchemaLiteral = "{\"type\":\"record\",\"name\":\"TestRecord\",\"namespace\":\"test\",\"fields\":["
// 2-way union: null or array
+ "{\"name\":\"twoWayUnion\",\"type\":[\"null\",{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"TwoWayItem\",\"fields\":["
+ "{\"name\":\"name\",\"type\":\"string\"},"
+ "{\"name\":\"value\",\"type\":[\"null\",\"long\"]}"
+ "]}}]},"
// 3-way union: null, long, or array
+ "{\"name\":\"threeWayUnion\",\"type\":[\"null\",\"long\",{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"ThreeWayItem\",\"fields\":["
+ "{\"name\":\"description\",\"type\":\"string\"},"
+ "{\"name\":\"metadata\",\"type\":[\"null\",\"string\"]}"
+ "]}}]}"
+ "]}";

Schema avro = new Schema.Parser().parse(avroSchemaLiteral);
Schema merged = merge(hive, avro);

// ===== Test 2-way union =====
// The merged field is looked up by its Avro name ("twoWayUnion"), not the Hive lower-case name
Schema.Field twoWayField = merged.getField("twoWayUnion");
assertNotNull(twoWayField, "twoWayUnion field should exist");
// The schema is asserted without unwrapping an option first, so the union-encoded struct itself
// must come back as a bare RECORD rather than a nullable union around it
Schema twoWayStruct = twoWayField.schema();
assertEquals(twoWayStruct.getType(), Schema.Type.RECORD, "twoWayUnion should be a record (union encoded as struct)");

// Verify tag field is non-nullable int
Schema.Field twoWayTagField = twoWayStruct.getField("tag");
assertNotNull(twoWayTagField, "tag field should exist in twoWayUnion");
assertEquals(twoWayTagField.schema().getType(), Schema.Type.INT, "tag should be non-nullable int");

// Extract field0 (the array branch)
// extractIfOption is applied, so this test does not care whether field0 itself is nullable
Schema.Field twoWayField0 = twoWayStruct.getField("field0");
assertNotNull(twoWayField0, "field0 should exist in twoWayUnion");
Schema twoWayArraySchema = SchemaUtilities.extractIfOption(twoWayField0.schema());
assertEquals(twoWayArraySchema.getType(), Schema.Type.ARRAY, "field0 should be an array");

// Verify array items are records (not ["null", record])
Schema twoWayItemSchema = twoWayArraySchema.getElementType();
assertEquals(twoWayItemSchema.getType(), Schema.Type.RECORD,
"Array items should be records, not unions with null");

// Verify nested field nullability: name is non-nullable, value is nullable
assertEquals(twoWayItemSchema.getField("name").schema().getType(), Schema.Type.STRING,
"name field should be non-nullable string");
assertTrue(AvroSerdeUtils.isNullableType(twoWayItemSchema.getField("value").schema()),
"value field should be nullable");

// ===== Test 3-way union =====
// Same checks as above, but the array is the second non-null branch and therefore maps to field1
Schema.Field threeWayField = merged.getField("threeWayUnion");
assertNotNull(threeWayField, "threeWayUnion field should exist");
Schema threeWayStruct = threeWayField.schema();
assertEquals(threeWayStruct.getType(), Schema.Type.RECORD,
"threeWayUnion should be a record (union encoded as struct)");

// Verify tag field is non-nullable int
Schema.Field threeWayTagField = threeWayStruct.getField("tag");
assertNotNull(threeWayTagField, "tag field should exist in threeWayUnion");
assertEquals(threeWayTagField.schema().getType(), Schema.Type.INT, "tag should be non-nullable int");

// Extract field1 (the array branch - field0 is the long branch)
Schema.Field threeWayField1 = threeWayStruct.getField("field1");
assertNotNull(threeWayField1, "field1 should exist in threeWayUnion");
Schema threeWayArraySchema = SchemaUtilities.extractIfOption(threeWayField1.schema());
assertEquals(threeWayArraySchema.getType(), Schema.Type.ARRAY, "field1 should be an array");

// Verify array items are records (not ["null", record])
Schema threeWayItemSchema = threeWayArraySchema.getElementType();
assertEquals(threeWayItemSchema.getType(), Schema.Type.RECORD,
"Array items should be records, not unions with null");

// Verify nested field nullability: description is non-nullable, metadata is nullable
assertEquals(threeWayItemSchema.getField("description").schema().getType(), Schema.Type.STRING,
"description field should be non-nullable string");
assertTrue(AvroSerdeUtils.isNullableType(threeWayItemSchema.getField("metadata").schema()),
"metadata field should be nullable");
}

// TODO: tests to retain schema props
// TODO: tests for explicit type compatibility check between hive and avro primitives, once we implement it
// TODO: tests for error case => default value in Avro does not match with type from hive
Expand Down