From 4ff767c563394d9dda3a909d3083bb1b0675645d Mon Sep 17 00:00:00 2001 From: Ruolin Fan Date: Thu, 6 Nov 2025 15:43:16 -0800 Subject: [PATCH 1/4] fix namespace collisions --- .../coral/schema/avro/SchemaUtilities.java | 112 ++++++++++++++++-- .../schema/avro/SchemaUtilitiesTests.java | 104 ++++++++++++++++ 2 files changed, 207 insertions(+), 9 deletions(-) diff --git a/coral-schema/src/main/java/com/linkedin/coral/schema/avro/SchemaUtilities.java b/coral-schema/src/main/java/com/linkedin/coral/schema/avro/SchemaUtilities.java index cde0a992d..9ce36e18e 100644 --- a/coral-schema/src/main/java/com/linkedin/coral/schema/avro/SchemaUtilities.java +++ b/coral-schema/src/main/java/com/linkedin/coral/schema/avro/SchemaUtilities.java @@ -735,18 +735,109 @@ private static void appendFieldWithNewNamespace(@Nonnull Schema.Field field, @No } private static Schema setupNestedNamespaceForRecord(@Nonnull Schema schema, @Nonnull String namespace) { + // Detect collisions and build mapping for suffix assignment + Map> collisionMap = detectNamespaceCollisions(schema); + return setupNestedNamespaceForRecord(schema, namespace, collisionMap); + } + + /** + * Detects which record names would collide (same name, different original namespace) at the current level. + * This includes records that appear directly in fields, within unions, arrays, or maps. + * + * @param schema The parent record schema to scan + * @return Map from record name -> ordered list of original namespaces (only for records with collisions) + */ + private static Map> detectNamespaceCollisions(@Nonnull Schema schema) { + Map> recordNameToOriginalNamespaces = new LinkedHashMap<>(); + + // Scan all fields to collect record types and their original namespaces in order + for (Schema.Field field : schema.getFields()) { + collectRecordTypes(field.schema(), recordNameToOriginalNamespaces); + } + + // Filter to only records that have collisions (multiple different original namespaces) + Map> collisions = new LinkedHashMap<>(); + for (Map.Entry> entry : recordNameToOriginalNamespaces.entrySet()) { + // Use a Set to check uniqueness while preserving order in List + Set uniqueNamespaces = new LinkedHashSet<>(entry.getValue()); + if (uniqueNamespaces.size() > 1) { + collisions.put(entry.getKey(), entry.getValue()); + } + } + + return collisions; + } + + /** + * Recursively collects all record types and their original namespaces from a schema. + * This traverses through unions, arrays, and maps to find all nested record types. + * The order of collection matches the order records appear in the schema. + * + * @param schema The schema to scan + * @param recordNameToNamespaces Map to populate with record name -> ordered list of original namespaces + */ + private static void collectRecordTypes(@Nonnull Schema schema, + @Nonnull Map> recordNameToNamespaces) { + switch (schema.getType()) { + case RECORD: + String originalNamespace = schema.getNamespace() != null ? schema.getNamespace() : ""; + recordNameToNamespaces.computeIfAbsent(schema.getName(), k -> new ArrayList<>()).add(originalNamespace); + break; + case UNION: + for (Schema type : schema.getTypes()) { + collectRecordTypes(type, recordNameToNamespaces); + } + break; + case ARRAY: + collectRecordTypes(schema.getElementType(), recordNameToNamespaces); + break; + case MAP: + collectRecordTypes(schema.getValueType(), recordNameToNamespaces); + break; + case ENUM: + case BOOLEAN: + case BYTES: + case DOUBLE: + case FLOAT: + case INT: + case LONG: + case STRING: + case FIXED: + case NULL: + // These types don't contain nested records + break; + default: + break; + } + } + + private static Schema setupNestedNamespaceForRecord(@Nonnull Schema schema, @Nonnull String namespace, + @Nonnull Map> collisionMap) { Preconditions.checkNotNull(schema); Preconditions.checkNotNull(namespace); + Preconditions.checkNotNull(collisionMap); if (!schema.getType().equals(Schema.Type.RECORD)) { throw new IllegalArgumentException( "Input schemas must be of RECORD type. " + "The actual type is: " + schema.getType()); } + // Add numeric suffix to avoid collisions when multiple fields have nested records with the same name + String recordNamespace = namespace; + if (collisionMap.containsKey(schema.getName())) { + List namespaces = collisionMap.get(schema.getName()); + String originalNamespace = schema.getNamespace() != null ? schema.getNamespace() : ""; + int index = namespaces.indexOf(originalNamespace); + if (index >= 0) { + // Append numeric suffix based on order encountered: -0, -1, -2, etc. + recordNamespace = namespace + "-" + index; + } + } + SchemaBuilder.FieldAssembler fieldAssembler = - SchemaBuilder.record(schema.getName()).namespace(namespace).fields(); + SchemaBuilder.record(schema.getName()).namespace(recordNamespace).fields(); - String nestedNamespace = namespace + "." + schema.getName(); + String nestedNamespace = recordNamespace + "." + schema.getName(); for (Schema.Field field : schema.getFields()) { switch (field.schema().getType()) { @@ -765,7 +856,7 @@ private static Schema setupNestedNamespaceForRecord(@Nonnull Schema schema, @Non case MAP: case UNION: case ARRAY: - Schema newFieldSchema = setupNestedNamespace(field.schema(), nestedNamespace); + Schema newFieldSchema = setupNestedNamespace(field.schema(), nestedNamespace, collisionMap); Schema.Field newField = AvroCompatibilityHelper.createSchemaField(field.name(), newFieldSchema, field.doc(), defaultValue(field), field.order()); appendField(newField, fieldAssembler); @@ -774,7 +865,8 @@ private static Schema setupNestedNamespaceForRecord(@Nonnull Schema schema, @Non appendFieldWithNewNamespace(field, nestedNamespace, fieldAssembler); break; case RECORD: - Schema recordSchemaWithNestedNamespace = setupNestedNamespaceForRecord(field.schema(), nestedNamespace); + Schema recordSchemaWithNestedNamespace = + setupNestedNamespaceForRecord(field.schema(), nestedNamespace, collisionMap); Schema.Field newRecordFiled = AvroCompatibilityHelper.createSchemaField(field.name(), recordSchemaWithNestedNamespace, field.doc(), defaultValue(field), field.order()); appendField(newRecordFiled, fieldAssembler); @@ -787,9 +879,11 @@ private static Schema setupNestedNamespaceForRecord(@Nonnull Schema schema, @Non return fieldAssembler.endRecord(); } - private static Schema setupNestedNamespace(@Nonnull Schema schema, @Nonnull String namespace) { + private static Schema setupNestedNamespace(@Nonnull Schema schema, @Nonnull String namespace, + @Nonnull Map> collisionMap) { Preconditions.checkNotNull(schema); Preconditions.checkNotNull(namespace); + Preconditions.checkNotNull(collisionMap); switch (schema.getType()) { case NULL: @@ -805,21 +899,21 @@ private static Schema setupNestedNamespace(@Nonnull Schema schema, @Nonnull Stri return schema; case MAP: Schema valueSchema = schema.getValueType(); - Schema valueSchemaWithNestedNamespace = setupNestedNamespace(valueSchema, namespace); + Schema valueSchemaWithNestedNamespace = setupNestedNamespace(valueSchema, namespace, collisionMap); return Schema.createMap(valueSchemaWithNestedNamespace); case ARRAY: Schema elementSchema = schema.getElementType(); - Schema elementSchemaWithNestedNamespace = setupNestedNamespace(elementSchema, namespace); + Schema elementSchemaWithNestedNamespace = setupNestedNamespace(elementSchema, namespace, collisionMap); return Schema.createArray(elementSchemaWithNestedNamespace); case ENUM: return Schema.createEnum(schema.getName(), schema.getDoc(), namespace, schema.getEnumSymbols()); case RECORD: - return setupNestedNamespaceForRecord(schema, namespace); + return setupNestedNamespaceForRecord(schema, namespace, collisionMap); case UNION: List types = new ArrayList<>(); for (Schema type : schema.getTypes()) { - Schema typeWithNestNamespace = setupNestedNamespace(type, namespace); + Schema typeWithNestNamespace = setupNestedNamespace(type, namespace, collisionMap); types.add(typeWithNestNamespace); } return Schema.createUnion(types); diff --git a/coral-schema/src/test/java/com/linkedin/coral/schema/avro/SchemaUtilitiesTests.java b/coral-schema/src/test/java/com/linkedin/coral/schema/avro/SchemaUtilitiesTests.java index ba655c95a..dfe87a97c 100644 --- a/coral-schema/src/test/java/com/linkedin/coral/schema/avro/SchemaUtilitiesTests.java +++ b/coral-schema/src/test/java/com/linkedin/coral/schema/avro/SchemaUtilitiesTests.java @@ -90,4 +90,108 @@ public void testToNullableSchema() { Assert.assertEquals(outputSchema.toString(true), TestUtils.loadSchema("testToNullableSchema-expected.avsc")); } + + /** + * Test to verify that setupNameAndNamespace preserves original namespaces for nested records with the same name. + * This prevents namespace collisions when two fields have nested records with the same name but different original namespaces. + */ + @Test + public void testSetupNameAndNamespacePreservesOriginalNamespace() { + // Create first nested record with namespace "com.foo.bar" + Schema nestedRecord1 = SchemaBuilder.record("FooRecord").namespace("com.foo.bar").fields().name("field1") + .type().intType().noDefault().endRecord(); + + // Create second nested record with the same name but different namespace "com.baz.qux" + Schema nestedRecord2 = SchemaBuilder.record("FooRecord").namespace("com.baz.qux").fields().name("field2") + .type().stringType().noDefault().endRecord(); + + // Create nullable unions for both nested records + Schema nullableRecord1 = Schema.createUnion(Schema.create(Schema.Type.NULL), nestedRecord1); + Schema nullableRecord2 = Schema.createUnion(Schema.create(Schema.Type.NULL), nestedRecord2); + + // Create parent schema with two fields containing the nested records + Schema parentSchema = SchemaBuilder.record("ParentRecord").namespace("com.parent").fields().name("contextV1") + .type(nullableRecord1).noDefault().name("contextV2").type(nullableRecord2).noDefault().endRecord(); + + // Apply setupNameAndNamespace + Schema resultSchema = SchemaUtilities.setupNameAndNamespace(parentSchema, "ParentRecord", "com.parent.test"); + + // Extract the nested record schemas from the result + Schema.Field contextV1Field = resultSchema.getField("contextV1"); + Schema.Field contextV2Field = resultSchema.getField("contextV2"); + + // Get the non-null type from the union + Schema resultRecord1 = contextV1Field.schema().getTypes().get(1); + Schema resultRecord2 = contextV2Field.schema().getTypes().get(1); + + // Verify that both records still have different namespaces (preserving original namespace info) + // The new namespace should incorporate the original namespace to avoid conflicts + String namespace1 = resultRecord1.getNamespace(); + String namespace2 = resultRecord2.getNamespace(); + + // Both records have the same name + Assert.assertEquals(resultRecord1.getName(), "FooRecord"); + Assert.assertEquals(resultRecord2.getName(), "FooRecord"); + + // But they should have different namespaces with numeric suffixes to avoid conflicts + Assert.assertNotEquals(namespace1, namespace2, + "Namespaces should be different to avoid conflicts. Got namespace1: " + namespace1 + ", namespace2: " + + namespace2); + + // Verify that numeric suffixes are appended to distinguish the colliding records + Assert.assertTrue(namespace1.endsWith("-0") || namespace1.endsWith("-1"), + "First record namespace should have numeric suffix. Got: " + namespace1); + Assert.assertTrue(namespace2.endsWith("-0") || namespace2.endsWith("-1"), + "Second record namespace should have numeric suffix. Got: " + namespace2); + } + + /** + * Test to verify that collision detection works for direct nested records (not in unions). + * When two fields have direct nested records with the same name but different original namespaces, + * the system should detect the collision and preserve the original namespaces. + */ + @Test + public void testSetupNameAndNamespaceDetectsDirectRecordCollisions() { + // Create first nested record with namespace ending in "admin" + Schema nestedRecord1 = SchemaBuilder.record("ConfigRecord").namespace("com.foo.admin").fields().name("setting1") + .type().intType().noDefault().endRecord(); + + // Create second nested record with the same name but namespace ending in "client" + Schema nestedRecord2 = SchemaBuilder.record("ConfigRecord").namespace("com.bar.client").fields().name("setting2") + .type().stringType().noDefault().endRecord(); + + // Create parent schema with two fields containing the nested records directly (NOT in unions) + Schema parentSchema = SchemaBuilder.record("ApplicationConfig").namespace("com.app").fields().name("serviceConfig1") + .type(nestedRecord1).noDefault().name("serviceConfig2").type(nestedRecord2).noDefault().endRecord(); + + // Apply setupNameAndNamespace + Schema resultSchema = + SchemaUtilities.setupNameAndNamespace(parentSchema, "ApplicationConfig", "com.app.config"); + + // Extract the nested record schemas from the result + Schema.Field config1Field = resultSchema.getField("serviceConfig1"); + Schema.Field config2Field = resultSchema.getField("serviceConfig2"); + + Schema resultRecord1 = config1Field.schema(); + Schema resultRecord2 = config2Field.schema(); + + // Verify that both records still have different namespaces (collision was detected and handled) + String namespace1 = resultRecord1.getNamespace(); + String namespace2 = resultRecord2.getNamespace(); + + // Both records have the same name + Assert.assertEquals(resultRecord1.getName(), "ConfigRecord"); + Assert.assertEquals(resultRecord2.getName(), "ConfigRecord"); + + // But they should have different namespaces with numeric suffixes because collision was detected + Assert.assertNotEquals(namespace1, namespace2, + "Namespaces should be different when collision is detected. Got namespace1: " + namespace1 + ", namespace2: " + + namespace2); + + // Verify that numeric suffixes are appended to distinguish the colliding records + Assert.assertTrue(namespace1.endsWith("-0") || namespace1.endsWith("-1"), + "First record namespace should have numeric suffix. Got: " + namespace1); + Assert.assertTrue(namespace2.endsWith("-0") || namespace2.endsWith("-1"), + "Second record namespace should have numeric suffix. Got: " + namespace2); + } } From ff4886d42b061b41dca63f18834311c4a474ead9 Mon Sep 17 00:00:00 2001 From: Ruolin Fan Date: Thu, 13 Nov 2025 10:32:08 -0800 Subject: [PATCH 2/4] also detect deeply nested collisions in records --- .../coral/schema/avro/SchemaUtilities.java | 4 ++ .../schema/avro/SchemaUtilitiesTests.java | 63 +++++++++++++++++++ 2 files changed, 67 insertions(+) diff --git a/coral-schema/src/main/java/com/linkedin/coral/schema/avro/SchemaUtilities.java b/coral-schema/src/main/java/com/linkedin/coral/schema/avro/SchemaUtilities.java index 9ce36e18e..280642f4d 100644 --- a/coral-schema/src/main/java/com/linkedin/coral/schema/avro/SchemaUtilities.java +++ b/coral-schema/src/main/java/com/linkedin/coral/schema/avro/SchemaUtilities.java @@ -782,6 +782,10 @@ private static void collectRecordTypes(@Nonnull Schema schema, case RECORD: String originalNamespace = schema.getNamespace() != null ? schema.getNamespace() : ""; recordNameToNamespaces.computeIfAbsent(schema.getName(), k -> new ArrayList<>()).add(originalNamespace); + // Recursively collect records from this record's fields to detect deeply nested collisions + for (Schema.Field field : schema.getFields()) { + collectRecordTypes(field.schema(), recordNameToNamespaces); + } break; case UNION: for (Schema type : schema.getTypes()) { diff --git a/coral-schema/src/test/java/com/linkedin/coral/schema/avro/SchemaUtilitiesTests.java b/coral-schema/src/test/java/com/linkedin/coral/schema/avro/SchemaUtilitiesTests.java index dfe87a97c..afadb78f8 100644 --- a/coral-schema/src/test/java/com/linkedin/coral/schema/avro/SchemaUtilitiesTests.java +++ b/coral-schema/src/test/java/com/linkedin/coral/schema/avro/SchemaUtilitiesTests.java @@ -194,4 +194,67 @@ public void testSetupNameAndNamespaceDetectsDirectRecordCollisions() { Assert.assertTrue(namespace2.endsWith("-0") || namespace2.endsWith("-1"), "Second record namespace should have numeric suffix. Got: " + namespace2); } + + /** + * Test to verify that collision detection works for deeply nested records. + * This reproduces the real-world scenario where a record with the same name appears twice with different namespaces, + * but both are nested inside an intermediate record, which is itself nested in the parent. + * + * Schema structure: + * ParentRecord (top-level) + * └─ intermediateField (IntermediateRecord - contains the colliding records) + * ├─ collidingField1 (CollidingRecord from com.foo.v1 namespace) + * └─ collidingField2 (CollidingRecord from com.bar.v2 namespace) + */ + @Test + public void testSetupNameAndNamespaceDetectsDeeplyNestedCollisions() { + // Create two "CollidingRecord" records with the same name but different namespaces + // These represent the deeply nested records that will collide + Schema collidingRecord1 = SchemaBuilder.record("CollidingRecord").namespace("com.foo.v1").fields().name("field1") + .type().stringType().noDefault().endRecord(); + + Schema collidingRecord2 = SchemaBuilder.record("CollidingRecord").namespace("com.bar.v2").fields().name("field2") + .type().intType().noDefault().endRecord(); + + // Create an intermediate record that contains both colliding records + // This represents the middle layer in the nesting hierarchy + Schema intermediateRecord = SchemaBuilder.record("IntermediateRecord").namespace("com.intermediate").fields() + .name("collidingField1").type(collidingRecord1).noDefault().name("collidingField2").type(collidingRecord2) + .noDefault().endRecord(); + + // Create top-level parent schema that contains the intermediate record + Schema parentSchema = SchemaBuilder.record("ParentRecord").namespace("com.parent").fields().name("intermediateField") + .type(intermediateRecord).noDefault().endRecord(); + + // Apply setupNameAndNamespace + Schema resultSchema = SchemaUtilities.setupNameAndNamespace(parentSchema, "ParentRecord", "com.result"); + + // Navigate to the deeply nested colliding records + Schema.Field intermediateField = resultSchema.getField("intermediateField"); + Schema intermediateSchema = intermediateField.schema(); + + Schema.Field collidingField1 = intermediateSchema.getField("collidingField1"); + Schema.Field collidingField2 = intermediateSchema.getField("collidingField2"); + + Schema resultColliding1 = collidingField1.schema(); + Schema resultColliding2 = collidingField2.schema(); + + String namespace1 = resultColliding1.getNamespace(); + String namespace2 = resultColliding2.getNamespace(); + + // Both records have the same name + Assert.assertEquals(resultColliding1.getName(), "CollidingRecord"); + Assert.assertEquals(resultColliding2.getName(), "CollidingRecord"); + + // But they should have different namespaces with numeric suffixes because collision was detected + Assert.assertNotEquals(namespace1, namespace2, + "Namespaces should be different when collision is detected in deeply nested records. Got namespace1: " + + namespace1 + ", namespace2: " + namespace2); + + // Verify that numeric suffixes are appended to distinguish the colliding records + Assert.assertTrue(namespace1.endsWith("-0") || namespace1.endsWith("-1"), + "First colliding record namespace should have numeric suffix. Got: " + namespace1); + Assert.assertTrue(namespace2.endsWith("-0") || namespace2.endsWith("-1"), + "Second colliding record namespace should have numeric suffix. Got: " + namespace2); + } } From 97515dda4431d44791e15f612ab41b65d3fd198d Mon Sep 17 00:00:00 2001 From: Ruolin Fan Date: Fri, 14 Nov 2025 15:09:21 -0800 Subject: [PATCH 3/4] Apply Spotless formatting --- .../coral/schema/avro/SchemaUtilities.java | 2 +- .../schema/avro/SchemaUtilitiesTests.java | 28 +++++++++---------- 2 files changed, 14 insertions(+), 16 deletions(-) diff --git a/coral-schema/src/main/java/com/linkedin/coral/schema/avro/SchemaUtilities.java b/coral-schema/src/main/java/com/linkedin/coral/schema/avro/SchemaUtilities.java index 280642f4d..ce3c5176f 100644 --- a/coral-schema/src/main/java/com/linkedin/coral/schema/avro/SchemaUtilities.java +++ b/coral-schema/src/main/java/com/linkedin/coral/schema/avro/SchemaUtilities.java @@ -1,5 +1,5 @@ /** - * Copyright 2019-2023 LinkedIn Corporation. All rights reserved. + * Copyright 2019-2025 LinkedIn Corporation. All rights reserved. * Licensed under the BSD-2 Clause license. * See LICENSE in the project root for license information. */ diff --git a/coral-schema/src/test/java/com/linkedin/coral/schema/avro/SchemaUtilitiesTests.java b/coral-schema/src/test/java/com/linkedin/coral/schema/avro/SchemaUtilitiesTests.java index afadb78f8..70c608505 100644 --- a/coral-schema/src/test/java/com/linkedin/coral/schema/avro/SchemaUtilitiesTests.java +++ b/coral-schema/src/test/java/com/linkedin/coral/schema/avro/SchemaUtilitiesTests.java @@ -1,5 +1,5 @@ /** - * Copyright 2019-2023 LinkedIn Corporation. All rights reserved. + * Copyright 2019-2025 LinkedIn Corporation. All rights reserved. * Licensed under the BSD-2 Clause license. * See LICENSE in the project root for license information. */ @@ -98,12 +98,12 @@ public void testToNullableSchema() { @Test public void testSetupNameAndNamespacePreservesOriginalNamespace() { // Create first nested record with namespace "com.foo.bar" - Schema nestedRecord1 = SchemaBuilder.record("FooRecord").namespace("com.foo.bar").fields().name("field1") - .type().intType().noDefault().endRecord(); + Schema nestedRecord1 = SchemaBuilder.record("FooRecord").namespace("com.foo.bar").fields().name("field1").type() + .intType().noDefault().endRecord(); // Create second nested record with the same name but different namespace "com.baz.qux" - Schema nestedRecord2 = SchemaBuilder.record("FooRecord").namespace("com.baz.qux").fields().name("field2") - .type().stringType().noDefault().endRecord(); + Schema nestedRecord2 = SchemaBuilder.record("FooRecord").namespace("com.baz.qux").fields().name("field2").type() + .stringType().noDefault().endRecord(); // Create nullable unions for both nested records Schema nullableRecord1 = Schema.createUnion(Schema.create(Schema.Type.NULL), nestedRecord1); @@ -134,9 +134,8 @@ public void testSetupNameAndNamespacePreservesOriginalNamespace() { Assert.assertEquals(resultRecord2.getName(), "FooRecord"); // But they should have different namespaces with numeric suffixes to avoid conflicts - Assert.assertNotEquals(namespace1, namespace2, - "Namespaces should be different to avoid conflicts. Got namespace1: " + namespace1 + ", namespace2: " - + namespace2); + Assert.assertNotEquals(namespace1, namespace2, "Namespaces should be different to avoid conflicts. Got namespace1: " + + namespace1 + ", namespace2: " + namespace2); // Verify that numeric suffixes are appended to distinguish the colliding records Assert.assertTrue(namespace1.endsWith("-0") || namespace1.endsWith("-1"), @@ -165,8 +164,7 @@ public void testSetupNameAndNamespaceDetectsDirectRecordCollisions() { .type(nestedRecord1).noDefault().name("serviceConfig2").type(nestedRecord2).noDefault().endRecord(); // Apply setupNameAndNamespace - Schema resultSchema = - SchemaUtilities.setupNameAndNamespace(parentSchema, "ApplicationConfig", "com.app.config"); + Schema resultSchema = SchemaUtilities.setupNameAndNamespace(parentSchema, "ApplicationConfig", "com.app.config"); // Extract the nested record schemas from the result Schema.Field config1Field = resultSchema.getField("serviceConfig1"); @@ -218,13 +216,13 @@ public void testSetupNameAndNamespaceDetectsDeeplyNestedCollisions() { // Create an intermediate record that contains both colliding records // This represents the middle layer in the nesting hierarchy - Schema intermediateRecord = SchemaBuilder.record("IntermediateRecord").namespace("com.intermediate").fields() - .name("collidingField1").type(collidingRecord1).noDefault().name("collidingField2").type(collidingRecord2) - .noDefault().endRecord(); + Schema intermediateRecord = + SchemaBuilder.record("IntermediateRecord").namespace("com.intermediate").fields().name("collidingField1") + .type(collidingRecord1).noDefault().name("collidingField2").type(collidingRecord2).noDefault().endRecord(); // Create top-level parent schema that contains the intermediate record - Schema parentSchema = SchemaBuilder.record("ParentRecord").namespace("com.parent").fields().name("intermediateField") - .type(intermediateRecord).noDefault().endRecord(); + Schema parentSchema = SchemaBuilder.record("ParentRecord").namespace("com.parent").fields() + .name("intermediateField").type(intermediateRecord).noDefault().endRecord(); // Apply setupNameAndNamespace Schema resultSchema = SchemaUtilities.setupNameAndNamespace(parentSchema, "ParentRecord", "com.result"); From b5cb80312476b4b3d45169cecca589de181c0a1f Mon Sep 17 00:00:00 2001 From: Ruolin Fan Date: Fri, 14 Nov 2025 17:18:25 -0800 Subject: [PATCH 4/4] do not add suffixes unnecessarily --- .../coral/schema/avro/SchemaUtilities.java | 43 ++++++++++++------- .../schema/avro/SchemaUtilitiesTests.java | 24 +++++++++-- 2 files changed, 47 insertions(+), 20 deletions(-) diff --git a/coral-schema/src/main/java/com/linkedin/coral/schema/avro/SchemaUtilities.java b/coral-schema/src/main/java/com/linkedin/coral/schema/avro/SchemaUtilities.java index ce3c5176f..63d0576d8 100644 --- a/coral-schema/src/main/java/com/linkedin/coral/schema/avro/SchemaUtilities.java +++ b/coral-schema/src/main/java/com/linkedin/coral/schema/avro/SchemaUtilities.java @@ -748,20 +748,23 @@ private static Schema setupNestedNamespaceForRecord(@Nonnull Schema schema, @Non * @return Map from record name -> ordered list of original namespaces (only for records with collisions) */ private static Map> detectNamespaceCollisions(@Nonnull Schema schema) { - Map> recordNameToOriginalNamespaces = new LinkedHashMap<>(); + Map> recordKeyToOriginalNamespaces = new LinkedHashMap<>(); - // Scan all fields to collect record types and their original namespaces in order + // Scan all fields to collect record types with their parent paths for (Schema.Field field : schema.getFields()) { - collectRecordTypes(field.schema(), recordNameToOriginalNamespaces); + collectRecordTypes(field.schema(), "", recordKeyToOriginalNamespaces); } - // Filter to only records that have collisions (multiple different original namespaces) + // Build collision map: record name -> list of original namespaces (only for actual collisions) Map> collisions = new LinkedHashMap<>(); - for (Map.Entry> entry : recordNameToOriginalNamespaces.entrySet()) { + for (Map.Entry> entry : recordKeyToOriginalNamespaces.entrySet()) { // Use a Set to check uniqueness while preserving order in List Set uniqueNamespaces = new LinkedHashSet<>(entry.getValue()); if (uniqueNamespaces.size() > 1) { - collisions.put(entry.getKey(), entry.getValue()); + // Extract just the record name from the key (format: "parentPath::recordName") + String key = entry.getKey(); + String recordName = key.substring(key.lastIndexOf("::") + 2); + collisions.put(recordName, entry.getValue()); } } @@ -771,32 +774,40 @@ private static Map> detectNamespaceCollisions(@Nonnull Sche /** * Recursively collects all record types and their original namespaces from a schema. * This traverses through unions, arrays, and maps to find all nested record types. - * The order of collection matches the order records appear in the schema. + * Records are keyed by their parent path to ensure only records at the same hierarchical + * level are considered for collision detection. * * @param schema The schema to scan - * @param recordNameToNamespaces Map to populate with record name -> ordered list of original namespaces + * @param parentPath The hierarchical path to this schema element (e.g., "Parent.Child") + * @param recordKeyToNamespaces Map to populate with (parentPath::recordName) -> ordered list of original namespaces */ - private static void collectRecordTypes(@Nonnull Schema schema, - @Nonnull Map> recordNameToNamespaces) { + private static void collectRecordTypes(@Nonnull Schema schema, @Nonnull String parentPath, + @Nonnull Map> recordKeyToNamespaces) { switch (schema.getType()) { case RECORD: String originalNamespace = schema.getNamespace() != null ? schema.getNamespace() : ""; - recordNameToNamespaces.computeIfAbsent(schema.getName(), k -> new ArrayList<>()).add(originalNamespace); - // Recursively collect records from this record's fields to detect deeply nested collisions + String recordName = schema.getName(); + // Create a unique key combining parent path and record name + String key = parentPath + "::" + recordName; + recordKeyToNamespaces.computeIfAbsent(key, k -> new ArrayList<>()).add(originalNamespace); + + // Recursively collect records from this record's fields + // Update parent path to include this record + String newParentPath = parentPath.isEmpty() ? recordName : parentPath + "." + recordName; for (Schema.Field field : schema.getFields()) { - collectRecordTypes(field.schema(), recordNameToNamespaces); + collectRecordTypes(field.schema(), newParentPath, recordKeyToNamespaces); } break; case UNION: for (Schema type : schema.getTypes()) { - collectRecordTypes(type, recordNameToNamespaces); + collectRecordTypes(type, parentPath, recordKeyToNamespaces); } break; case ARRAY: - collectRecordTypes(schema.getElementType(), recordNameToNamespaces); + collectRecordTypes(schema.getElementType(), parentPath, recordKeyToNamespaces); break; case MAP: - collectRecordTypes(schema.getValueType(), recordNameToNamespaces); + collectRecordTypes(schema.getValueType(), parentPath, recordKeyToNamespaces); break; case ENUM: case BOOLEAN: diff --git a/coral-schema/src/test/java/com/linkedin/coral/schema/avro/SchemaUtilitiesTests.java b/coral-schema/src/test/java/com/linkedin/coral/schema/avro/SchemaUtilitiesTests.java index 70c608505..b731af594 100644 --- a/coral-schema/src/test/java/com/linkedin/coral/schema/avro/SchemaUtilitiesTests.java +++ b/coral-schema/src/test/java/com/linkedin/coral/schema/avro/SchemaUtilitiesTests.java @@ -214,11 +214,16 @@ public void testSetupNameAndNamespaceDetectsDeeplyNestedCollisions() { Schema collidingRecord2 = SchemaBuilder.record("CollidingRecord").namespace("com.bar.v2").fields().name("field2") .type().intType().noDefault().endRecord(); + // Create a "Metadata" record that appears at a different hierarchical level + // This should NOT get a suffix since it's not colliding with anything at its level + Schema metadataRecord = SchemaBuilder.record("Metadata").namespace("com.original").fields().name("version").type() + .stringType().noDefault().endRecord(); + // Create an intermediate record that contains both colliding records // This represents the middle layer in the nesting hierarchy - Schema intermediateRecord = - SchemaBuilder.record("IntermediateRecord").namespace("com.intermediate").fields().name("collidingField1") - .type(collidingRecord1).noDefault().name("collidingField2").type(collidingRecord2).noDefault().endRecord(); + Schema intermediateRecord = SchemaBuilder.record("IntermediateRecord").namespace("com.intermediate").fields() + .name("collidingField1").type(collidingRecord1).noDefault().name("collidingField2").type(collidingRecord2) + .noDefault().name("metadata").type(metadataRecord).noDefault().endRecord(); // Create top-level parent schema that contains the intermediate record Schema parentSchema = SchemaBuilder.record("ParentRecord").namespace("com.parent").fields() @@ -233,14 +238,17 @@ public void testSetupNameAndNamespaceDetectsDeeplyNestedCollisions() { Schema.Field collidingField1 = intermediateSchema.getField("collidingField1"); Schema.Field collidingField2 = intermediateSchema.getField("collidingField2"); + Schema.Field metadataField = intermediateSchema.getField("metadata"); Schema resultColliding1 = collidingField1.schema(); Schema resultColliding2 = collidingField2.schema(); + Schema resultMetadata = metadataField.schema(); String namespace1 = resultColliding1.getNamespace(); String namespace2 = resultColliding2.getNamespace(); + String metadataNamespace = resultMetadata.getNamespace(); - // Both records have the same name + // Both colliding records have the same name Assert.assertEquals(resultColliding1.getName(), "CollidingRecord"); Assert.assertEquals(resultColliding2.getName(), "CollidingRecord"); @@ -254,5 +262,13 @@ public void testSetupNameAndNamespaceDetectsDeeplyNestedCollisions() { "First colliding record namespace should have numeric suffix. Got: " + namespace1); Assert.assertTrue(namespace2.endsWith("-0") || namespace2.endsWith("-1"), "Second colliding record namespace should have numeric suffix. Got: " + namespace2); + + // Verify that the non-colliding Metadata record does NOT have a numeric suffix + Assert.assertEquals(resultMetadata.getName(), "Metadata"); + Assert.assertFalse(metadataNamespace.matches(".*-\\d+$"), + "Metadata record should NOT have numeric suffix since it's not colliding at its level. Got: " + + metadataNamespace); + Assert.assertTrue(metadataNamespace.contains("IntermediateRecord"), + "Metadata namespace should follow hierarchical naming. Got: " + metadataNamespace); } }