Skip to content

Commit d069a1f

Browse files
committed
fix to allow for reconciling hive-style unions with avro literal schema union
1 parent d274580 commit d069a1f

File tree

1 file changed

+78
-4
lines changed

1 file changed

+78
-4
lines changed

coral-schema/src/main/java/com/linkedin/coral/schema/avro/MergeHiveSchemaWithAvro.java

Lines changed: 78 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ static Schema visit(StructTypeInfo typeInfo, Schema schema) {
4949

5050
@Override
5151
public Schema struct(StructTypeInfo struct, Schema partner, List<Schema.Field> fieldResults) {
52+
// Check if this is a union-encoded-as-struct by looking for "tag" field in fieldResults
53+
boolean isUnionEncodedAsStruct = fieldResults.stream()
54+
.anyMatch(f -> f.name().equalsIgnoreCase("tag"));
55+
5256
boolean shouldResultBeOptional = partner == null || isNullableType(partner);
5357
Schema result;
5458
if (partner == null || SchemaUtilities.extractIfOption(partner).getType() != Schema.Type.RECORD) {
@@ -61,18 +65,57 @@ public Schema struct(StructTypeInfo struct, Schema partner, List<Schema.Field> f
6165
}
6266
// While calling `makeNullable`, we should respect the option order of `partner`
6367
// i.e. if the schema of `partner` is [int, null], the resultant schema should also be [int, null] rather than [null, int]
64-
return shouldResultBeOptional ? SchemaUtilities.makeNullable(result, SchemaUtilities.isNullSecond(partner))
68+
// However, if this is a union-encoded-as-struct, don't make it nullable - it represents the union structure itself
69+
return (shouldResultBeOptional && !isUnionEncodedAsStruct)
70+
? SchemaUtilities.makeNullable(result, SchemaUtilities.isNullSecond(partner))
6571
: result;
6672
}
6773

74+
/**
75+
* Helper method to get an appropriate default value for a schema type.
76+
* Used for union-encoded-as-struct fields that need non-null defaults.
77+
*
78+
* @param schema The schema to get a default value for
79+
* @return An appropriate default value, or null if none can be determined
80+
*/
81+
private Object getDefaultValueForSchema(Schema schema) {
82+
switch (schema.getType()) {
83+
case INT:
84+
return 0;
85+
case LONG:
86+
return 0L;
87+
case FLOAT:
88+
return 0.0f;
89+
case DOUBLE:
90+
return 0.0;
91+
case BOOLEAN:
92+
return false;
93+
case STRING:
94+
return "";
95+
default:
96+
return null;
97+
}
98+
}
99+
68100
@Override
69101
public Schema.Field field(String name, TypeInfo field, Schema.Field partner, Schema fieldResult) {
70102
// No need to infer `shouldResultBeOptional`. We expect other visitor methods to return optional schemas
71103
// in their field results if required
72104
if (partner == null) {
73-
// if there was no matching Avro field, use name form the Hive schema and set a null default
74-
return AvroCompatibilityHelper.createSchemaField(SchemaUtilities.makeCompatibleName(name), fieldResult, null,
75-
null);
105+
// if there was no matching Avro field, use name from the Hive schema
106+
// Special case: for union-encoded-as-struct fields (tag, field0, field1, ...), keep them non-nullable
107+
// even though they don't have Avro partners, because they're part of the union encoding structure
108+
boolean isUnionEncodingField = name.equals("tag") || name.matches("field\\d+");
109+
if (isUnionEncodingField && !AvroSerdeUtils.isNullableType(fieldResult)) {
110+
// For union encoding fields, use a non-null default appropriate for the type
111+
Object defaultValue = getDefaultValueForSchema(fieldResult);
112+
return AvroCompatibilityHelper.createSchemaField(SchemaUtilities.makeCompatibleName(name), fieldResult, null,
113+
defaultValue);
114+
} else {
115+
// For regular fields not found in Avro, make them optional with null default
116+
return AvroCompatibilityHelper.createSchemaField(SchemaUtilities.makeCompatibleName(name), fieldResult, null,
117+
null);
118+
}
76119
} else {
77120
// TODO: How to ensure that field default value is compatible with new field type generated from Hive?
78121
// Copy field type from the visitor result, copy everything else from the partner
@@ -181,6 +224,37 @@ private static Schema extractFromUnion(Schema schema) {
181224
@Override
182225
public Schema.Field fieldPartner(Schema partner, String fieldName) {
183226
Schema schema = SchemaUtilities.extractIfOption(partner);
227+
228+
// Special case: If the partner is a union and the field name matches the union-encoded-as-struct pattern,
229+
// this is a Hive union-encoded-as-struct. We need to return a synthetic field that wraps the corresponding union branch.
230+
// Check the ORIGINAL partner, not the extracted schema, because extractIfOption may unwrap 2-way unions
231+
if (partner.getType() == Schema.Type.UNION) {
232+
// Handle "tag" field - this is the discriminator for the union, should be non-nullable int
233+
if (fieldName.equalsIgnoreCase("tag")) {
234+
// Return a synthetic non-nullable int field for the tag
235+
// This ensures primitive() gets a non-null partner and doesn't make it nullable
236+
Schema intSchema = Schema.create(Schema.Type.INT);
237+
return AvroCompatibilityHelper.createSchemaField(fieldName, intSchema, "Union tag discriminator", 0);
238+
}
239+
240+
// Handle "fieldN" fields - these correspond to union branches
241+
if (fieldName.matches("field\\d+")) {
242+
try {
243+
int fieldIndex = Integer.parseInt(fieldName.substring(5)); // Extract number from "fieldN"
244+
// Use the original partner (which is the union), not the extracted schema
245+
Schema unionWithoutNull = SchemaUtilities.discardNullFromUnionIfExist(partner);
246+
List<Schema> branches = unionWithoutNull.getTypes();
247+
if (fieldIndex < branches.size()) {
248+
// Create a synthetic field that wraps the union branch schema
249+
Schema branchSchema = branches.get(fieldIndex);
250+
return AvroCompatibilityHelper.createSchemaField(fieldName, branchSchema, null, null);
251+
}
252+
} catch (NumberFormatException e) {
253+
// Not a valid field index, fall through to regular logic
254+
}
255+
}
256+
}
257+
184258
return (schema.getType() == Schema.Type.RECORD) ? findCaseInsensitive(schema, fieldName) : null;
185259
}
186260

0 commit comments

Comments
 (0)