@@ -735,18 +735,109 @@ private static void appendFieldWithNewNamespace(@Nonnull Schema.Field field, @No
735735 }
736736
737737 private static Schema setupNestedNamespaceForRecord (@ Nonnull Schema schema , @ Nonnull String namespace ) {
738+ // Detect collisions and build mapping for suffix assignment
739+ Map <String , List <String >> collisionMap = detectNamespaceCollisions (schema );
740+ return setupNestedNamespaceForRecord (schema , namespace , collisionMap );
741+ }
742+
743+ /**
744+ * Detects which record names would collide (same name, different original namespace) at the current level.
745+ * This includes records that appear directly in fields, within unions, arrays, or maps.
746+ *
747+ * @param schema The parent record schema to scan
748+ * @return Map from record name -> ordered list of original namespaces (only for records with collisions)
749+ */
750+ private static Map <String , List <String >> detectNamespaceCollisions (@ Nonnull Schema schema ) {
751+ Map <String , List <String >> recordNameToOriginalNamespaces = new LinkedHashMap <>();
752+
753+ // Scan all fields to collect record types and their original namespaces in order
754+ for (Schema .Field field : schema .getFields ()) {
755+ collectRecordTypes (field .schema (), recordNameToOriginalNamespaces );
756+ }
757+
758+ // Filter to only records that have collisions (multiple different original namespaces)
759+ Map <String , List <String >> collisions = new LinkedHashMap <>();
760+ for (Map .Entry <String , List <String >> entry : recordNameToOriginalNamespaces .entrySet ()) {
761+ // Use a Set to check uniqueness while preserving order in List
762+ Set <String > uniqueNamespaces = new LinkedHashSet <>(entry .getValue ());
763+ if (uniqueNamespaces .size () > 1 ) {
764+ collisions .put (entry .getKey (), entry .getValue ());
765+ }
766+ }
767+
768+ return collisions ;
769+ }
770+
771+ /**
772+ * Recursively collects all record types and their original namespaces from a schema.
773+ * This traverses through unions, arrays, and maps to find all nested record types.
774+ * The order of collection matches the order records appear in the schema.
775+ *
776+ * @param schema The schema to scan
777+ * @param recordNameToNamespaces Map to populate with record name -> ordered list of original namespaces
778+ */
779+ private static void collectRecordTypes (@ Nonnull Schema schema ,
780+ @ Nonnull Map <String , List <String >> recordNameToNamespaces ) {
781+ switch (schema .getType ()) {
782+ case RECORD :
783+ String originalNamespace = schema .getNamespace () != null ? schema .getNamespace () : "" ;
784+ recordNameToNamespaces .computeIfAbsent (schema .getName (), k -> new ArrayList <>()).add (originalNamespace );
785+ break ;
786+ case UNION :
787+ for (Schema type : schema .getTypes ()) {
788+ collectRecordTypes (type , recordNameToNamespaces );
789+ }
790+ break ;
791+ case ARRAY :
792+ collectRecordTypes (schema .getElementType (), recordNameToNamespaces );
793+ break ;
794+ case MAP :
795+ collectRecordTypes (schema .getValueType (), recordNameToNamespaces );
796+ break ;
797+ case ENUM :
798+ case BOOLEAN :
799+ case BYTES :
800+ case DOUBLE :
801+ case FLOAT :
802+ case INT :
803+ case LONG :
804+ case STRING :
805+ case FIXED :
806+ case NULL :
807+ // These types don't contain nested records
808+ break ;
809+ default :
810+ break ;
811+ }
812+ }
813+
814+ private static Schema setupNestedNamespaceForRecord (@ Nonnull Schema schema , @ Nonnull String namespace ,
815+ @ Nonnull Map <String , List <String >> collisionMap ) {
738816 Preconditions .checkNotNull (schema );
739817 Preconditions .checkNotNull (namespace );
818+ Preconditions .checkNotNull (collisionMap );
740819
741820 if (!schema .getType ().equals (Schema .Type .RECORD )) {
742821 throw new IllegalArgumentException (
743822 "Input schemas must be of RECORD type. " + "The actual type is: " + schema .getType ());
744823 }
745824
825+ // Add numeric suffix to avoid collisions when multiple fields have nested records with the same name
826+ String recordNamespace = namespace ;
827+ if (collisionMap .containsKey (schema .getName ())) {
828+ List <String > namespaces = collisionMap .get (schema .getName ());
829+ String originalNamespace = schema .getNamespace () != null ? schema .getNamespace () : "" ;
830+ int index = namespaces .indexOf (originalNamespace );
831+ if (index >= 0 ) {
832+ // Append numeric suffix based on order encountered: -0, -1, -2, etc.
833+ recordNamespace = namespace + "-" + index ;
834+ }
835+ }
836+
746837 SchemaBuilder .FieldAssembler <Schema > fieldAssembler =
747- SchemaBuilder .record (schema .getName ()).namespace (namespace ).fields ();
838+ SchemaBuilder .record (schema .getName ()).namespace (recordNamespace ).fields ();
748839
749- String nestedNamespace = namespace + "." + schema .getName ();
840+ String nestedNamespace = recordNamespace + "." + schema .getName ();
750841
751842 for (Schema .Field field : schema .getFields ()) {
752843 switch (field .schema ().getType ()) {
@@ -765,7 +856,7 @@ private static Schema setupNestedNamespaceForRecord(@Nonnull Schema schema, @Non
765856 case MAP :
766857 case UNION :
767858 case ARRAY :
768- Schema newFieldSchema = setupNestedNamespace (field .schema (), nestedNamespace );
859+ Schema newFieldSchema = setupNestedNamespace (field .schema (), nestedNamespace , collisionMap );
769860 Schema .Field newField = AvroCompatibilityHelper .createSchemaField (field .name (), newFieldSchema , field .doc (),
770861 defaultValue (field ), field .order ());
771862 appendField (newField , fieldAssembler );
@@ -774,7 +865,8 @@ private static Schema setupNestedNamespaceForRecord(@Nonnull Schema schema, @Non
774865 appendFieldWithNewNamespace (field , nestedNamespace , fieldAssembler );
775866 break ;
776867 case RECORD :
777- Schema recordSchemaWithNestedNamespace = setupNestedNamespaceForRecord (field .schema (), nestedNamespace );
868+ Schema recordSchemaWithNestedNamespace =
869+ setupNestedNamespaceForRecord (field .schema (), nestedNamespace , collisionMap );
778870 Schema .Field newRecordFiled = AvroCompatibilityHelper .createSchemaField (field .name (),
779871 recordSchemaWithNestedNamespace , field .doc (), defaultValue (field ), field .order ());
780872 appendField (newRecordFiled , fieldAssembler );
@@ -787,9 +879,11 @@ private static Schema setupNestedNamespaceForRecord(@Nonnull Schema schema, @Non
787879 return fieldAssembler .endRecord ();
788880 }
789881
790- private static Schema setupNestedNamespace (@ Nonnull Schema schema , @ Nonnull String namespace ) {
882+ private static Schema setupNestedNamespace (@ Nonnull Schema schema , @ Nonnull String namespace ,
883+ @ Nonnull Map <String , List <String >> collisionMap ) {
791884 Preconditions .checkNotNull (schema );
792885 Preconditions .checkNotNull (namespace );
886+ Preconditions .checkNotNull (collisionMap );
793887
794888 switch (schema .getType ()) {
795889 case NULL :
@@ -805,21 +899,21 @@ private static Schema setupNestedNamespace(@Nonnull Schema schema, @Nonnull Stri
805899 return schema ;
806900 case MAP :
807901 Schema valueSchema = schema .getValueType ();
808- Schema valueSchemaWithNestedNamespace = setupNestedNamespace (valueSchema , namespace );
902+ Schema valueSchemaWithNestedNamespace = setupNestedNamespace (valueSchema , namespace , collisionMap );
809903 return Schema .createMap (valueSchemaWithNestedNamespace );
810904 case ARRAY :
811905 Schema elementSchema = schema .getElementType ();
812- Schema elementSchemaWithNestedNamespace = setupNestedNamespace (elementSchema , namespace );
906+ Schema elementSchemaWithNestedNamespace = setupNestedNamespace (elementSchema , namespace , collisionMap );
813907 return Schema .createArray (elementSchemaWithNestedNamespace );
814908 case ENUM :
815909 return Schema .createEnum (schema .getName (), schema .getDoc (), namespace , schema .getEnumSymbols ());
816910 case RECORD :
817- return setupNestedNamespaceForRecord (schema , namespace );
911+ return setupNestedNamespaceForRecord (schema , namespace , collisionMap );
818912 case UNION :
819913 List <Schema > types = new ArrayList <>();
820914
821915 for (Schema type : schema .getTypes ()) {
822- Schema typeWithNestNamespace = setupNestedNamespace (type , namespace );
916+ Schema typeWithNestNamespace = setupNestedNamespace (type , namespace , collisionMap );
823917 types .add (typeWithNestNamespace );
824918 }
825919 return Schema .createUnion (types );
0 commit comments