Skip to content

Commit 053267c

Browse files
committed
only unwrap single element unions when it does not match the typeInfo
1 parent 71e9ac6 commit 053267c

File tree

4 files changed

+163
-98
lines changed

4 files changed

+163
-98
lines changed

coral-schema/src/main/java/com/linkedin/coral/schema/avro/HiveSchemaWithPartnerVisitor.java

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import java.util.List;
99

10+
import org.apache.avro.Schema;
1011
import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
1112
import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
1213
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
@@ -57,6 +58,12 @@ public interface PartnerAccessor<P, FP> {
5758
@SuppressWarnings("MethodTypeParameterName")
5859
public static <P, FP, R, FR> R visit(TypeInfo typeInfo, P partner, HiveSchemaWithPartnerVisitor<P, FP, R, FR> visitor,
5960
PartnerAccessor<P, FP> accessor) {
61+
62+
boolean partnerWrapped = shouldUnwrapPartner(partner, typeInfo);
63+
if (partnerWrapped) {
64+
partner = (P) SchemaUtilities.extractSingleElementUnion((Schema) partner);
65+
}
66+
R resultSchema;
6067
switch (typeInfo.getCategory()) {
6168
case STRUCT:
6269
StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
@@ -69,25 +76,29 @@ public static <P, FP, R, FR> R visit(TypeInfo typeInfo, P partner, HiveSchemaWit
6976
R result = visit(fieldTypeInfo, fieldPartnerType, visitor, accessor);
7077
results.add(visitor.field(name, fieldTypeInfo, fieldPartner, result));
7178
}
72-
return visitor.struct(structTypeInfo, partner, results);
79+
resultSchema = visitor.struct(structTypeInfo, partner, results);
80+
break;
7381

7482
case LIST:
7583
ListTypeInfo listTypeInfo = (ListTypeInfo) typeInfo;
7684
TypeInfo elementTypeInfo = listTypeInfo.getListElementTypeInfo();
7785
P elementPartner = partner != null ? accessor.listElementPartner(partner) : null;
7886
R elementResult = visit(elementTypeInfo, elementPartner, visitor, accessor);
79-
return visitor.list(listTypeInfo, partner, elementResult);
87+
resultSchema = visitor.list(listTypeInfo, partner, elementResult);
88+
break;
8089

8190
case MAP:
8291
MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo;
8392
P keyPartner = partner != null ? accessor.mapKeyPartner(partner) : null;
8493
R keyResult = visit(mapTypeInfo.getMapKeyTypeInfo(), keyPartner, visitor, accessor);
8594
P valuePartner = partner != null ? accessor.mapValuePartner(partner) : null;
8695
R valueResult = visit(mapTypeInfo.getMapValueTypeInfo(), valuePartner, visitor, accessor);
87-
return visitor.map(mapTypeInfo, partner, keyResult, valueResult);
96+
resultSchema = visitor.map(mapTypeInfo, partner, keyResult, valueResult);
97+
break;
8898

8999
case PRIMITIVE:
90-
return visitor.primitive((PrimitiveTypeInfo) typeInfo, partner);
100+
resultSchema = visitor.primitive((PrimitiveTypeInfo) typeInfo, partner);
101+
break;
91102

92103
case UNION:
93104
UnionTypeInfo unionTypeInfo = (UnionTypeInfo) typeInfo;
@@ -98,11 +109,19 @@ public static <P, FP, R, FR> R visit(TypeInfo typeInfo, P partner, HiveSchemaWit
98109
R result = visit(allAlternatives.get(i), unionObjectPartner, visitor, accessor);
99110
unionResults.add(result);
100111
}
101-
return visitor.union(unionTypeInfo, partner, unionResults);
112+
resultSchema = visitor.union(unionTypeInfo, partner, unionResults);
113+
break;
102114

103115
default:
104116
throw new UnsupportedOperationException(typeInfo + " not supported");
105117
}
118+
119+
// Rewrap in single-element union if the partner was originally wrapped
120+
if (partnerWrapped) {
121+
resultSchema = (R) SchemaUtilities.wrapInSingleElementUnion((Schema) resultSchema);
122+
}
123+
124+
return resultSchema;
106125
}
107126

108127
public R struct(StructTypeInfo struct, P partner, List<FR> fieldResults) {
@@ -128,4 +147,19 @@ public R primitive(PrimitiveTypeInfo primitive, P partner) {
128147
public R union(UnionTypeInfo union, P partner, List<R> results) {
129148
return null;
130149
}
150+
151+
/**
152+
* Checks if a partner schema is a single-element union that needs unwrapping for matching.
153+
* Returns true when the Avro partner has a single-element union but the Hive type does not declare a union.
154+
*
155+
* @param partner The partner schema (may be a single-element union)
156+
* @param hiveTypeInfo The Hive type info for comparison
157+
* @param <P> The partner schema type
158+
* @return true if unwrapping (and later rewrapping) is needed, false otherwise
159+
*/
160+
private static <P> boolean shouldUnwrapPartner(P partner, TypeInfo hiveTypeInfo) {
161+
return partner != null && partner instanceof org.apache.avro.Schema
162+
&& SchemaUtilities.isSingleElementUnion((org.apache.avro.Schema) partner)
163+
&& !(hiveTypeInfo instanceof UnionTypeInfo);
164+
}
131165
}

coral-schema/src/main/java/com/linkedin/coral/schema/avro/MergeHiveSchemaWithAvro.java

Lines changed: 2 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -157,27 +157,6 @@ private static class AvroPartnerAccessor implements PartnerAccessor<Schema, Sche
157157

158158
private static final Schema MAP_KEY = Schema.create(Schema.Type.STRING);
159159

160-
/**
161-
* Extracts the actual type from a schema that may be a union.
162-
* Handles two cases:
163-
* 1. Single-element unions like [{"type":"record",...}] - extracts the single type
164-
* 2. Nullable unions like ["null", "string"] - extracts the non-null type
165-
*
166-
* @param schema The schema to extract from
167-
* @return The extracted schema
168-
*/
169-
private static Schema extractFromUnion(Schema schema) {
170-
if (schema.getType() == Schema.Type.UNION) {
171-
List<Schema> types = schema.getTypes();
172-
if (types.size() == 1) {
173-
// Single-element union: just extract that single type
174-
return types.get(0);
175-
}
176-
}
177-
// Extract the non-null type from nullable unions like ["null", "string"]
178-
return SchemaUtilities.extractIfOption(schema);
179-
}
180-
181160
@Override
182161
public Schema.Field fieldPartner(Schema partner, String fieldName) {
183162
Schema schema = SchemaUtilities.extractIfOption(partner);
@@ -201,7 +180,7 @@ public Schema mapValuePartner(Schema partner) {
201180
if (schema.getType() != Schema.Type.MAP) {
202181
return null;
203182
}
204-
return extractFromUnion(schema.getValueType());
183+
return schema.getValueType();
205184
}
206185

207186
@Override
@@ -210,7 +189,7 @@ public Schema listElementPartner(Schema partner) {
210189
if (schema.getType() != Schema.Type.ARRAY) {
211190
return null;
212191
}
213-
return extractFromUnion(schema.getElementType());
192+
return schema.getElementType();
214193
}
215194

216195
@Override

coral-schema/src/main/java/com/linkedin/coral/schema/avro/SchemaUtilities.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -560,6 +560,32 @@ static Schema extractIfOption(Schema schema) {
560560
}
561561
}
562562

563+
/**
564+
* Checks if a schema is a single-element union.
565+
* Single-element unions are unions with exactly one type, like [{"type":"record",...}]
566+
*
567+
* @param schema The schema to check
568+
* @return true if the schema is a single-element union, false otherwise
569+
*/
570+
static boolean isSingleElementUnion(Schema schema) {
571+
return schema != null && schema.getType() == Schema.Type.UNION && schema.getTypes().size() == 1;
572+
}
573+
574+
/**
575+
* Extracts the actual type from a single-element union.
576+
* Assumes the schema is a single-element union (caller should check with isSingleElementUnion first).
577+
*
578+
* @param schema The single-element union schema
579+
* @return The extracted schema (the single type from the union)
580+
*/
581+
static Schema extractSingleElementUnion(Schema schema) {
582+
return schema.getTypes().get(0);
583+
}
584+
585+
static Schema wrapInSingleElementUnion(Schema schema) {
586+
return Schema.createUnion(Arrays.asList(schema));
587+
}
588+
563589
private static Schema getUnionFieldSchema(@Nonnull Schema leftSchema, @Nonnull Schema rightSchema,
564590
boolean strictMode) {
565591
Preconditions.checkNotNull(leftSchema);

coral-schema/src/test/java/com/linkedin/coral/schema/avro/MergeHiveSchemaWithAvroTests.java

Lines changed: 96 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -241,84 +241,106 @@ public void shouldHandleUnions() {
241241
}
242242

243243
@Test
244-
public void shouldHandleSingleElementUnionsInArraysAndMaps() {
245-
// This test verifies that single-element unions in array items and map values are properly unwrapped
244+
public void shouldHandleSingleElementUnionsInAllTypes() {
245+
// This test verifies that single-element unions in primitives, array items, and map values are properly unwrapped
246246
// and the nested field nullability is preserved during schema merging.
247247
// This reproduces the fix for handling avro.schema.literal with single-element unions like:
248+
// - Primitives: "type": ["string"]
248249
// - Array items: "items": [{"type":"record",...}]
249250
// - Map values: "values": [{"type":"record",...}]
250251
// These single-element unions appear in real-world Avro schemas stored as avro.schema.literal
251252

252-
String hive = "struct<id:bigint,items:array<struct<fooconfiguration:struct<name:string,urlvalue:string,source:string>,"
253+
// seu = single-element union
254+
String hive = "struct<id:bigint,name:string,active:boolean,"
255+
+ "items:array<struct<fooconfiguration:struct<name:string,urlvalue:string,source:string>,"
253256
+ "barconfiguration:struct<name:string,domain:string>>>,"
254-
+ "metadata:map<string,struct<category:string,priority:int>>>";
255-
256-
// Define an Avro schema literal similar to what would be stored in avro.schema.literal table property
257-
// Note the single-element unions in array items and map values: [{"type":"record",...}]
258-
String avroSchemaLiteral =
259-
"{\"type\":\"record\",\"name\":\"test_complex_array_table\",\"namespace\":\"com.example.test\",\"fields\":["
260-
+ "{\"name\":\"id\",\"type\":[\"null\",\"long\"],\"default\":null},"
261-
+ "{\"name\":\"items\",\"type\":[\"null\",{\"type\":\"array\",\"items\":[{\"type\":\"record\",\"name\":\"ItemConfig\",\"namespace\":\"com.example.data\",\"fields\":["
262-
+ "{\"name\":\"fooConfiguration\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"FooConfiguration\",\"fields\":["
263-
+ "{\"name\":\"name\",\"type\":\"string\"},"
264-
+ "{\"name\":\"urlValue\",\"type\":\"string\"},"
265-
+ "{\"name\":\"source\",\"type\":\"string\"}"
266-
+ "]}],\"default\":null},"
267-
+ "{\"name\":\"barConfiguration\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"BarConfiguration\",\"fields\":["
268-
+ "{\"name\":\"name\",\"type\":\"string\"},"
269-
+ "{\"name\":\"domain\",\"type\":\"string\"}"
270-
+ "]}],\"default\":null}"
271-
+ "]}]}],\"default\":null},"
272-
+ "{\"name\":\"metadata\",\"type\":[\"null\",{\"type\":\"map\",\"values\":[{\"type\":\"record\",\"name\":\"MetadataValue\",\"namespace\":\"com.example.data\",\"fields\":["
273-
+ "{\"name\":\"category\",\"type\":\"string\"},"
274-
+ "{\"name\":\"priority\",\"type\":\"int\"}"
275-
+ "]}]}],\"default\":null}"
276-
+ "]}";
277-
278-
Schema avro = new Schema.Parser().parse(avroSchemaLiteral);
279-
Schema merged = merge(hive, avro);
257+
+ "metadata:map<string,struct<category:string,priority:int>>,"
258+
+ "tags:array<string>>";
259+
260+
// Nested record schemas
261+
Schema fooConfigSchema = struct("FooConfiguration", "doc-foo", "com.example.data",
262+
required("name", Schema.Type.STRING), required("urlValue", Schema.Type.STRING),
263+
required("source", Schema.Type.STRING));
264+
265+
Schema barConfigSchema = struct("BarConfiguration", "doc-bar", "com.example.data",
266+
required("name", Schema.Type.STRING), required("domain", Schema.Type.STRING));
267+
268+
Schema seu_arrayItemConfigSchema = struct("ItemConfig", "doc-item", "com.example.data",
269+
optional("fooConfiguration", fooConfigSchema), optional("barConfiguration", barConfigSchema));
270+
271+
Schema seu_mapValueMetadataSchema = struct("MetadataValue", "doc-metadata", "com.example.data",
272+
required("category", Schema.Type.STRING), required("priority", Schema.Type.INT));
273+
274+
// Construct Avro schema with single-element unions for primitives, array items, and map values
275+
Schema avro = struct("test_complex_array_table", "doc-test", "com.example.test", optional("id", Schema.Type.LONG),
276+
field("name", singleElementUnion(Schema.create(Schema.Type.STRING)), null, "unknown", null),
277+
field("active", singleElementUnion(Schema.create(Schema.Type.BOOLEAN)), null, false, null),
278+
optional("items", array(singleElementUnion(seu_arrayItemConfigSchema))),
279+
optional("metadata", map(singleElementUnion(seu_mapValueMetadataSchema))),
280+
optional("tags", array(singleElementUnion(Schema.create(Schema.Type.STRING)))));
281+
282+
// Expected schema after merge: single-element unions should be preserved
283+
// The structure of the Avro schema is maintained, including single-element unions
284+
// Expected output should match the input avro schema
285+
Schema expected = avro;
280286

281-
// Verify that single-element unions were properly handled
282-
// Extract items array
283-
Schema mergedItemsArray = SchemaUtilities.extractIfOption(merged.getField("items").schema());
284-
Schema mergedItemConfig = mergedItemsArray.getElementType();
285-
286-
// The fix ensures that single-element union [ItemConfig] is unwrapped to ItemConfig
287-
// Without the fix, this would fail because the union wouldn't be unwrapped
288-
assertEquals(mergedItemConfig.getType(), Schema.Type.RECORD, "Array element should be a record, not a union");
289-
290-
// Extract fooConfiguration and verify nested field nullability is preserved
291-
Schema mergedFooConfig =
292-
SchemaUtilities.extractIfOption(mergedItemConfig.getField("fooConfiguration").schema());
293-
294-
// Nested fields should be non-nullable (required) as defined in the avro.schema.literal
295-
assertEquals(mergedFooConfig.getField("name").schema().getType(), Schema.Type.STRING,
296-
"name field should be non-nullable string");
297-
assertEquals(mergedFooConfig.getField("urlValue").schema().getType(), Schema.Type.STRING,
298-
"urlValue field should be non-nullable string");
299-
assertEquals(mergedFooConfig.getField("source").schema().getType(), Schema.Type.STRING,
300-
"source field should be non-nullable string");
301-
302-
// Verify barConfiguration nested fields
303-
Schema mergedBarConfig =
304-
SchemaUtilities.extractIfOption(mergedItemConfig.getField("barConfiguration").schema());
305-
assertEquals(mergedBarConfig.getField("name").schema().getType(), Schema.Type.STRING,
306-
"bar name field should be non-nullable string");
307-
assertEquals(mergedBarConfig.getField("domain").schema().getType(), Schema.Type.STRING,
308-
"domain field should be non-nullable string");
309-
310-
// Extract metadata map value and verify
311-
// Ensures that single-element union [MetadataValue] is unwrapped to MetadataValue
312-
Schema mergedMetadataMap = SchemaUtilities.extractIfOption(merged.getField("metadata").schema());
313-
Schema mergedMetadataValue = mergedMetadataMap.getValueType();
314-
315-
assertEquals(mergedMetadataValue.getType(), Schema.Type.RECORD, "Map value should be a record, not a union");
316-
317-
// Fields in MetadataValue should be non-nullable as defined in avro.schema.literal
318-
assertEquals(mergedMetadataValue.getField("category").schema().getType(), Schema.Type.STRING,
319-
"category field should be non-nullable string");
320-
assertEquals(mergedMetadataValue.getField("priority").schema().getType(), Schema.Type.INT,
321-
"priority field should be non-nullable int");
287+
Schema actual = merge(hive, avro);
288+
289+
System.out.println("\n=== INPUT AVRO SCHEMA ===");
290+
System.out.println(avro.toString(true));
291+
System.out.println("\n=== EXPECTED OUTPUT SCHEMA ===");
292+
System.out.println(expected.toString(true));
293+
System.out.println("\n=== ACTUAL OUTPUT SCHEMA ===");
294+
System.out.println(actual.toString(true));
295+
System.out.println("\n=== END ===\n");
296+
297+
assertSchema(expected, actual);
298+
}
299+
300+
@Test
301+
public void shouldHandleSingleElementUnionsWithHiveUnionType() {
302+
// This test ensures backward compatibility with Hive union-encoded-as-struct format
303+
// when single-element unions are present in array items and map values.
304+
// In Hive, unions are represented as `uniontype<type1,type2>` and encoded as
305+
// struct<tag:int,field0:type1,field1:type2> in the schema.
306+
// This test verifies that extractIfOption correctly handles single-element unions
307+
// in nested structures even when the Hive schema uses uniontype format.
308+
309+
// seu = single-element union
310+
String hive = "struct<id:bigint,"
311+
+ "status:uniontype<string,int>,"
312+
+ "items:array<uniontype<struct<value:string>>>,"
313+
+ "metadata:map<string,uniontype<struct<priority:int>>>>";
314+
315+
Schema seu_arrayItemSchema =
316+
struct("Item", "doc-item", "com.example.data", required("value", Schema.Type.STRING));
317+
318+
Schema seu_mapValueMetadataSchema =
319+
struct("MetadataInfo", "doc-metadata", "com.example.data", required("priority", Schema.Type.INT));
320+
321+
// Construct Avro schema with single-element unions in array items and map values
322+
// Also includes a regular union for the status field (to test Hive uniontype compatibility)
323+
Schema avro = struct("test_union_compat", "doc-test", "com.example.test", optional("id", Schema.Type.LONG),
324+
required("status", union(Schema.Type.NULL, Schema.Type.STRING, Schema.Type.INT)),
325+
optional("items", array(singleElementUnion(seu_arrayItemSchema))),
326+
optional("metadata", map(singleElementUnion(seu_mapValueMetadataSchema))));
327+
328+
// Expected schema: single-element unions should be preserved, regular union preserved
329+
// The Avro schema structure is maintained
330+
// Expected output should match the input avro schema
331+
Schema expected = avro;
332+
333+
Schema actual = merge(hive, avro);
334+
335+
System.out.println("\n=== INPUT AVRO SCHEMA (HiveUnionType test) ===");
336+
System.out.println(avro.toString(true));
337+
System.out.println("\n=== EXPECTED OUTPUT SCHEMA (HiveUnionType test) ===");
338+
System.out.println(expected.toString(true));
339+
System.out.println("\n=== ACTUAL OUTPUT SCHEMA (HiveUnionType test) ===");
340+
System.out.println(actual.toString(true));
341+
System.out.println("\n=== END ===\n");
342+
343+
assertSchema(expected, actual);
322344
}
323345

324346
// TODO: tests to retain schema props
@@ -331,6 +353,10 @@ private Schema union(Schema.Type... types) {
331353
return Schema.createUnion(Arrays.stream(types).map(Schema::create).collect(Collectors.toList()));
332354
}
333355

356+
private Schema singleElementUnion(Schema schema) {
357+
return Schema.createUnion(Arrays.asList(schema));
358+
}
359+
334360
private void assertSchema(Schema expected, Schema actual) {
335361
assertEquals(actual.toString(true), expected.toString(true));
336362
}

0 commit comments

Comments
 (0)