Skip to content

Commit c508f72

Browse files
committed
do not add suffixes unnecessarily
1 parent df8a2ef commit c508f72

File tree

2 files changed

+46
-18
lines changed

2 files changed

+46
-18
lines changed

coral-schema/src/main/java/com/linkedin/coral/schema/avro/SchemaUtilities.java

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -748,20 +748,23 @@ private static Schema setupNestedNamespaceForRecord(@Nonnull Schema schema, @Non
748748
* @return Map from record name -> ordered list of original namespaces (only for records with collisions)
749749
*/
750750
private static Map<String, List<String>> detectNamespaceCollisions(@Nonnull Schema schema) {
751-
Map<String, List<String>> recordNameToOriginalNamespaces = new LinkedHashMap<>();
751+
Map<String, List<String>> recordKeyToOriginalNamespaces = new LinkedHashMap<>();
752752

753-
// Scan all fields to collect record types and their original namespaces in order
753+
// Scan all fields to collect record types with their parent paths
754754
for (Schema.Field field : schema.getFields()) {
755-
collectRecordTypes(field.schema(), recordNameToOriginalNamespaces);
755+
collectRecordTypes(field.schema(), "", recordKeyToOriginalNamespaces);
756756
}
757757

758-
// Filter to only records that have collisions (multiple different original namespaces)
758+
// Build collision map: record name -> list of original namespaces (only for actual collisions)
759759
Map<String, List<String>> collisions = new LinkedHashMap<>();
760-
for (Map.Entry<String, List<String>> entry : recordNameToOriginalNamespaces.entrySet()) {
760+
for (Map.Entry<String, List<String>> entry : recordKeyToOriginalNamespaces.entrySet()) {
761761
// Use a Set to check uniqueness while preserving order in List
762762
Set<String> uniqueNamespaces = new LinkedHashSet<>(entry.getValue());
763763
if (uniqueNamespaces.size() > 1) {
764-
collisions.put(entry.getKey(), entry.getValue());
764+
// Extract just the record name from the key (format: "parentPath::recordName")
765+
String key = entry.getKey();
766+
String recordName = key.substring(key.lastIndexOf("::") + 2);
767+
collisions.put(recordName, entry.getValue());
765768
}
766769
}
767770

@@ -771,32 +774,40 @@ private static Map<String, List<String>> detectNamespaceCollisions(@Nonnull Sche
771774
/**
772775
* Recursively collects all record types and their original namespaces from a schema.
773776
* This traverses through unions, arrays, and maps to find all nested record types.
774-
* The order of collection matches the order records appear in the schema.
777+
* Records are keyed by their parent path to ensure only records at the same hierarchical
778+
* level are considered for collision detection.
775779
*
776780
* @param schema The schema to scan
777-
* @param recordNameToNamespaces Map to populate with record name -> ordered list of original namespaces
781+
* @param parentPath The hierarchical path to this schema element (e.g., "Parent.Child")
782+
* @param recordKeyToNamespaces Map to populate with (parentPath::recordName) -> ordered list of original namespaces
778783
*/
779-
private static void collectRecordTypes(@Nonnull Schema schema,
780-
@Nonnull Map<String, List<String>> recordNameToNamespaces) {
784+
private static void collectRecordTypes(@Nonnull Schema schema, @Nonnull String parentPath,
785+
@Nonnull Map<String, List<String>> recordKeyToNamespaces) {
781786
switch (schema.getType()) {
782787
case RECORD:
783788
String originalNamespace = schema.getNamespace() != null ? schema.getNamespace() : "";
784-
recordNameToNamespaces.computeIfAbsent(schema.getName(), k -> new ArrayList<>()).add(originalNamespace);
785-
// Recursively collect records from this record's fields to detect deeply nested collisions
789+
String recordName = schema.getName();
790+
// Create a unique key combining parent path and record name
791+
String key = parentPath + "::" + recordName;
792+
recordKeyToNamespaces.computeIfAbsent(key, k -> new ArrayList<>()).add(originalNamespace);
793+
794+
// Recursively collect records from this record's fields
795+
// Update parent path to include this record
796+
String newParentPath = parentPath.isEmpty() ? recordName : parentPath + "." + recordName;
786797
for (Schema.Field field : schema.getFields()) {
787-
collectRecordTypes(field.schema(), recordNameToNamespaces);
798+
collectRecordTypes(field.schema(), newParentPath, recordKeyToNamespaces);
788799
}
789800
break;
790801
case UNION:
791802
for (Schema type : schema.getTypes()) {
792-
collectRecordTypes(type, recordNameToNamespaces);
803+
collectRecordTypes(type, parentPath, recordKeyToNamespaces);
793804
}
794805
break;
795806
case ARRAY:
796-
collectRecordTypes(schema.getElementType(), recordNameToNamespaces);
807+
collectRecordTypes(schema.getElementType(), parentPath, recordKeyToNamespaces);
797808
break;
798809
case MAP:
799-
collectRecordTypes(schema.getValueType(), recordNameToNamespaces);
810+
collectRecordTypes(schema.getValueType(), parentPath, recordKeyToNamespaces);
800811
break;
801812
case ENUM:
802813
case BOOLEAN:

coral-schema/src/test/java/com/linkedin/coral/schema/avro/SchemaUtilitiesTests.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -214,11 +214,17 @@ public void testSetupNameAndNamespaceDetectsDeeplyNestedCollisions() {
214214
Schema collidingRecord2 = SchemaBuilder.record("CollidingRecord").namespace("com.bar.v2").fields().name("field2")
215215
.type().intType().noDefault().endRecord();
216216

217+
// Create a "Metadata" record that appears at a different hierarchical level
218+
// This should NOT get a suffix since it's not colliding with anything at its level
219+
Schema metadataRecord = SchemaBuilder.record("Metadata").namespace("com.original").fields().name("version")
220+
.type().stringType().noDefault().endRecord();
221+
217222
// Create an intermediate record that contains both colliding records
218223
// This represents the middle layer in the nesting hierarchy
219224
Schema intermediateRecord =
220225
SchemaBuilder.record("IntermediateRecord").namespace("com.intermediate").fields().name("collidingField1")
221-
.type(collidingRecord1).noDefault().name("collidingField2").type(collidingRecord2).noDefault().endRecord();
226+
.type(collidingRecord1).noDefault().name("collidingField2").type(collidingRecord2).noDefault()
227+
.name("metadata").type(metadataRecord).noDefault().endRecord();
222228

223229
// Create top-level parent schema that contains the intermediate record
224230
Schema parentSchema = SchemaBuilder.record("ParentRecord").namespace("com.parent").fields()
@@ -233,14 +239,17 @@ public void testSetupNameAndNamespaceDetectsDeeplyNestedCollisions() {
233239

234240
Schema.Field collidingField1 = intermediateSchema.getField("collidingField1");
235241
Schema.Field collidingField2 = intermediateSchema.getField("collidingField2");
242+
Schema.Field metadataField = intermediateSchema.getField("metadata");
236243

237244
Schema resultColliding1 = collidingField1.schema();
238245
Schema resultColliding2 = collidingField2.schema();
246+
Schema resultMetadata = metadataField.schema();
239247

240248
String namespace1 = resultColliding1.getNamespace();
241249
String namespace2 = resultColliding2.getNamespace();
250+
String metadataNamespace = resultMetadata.getNamespace();
242251

243-
// Both records have the same name
252+
// Both colliding records have the same name
244253
Assert.assertEquals(resultColliding1.getName(), "CollidingRecord");
245254
Assert.assertEquals(resultColliding2.getName(), "CollidingRecord");
246255

@@ -254,5 +263,13 @@ public void testSetupNameAndNamespaceDetectsDeeplyNestedCollisions() {
254263
"First colliding record namespace should have numeric suffix. Got: " + namespace1);
255264
Assert.assertTrue(namespace2.endsWith("-0") || namespace2.endsWith("-1"),
256265
"Second colliding record namespace should have numeric suffix. Got: " + namespace2);
266+
267+
// Verify that the non-colliding Metadata record does NOT have a numeric suffix
268+
Assert.assertEquals(resultMetadata.getName(), "Metadata");
269+
Assert.assertFalse(metadataNamespace.matches(".*-\\d+$"),
270+
"Metadata record should NOT have numeric suffix since it's not colliding at its level. Got: "
271+
+ metadataNamespace);
272+
Assert.assertTrue(metadataNamespace.contains("IntermediateRecord"),
273+
"Metadata namespace should follow hierarchical naming. Got: " + metadataNamespace);
257274
}
258275
}

0 commit comments

Comments
 (0)