Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2019-2023 LinkedIn Corporation. All rights reserved.
* Copyright 2019-2025 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD-2 Clause license.
* See LICENSE in the project root for license information.
*/
Expand Down Expand Up @@ -735,18 +735,124 @@ private static void appendFieldWithNewNamespace(@Nonnull Schema.Field field, @No
}

/**
 * Rewrites {@code schema} so that it lives in {@code namespace}, first computing the
 * namespace-collision map for the record and then delegating to the three-argument overload.
 *
 * <p>The arguments are validated here, before collision detection runs. Collision detection reads
 * {@code schema.getFields()}, which Avro rejects with its own runtime exception for non-record
 * schemas; validating up front keeps the {@link IllegalArgumentException} contract of the
 * three-argument overload for callers of this entry point.
 *
 * @param schema the record schema to rewrite; must be of RECORD type
 * @param namespace the namespace to assign to the record
 * @return the schema produced by the three-argument overload
 * @throws NullPointerException if {@code schema} or {@code namespace} is null
 * @throws IllegalArgumentException if {@code schema} is not of RECORD type
 */
private static Schema setupNestedNamespaceForRecord(@Nonnull Schema schema, @Nonnull String namespace) {
  Preconditions.checkNotNull(schema);
  Preconditions.checkNotNull(namespace);

  // Same guard (and same message) as the three-argument overload; it must run before
  // detectNamespaceCollisions touches schema.getFields().
  if (!schema.getType().equals(Schema.Type.RECORD)) {
    throw new IllegalArgumentException(
        "Input schemas must be of RECORD type. " + "The actual type is: " + schema.getType());
  }

  // Detect collisions and build mapping for suffix assignment
  Map<String, List<String>> collisionMap = detectNamespaceCollisions(schema);
  return setupNestedNamespaceForRecord(schema, namespace, collisionMap);
}

/**
 * Detects which record names would collide (same name, different original namespace) at the same
 * hierarchical level beneath {@code schema}.
 * This includes records that appear directly in fields, within unions, arrays, or maps.
 *
 * <p>Collisions are detected per parent path, but the result is keyed by bare record name. When the
 * same record name collides under more than one parent path, the namespaces from every colliding
 * path are merged into a single list so that none of them is dropped. Each namespace appears at
 * most once, in the order it was first encountered, so the position of a namespace in the list is
 * a stable, gap-free index.
 *
 * @param schema The parent record schema to scan
 * @return Map from record name -> ordered list of distinct original namespaces (only for records
 *         with collisions)
 */
private static Map<String, List<String>> detectNamespaceCollisions(@Nonnull Schema schema) {
  Map<String, List<String>> recordKeyToOriginalNamespaces = new LinkedHashMap<>();

  // Scan every field, starting from an empty parent path, to collect the record types
  // reachable from this schema together with the path under which each one was found.
  for (Schema.Field field : schema.getFields()) {
    collectRecordTypes(field.schema(), "", recordKeyToOriginalNamespaces);
  }

  // record name -> distinct namespaces, merged across all parent paths where a collision occurs.
  Map<String, Set<String>> mergedNamespaces = new LinkedHashMap<>();
  for (Map.Entry<String, List<String>> entry : recordKeyToOriginalNamespaces.entrySet()) {
    // LinkedHashSet drops repeated namespaces while keeping first-encounter order.
    Set<String> uniqueNamespaces = new LinkedHashSet<>(entry.getValue());
    if (uniqueNamespaces.size() > 1) {
      // Extract just the record name from the key (format: "parentPath::recordName")
      String key = entry.getKey();
      String recordName = key.substring(key.lastIndexOf("::") + 2);
      // Merge instead of overwrite: another parent path may already have registered this name.
      mergedNamespaces.computeIfAbsent(recordName, k -> new LinkedHashSet<>()).addAll(uniqueNamespaces);
    }
  }

  // Expose the merged sets as lists so callers can look up a namespace's position with indexOf.
  Map<String, List<String>> collisions = new LinkedHashMap<>();
  for (Map.Entry<String, Set<String>> entry : mergedNamespaces.entrySet()) {
    collisions.put(entry.getKey(), new ArrayList<>(entry.getValue()));
  }

  return collisions;
}

/**
 * Walks {@code schema} depth-first and records the original namespace of every record type found.
 * Unions, arrays, and maps are looked through without changing the parent path; entering a record
 * extends the parent path with that record's name, so entries are grouped by the hierarchical
 * level at which the record appears.
 *
 * @param schema The schema to scan
 * @param parentPath The hierarchical path to this schema element (e.g., "Parent.Child")
 * @param recordKeyToNamespaces Map to populate with (parentPath::recordName) -> ordered list of original namespaces
 */
private static void collectRecordTypes(@Nonnull Schema schema, @Nonnull String parentPath,
    @Nonnull Map<String, List<String>> recordKeyToNamespaces) {
  Schema.Type type = schema.getType();

  if (type == Schema.Type.RECORD) {
    String name = schema.getName();
    String namespace = schema.getNamespace();
    if (namespace == null) {
      // A record without a namespace is tracked under the empty string.
      namespace = "";
    }

    // Register this occurrence under "<parentPath>::<recordName>".
    String mapKey = parentPath + "::" + name;
    List<String> namespacesForKey = recordKeyToNamespaces.get(mapKey);
    if (namespacesForKey == null) {
      namespacesForKey = new ArrayList<>();
      recordKeyToNamespaces.put(mapKey, namespacesForKey);
    }
    namespacesForKey.add(namespace);

    // Descend into the record's own fields with the path extended by this record's name.
    String childPath;
    if (parentPath.isEmpty()) {
      childPath = name;
    } else {
      childPath = parentPath + "." + name;
    }
    for (Schema.Field child : schema.getFields()) {
      collectRecordTypes(child.schema(), childPath, recordKeyToNamespaces);
    }
  } else if (type == Schema.Type.UNION) {
    // Every branch of the union is scanned at the current level.
    for (Schema branch : schema.getTypes()) {
      collectRecordTypes(branch, parentPath, recordKeyToNamespaces);
    }
  } else if (type == Schema.Type.ARRAY) {
    collectRecordTypes(schema.getElementType(), parentPath, recordKeyToNamespaces);
  } else if (type == Schema.Type.MAP) {
    collectRecordTypes(schema.getValueType(), parentPath, recordKeyToNamespaces);
  }
  // All remaining types (ENUM, FIXED, and the primitives) contain no nested records: nothing to do.
}

private static Schema setupNestedNamespaceForRecord(@Nonnull Schema schema, @Nonnull String namespace,
@Nonnull Map<String, List<String>> collisionMap) {
Preconditions.checkNotNull(schema);
Preconditions.checkNotNull(namespace);
Preconditions.checkNotNull(collisionMap);

if (!schema.getType().equals(Schema.Type.RECORD)) {
throw new IllegalArgumentException(
"Input schemas must be of RECORD type. " + "The actual type is: " + schema.getType());
}

// Add numeric suffix to avoid collisions when multiple fields have nested records with the same name
String recordNamespace = namespace;
if (collisionMap.containsKey(schema.getName())) {
List<String> namespaces = collisionMap.get(schema.getName());
String originalNamespace = schema.getNamespace() != null ? schema.getNamespace() : "";
int index = namespaces.indexOf(originalNamespace);
if (index >= 0) {
// Append numeric suffix based on order encountered: -0, -1, -2, etc.
recordNamespace = namespace + "-" + index;
}
}

SchemaBuilder.FieldAssembler<Schema> fieldAssembler =
SchemaBuilder.record(schema.getName()).namespace(namespace).fields();
SchemaBuilder.record(schema.getName()).namespace(recordNamespace).fields();

String nestedNamespace = namespace + "." + schema.getName();
String nestedNamespace = recordNamespace + "." + schema.getName();

for (Schema.Field field : schema.getFields()) {
switch (field.schema().getType()) {
Expand All @@ -765,7 +871,7 @@ private static Schema setupNestedNamespaceForRecord(@Nonnull Schema schema, @Non
case MAP:
case UNION:
case ARRAY:
Schema newFieldSchema = setupNestedNamespace(field.schema(), nestedNamespace);
Schema newFieldSchema = setupNestedNamespace(field.schema(), nestedNamespace, collisionMap);
Schema.Field newField = AvroCompatibilityHelper.createSchemaField(field.name(), newFieldSchema, field.doc(),
defaultValue(field), field.order());
appendField(newField, fieldAssembler);
Expand All @@ -774,7 +880,8 @@ private static Schema setupNestedNamespaceForRecord(@Nonnull Schema schema, @Non
appendFieldWithNewNamespace(field, nestedNamespace, fieldAssembler);
break;
case RECORD:
Schema recordSchemaWithNestedNamespace = setupNestedNamespaceForRecord(field.schema(), nestedNamespace);
Schema recordSchemaWithNestedNamespace =
setupNestedNamespaceForRecord(field.schema(), nestedNamespace, collisionMap);
Schema.Field newRecordFiled = AvroCompatibilityHelper.createSchemaField(field.name(),
recordSchemaWithNestedNamespace, field.doc(), defaultValue(field), field.order());
appendField(newRecordFiled, fieldAssembler);
Expand All @@ -787,9 +894,11 @@ private static Schema setupNestedNamespaceForRecord(@Nonnull Schema schema, @Non
return fieldAssembler.endRecord();
}

private static Schema setupNestedNamespace(@Nonnull Schema schema, @Nonnull String namespace) {
private static Schema setupNestedNamespace(@Nonnull Schema schema, @Nonnull String namespace,
@Nonnull Map<String, List<String>> collisionMap) {
Preconditions.checkNotNull(schema);
Preconditions.checkNotNull(namespace);
Preconditions.checkNotNull(collisionMap);

switch (schema.getType()) {
case NULL:
Expand All @@ -805,21 +914,21 @@ private static Schema setupNestedNamespace(@Nonnull Schema schema, @Nonnull Stri
return schema;
case MAP:
Schema valueSchema = schema.getValueType();
Schema valueSchemaWithNestedNamespace = setupNestedNamespace(valueSchema, namespace);
Schema valueSchemaWithNestedNamespace = setupNestedNamespace(valueSchema, namespace, collisionMap);
return Schema.createMap(valueSchemaWithNestedNamespace);
case ARRAY:
Schema elementSchema = schema.getElementType();
Schema elementSchemaWithNestedNamespace = setupNestedNamespace(elementSchema, namespace);
Schema elementSchemaWithNestedNamespace = setupNestedNamespace(elementSchema, namespace, collisionMap);
return Schema.createArray(elementSchemaWithNestedNamespace);
case ENUM:
return Schema.createEnum(schema.getName(), schema.getDoc(), namespace, schema.getEnumSymbols());
case RECORD:
return setupNestedNamespaceForRecord(schema, namespace);
return setupNestedNamespaceForRecord(schema, namespace, collisionMap);
case UNION:
List<Schema> types = new ArrayList<>();

for (Schema type : schema.getTypes()) {
Schema typeWithNestNamespace = setupNestedNamespace(type, namespace);
Schema typeWithNestNamespace = setupNestedNamespace(type, namespace, collisionMap);
types.add(typeWithNestNamespace);
}
return Schema.createUnion(types);
Expand Down
Loading