@@ -748,20 +748,23 @@ private static Schema setupNestedNamespaceForRecord(@Nonnull Schema schema, @Non
748748 * @return Map from record name -> ordered list of original namespaces (only for records with collisions)
749749 */
750750 private static Map <String , List <String >> detectNamespaceCollisions (@ Nonnull Schema schema ) {
751- Map <String , List <String >> recordNameToOriginalNamespaces = new LinkedHashMap <>();
751+ Map <String , List <String >> recordKeyToOriginalNamespaces = new LinkedHashMap <>();
752752
753- // Scan all fields to collect record types and their original namespaces in order
753+ // Scan all fields to collect record types with their parent paths
754754 for (Schema .Field field : schema .getFields ()) {
755- collectRecordTypes (field .schema (), recordNameToOriginalNamespaces );
755+ collectRecordTypes (field .schema (), "" , recordKeyToOriginalNamespaces );
756756 }
757757
758- // Filter to only records that have collisions (multiple different original namespaces)
758+ // Build collision map: record name -> list of original namespaces (only for actual collisions )
759759 Map <String , List <String >> collisions = new LinkedHashMap <>();
760- for (Map .Entry <String , List <String >> entry : recordNameToOriginalNamespaces .entrySet ()) {
760+ for (Map .Entry <String , List <String >> entry : recordKeyToOriginalNamespaces .entrySet ()) {
761761 // Use a Set to check uniqueness while preserving order in List
762762 Set <String > uniqueNamespaces = new LinkedHashSet <>(entry .getValue ());
763763 if (uniqueNamespaces .size () > 1 ) {
764- collisions .put (entry .getKey (), entry .getValue ());
764+ // Extract just the record name from the key (format: "parentPath::recordName")
765+ String key = entry .getKey ();
766+ String recordName = key .substring (key .lastIndexOf ("::" ) + 2 );
767+ collisions .put (recordName , entry .getValue ());
765768 }
766769 }
767770
@@ -771,32 +774,40 @@ private static Map<String, List<String>> detectNamespaceCollisions(@Nonnull Sche
771774 /**
772775 * Recursively collects all record types and their original namespaces from a schema.
773776 * This traverses through unions, arrays, and maps to find all nested record types.
774- * The order of collection matches the order records appear in the schema.
777+ * Records are keyed by their parent path to ensure only records at the same hierarchical
778+ * level are considered for collision detection.
775779 *
776780 * @param schema The schema to scan
777- * @param recordNameToNamespaces Map to populate with record name -> ordered list of original namespaces
781+ * @param parentPath The hierarchical path to this schema element (e.g., "Parent.Child")
782+ * @param recordKeyToNamespaces Map to populate with (parentPath::recordName) -> ordered list of original namespaces
778783 */
779- private static void collectRecordTypes (@ Nonnull Schema schema ,
780- @ Nonnull Map <String , List <String >> recordNameToNamespaces ) {
784+ private static void collectRecordTypes (@ Nonnull Schema schema , @ Nonnull String parentPath ,
785+ @ Nonnull Map <String , List <String >> recordKeyToNamespaces ) {
781786 switch (schema .getType ()) {
782787 case RECORD :
783788 String originalNamespace = schema .getNamespace () != null ? schema .getNamespace () : "" ;
784- recordNameToNamespaces .computeIfAbsent (schema .getName (), k -> new ArrayList <>()).add (originalNamespace );
785- // Recursively collect records from this record's fields to detect deeply nested collisions
789+ String recordName = schema .getName ();
790+ // Create a unique key combining parent path and record name
791+ String key = parentPath + "::" + recordName ;
792+ recordKeyToNamespaces .computeIfAbsent (key , k -> new ArrayList <>()).add (originalNamespace );
793+
794+ // Recursively collect records from this record's fields
795+ // Update parent path to include this record
796+ String newParentPath = parentPath .isEmpty () ? recordName : parentPath + "." + recordName ;
786797 for (Schema .Field field : schema .getFields ()) {
787- collectRecordTypes (field .schema (), recordNameToNamespaces );
798+ collectRecordTypes (field .schema (), newParentPath , recordKeyToNamespaces );
788799 }
789800 break ;
790801 case UNION :
791802 for (Schema type : schema .getTypes ()) {
792- collectRecordTypes (type , recordNameToNamespaces );
803+ collectRecordTypes (type , parentPath , recordKeyToNamespaces );
793804 }
794805 break ;
795806 case ARRAY :
796- collectRecordTypes (schema .getElementType (), recordNameToNamespaces );
807+ collectRecordTypes (schema .getElementType (), parentPath , recordKeyToNamespaces );
797808 break ;
798809 case MAP :
799- collectRecordTypes (schema .getValueType (), recordNameToNamespaces );
810+ collectRecordTypes (schema .getValueType (), parentPath , recordKeyToNamespaces );
800811 break ;
801812 case ENUM :
802813 case BOOLEAN :
0 commit comments