Skip to content

Conversation

@nikagra
Copy link
Contributor

@nikagra nikagra commented Dec 9, 2025

Follow up to #21 which in turn is a continuation of #12

This pull request introduces support for collection types (LIST, SET, MAP) and User Defined Types (UDT) in the connector, both in the connector's logic and its configuration. It adds a new configuration option for how non-frozen collections are represented (currently only supporting "delta" mode), and updates the documentation to describe these changes. Additionally, the PR refactors schema and translation logic to handle the new types and cleans up configuration defaults.

Support for collections and UDTs:

  • Added support for both frozen and non-frozen collections (LIST, SET, MAP) and UDT columns, including their representation in Kafka Connect schemas and JSON output. The connector now translates these types from ScyllaDB to Kafka Connect records, handling complex nested structures. [1] [2] [3]
  • Introduced logic to handle non-frozen collections using a new "delta" mode, where changes to collections are represented as a struct with mode and elements fields, allowing for granular tracking of modifications, overwrites, and removals. [1] [2]

Connector configuration enhancements:

  • Added a new configuration option scylla.collections.mode (with enum type CollectionsMode), allowing users to specify how non-frozen collections are represented. The default is "delta" mode, and this is now parsed and exposed via the connector config. [1] [2] [3]
  • Cleaned up configuration defaults for several pooling and retry options, removing unnecessary .optional() calls and ensuring default values are always set. [1] [2] [3] [4] [5] [6] [7]

Documentation updates:

  • Updated the README.md to document the new support for collections and UDTs, including detailed descriptions of the JSON representation for both frozen and non-frozen collections, and the meaning of the new configuration option. [1] [2]

Codebase refactoring and type safety:

  • Refactored schema and translation logic to use the new Field class instead of Cell, and updated methods to support recursive translation of nested types. Improved type safety and code clarity in the process. [1] [2] [3] [4]

New enums for clarity:

  • Introduced new enums CollectionOperation and CollectionsMode to clearly represent collection change operations and configuration modes, improving code readability and maintainability. [1] [2]

Relevant issues:

@nikagra nikagra force-pushed the feat/non-frozen-collections branch from a4c715a to 1821b94 Compare December 9, 2025 17:20
@nikagra nikagra changed the title Feat/non frozen collections Support for collections 🗃️ Dec 9, 2025
@nikagra nikagra force-pushed the feat/non-frozen-collections branch from 3afb149 to d4fe276 Compare December 9, 2025 18:36
@nikagra nikagra requested a review from Copilot December 10, 2025 12:08
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR adds comprehensive support for Scylla collection types (LIST, SET, MAP, TUPLE, UDT) to the Debezium CDC connector. The implementation enables both frozen and non-frozen collection handling, with frozen collections being fully supported and non-frozen collections supported through a "delta" mode configuration option.

Key changes:

  • Extended schema computation to recursively handle collection types with proper Kafka Connect schema mappings
  • Implemented recursive field translation logic to convert Scylla collection data to Kafka format
  • Added configuration option for collection modes (currently only DELTA mode)

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 6 comments.

Show a summary per file
File Description
ScyllaSchema.java Adds recursive schema computation for collections, updates isSupportedColumnSchema to detect frozen vs non-frozen collections using CDC column patterns
ScyllaChangeRecordEmitter.java Refactors from Cell to Field API, implements recursive translateFieldToKafka for all collection types
ScyllaConnectorConfig.java Adds COLLECTIONS_MODE configuration field with DELTA as default value
CollectionsMode.java New enum defining collection handling modes (currently only DELTA)
README.md Documents collection support including frozen/non-frozen formats and delta mode behavior
Comments suppressed due to low confidence (1)

src/main/java/com/scylladb/cdc/debezium/connector/ScyllaChangeRecordEmitter.java:199

  • Variable keyStruct may be null at this access because of this null argument.
    Variable keyStruct may be null at this access because of this null argument.
        keyStruct.put(cdef.getColumnName(), value);

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@nikagra nikagra force-pushed the feat/non-frozen-collections branch 3 times, most recently from 9022a4b to a5aa1d3 Compare December 16, 2025 16:45
@nikagra nikagra force-pushed the feat/non-frozen-collections branch from a5aa1d3 to 58485b6 Compare December 17, 2025 20:01
@nikagra nikagra force-pushed the feat/non-frozen-collections branch 6 times, most recently from 15f0b41 to 6f80092 Compare December 22, 2025 19:28
@nikagra nikagra marked this pull request as ready for review December 22, 2025 19:45
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 9 out of 9 changed files in this pull request and generated 9 comments.

Comment on lines +199 to +200
if (cdef.getBaseTableColumnKind() == ColumnKind.PARTITION_KEY
|| cdef.getBaseTableColumnKind() == ColumnKind.CLUSTERING_KEY) {
Copy link

Copilot AI Dec 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inconsistent use of getBaseTableColumnKind() and ColumnKind here compared to getBaseTableColumnType() and ColumnType used throughout ScyllaSchema.java. For consistency and maintainability, the same API should be used across the codebase. Check if getBaseTableColumnKind() is the newer API and update ScyllaSchema.java accordingly, or revert to using getBaseTableColumnType() here.

Copilot uses AI. Check for mistakes.
}
keyStruct.put(cdef.getColumnName(), value);
} else {
Schema cellSchema = schema.cellSchema(cdef.getColumnName());
Copy link

Copilot AI Dec 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential NullPointerException: schema.cellSchema(cdef.getColumnName()) can return null if the column is not found in the cellSchemas map. The code immediately attempts to access cellSchema.field(ScyllaSchema.CELL_VALUE) on line 216 without checking for null. Add a null check after line 208 or ensure that cellSchema will always be present for supported columns.

Suggested change
Schema cellSchema = schema.cellSchema(cdef.getColumnName());
Schema cellSchema = schema.cellSchema(cdef.getColumnName());
if (cellSchema == null) {
// No schema available for this column; skip to avoid NullPointerException.
continue;
}

Copilot uses AI. Check for mistakes.
.withWidth(ConfigDef.Width.SHORT)
.withImportance(ConfigDef.Importance.MEDIUM)
.withDescription(
"How to represent non-frozen collections. Currently, only 'delta' mode is supported - in the future support for more modes may be added. 'Delta' mode: change in collection is represented as a struct with 2 fields, 'mode' and 'elements'. 'mode' describes what type of change happened (modifying collection, overwriting collection), 'elements' contains added/removed elements.");
Copy link

Copilot AI Dec 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The description for scylla.collections.mode is quite lengthy for a configuration field. Consider breaking it into multiple sentences or shortening it for better readability. For example: "Specifies how non-frozen collections are represented. Currently only 'delta' mode is supported. In delta mode, collection changes are represented as a struct with 'mode' and 'elements' fields, where 'mode' indicates the type of change (modify or overwrite) and 'elements' contains the added or removed elements."

Suggested change
"How to represent non-frozen collections. Currently, only 'delta' mode is supported - in the future support for more modes may be added. 'Delta' mode: change in collection is represented as a struct with 2 fields, 'mode' and 'elements'. 'mode' describes what type of change happened (modifying collection, overwriting collection), 'elements' contains added/removed elements.");
"Specifies how non-frozen collections are represented. Currently, only 'delta' mode is supported. "
+ "In delta mode, collection changes are represented as a struct with 'mode' and 'elements' fields, "
+ "where 'mode' indicates the type of change (modify or overwrite) and 'elements' contains the added or removed elements.");

Copilot uses AI. Check for mistakes.
Comment on lines +129 to +135
/*
* Scylla CDC Source Connector relies on heartbeats to move the offset,
* because the offset determines if the generation ended, therefore HEARTBEAT_INTERVAL
* should be positive (0 would disable heartbeats) and a default value is changed
* (previously 0).
*/

Copy link

Copilot AI Dec 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment block about HEARTBEAT_INTERVAL appears to be misplaced. It's positioned between the COLLECTIONS_MODE field (lines 119-127) and the USER field (lines 136-142), but the actual CUSTOM_HEARTBEAT_INTERVAL field is defined much later at line 249. Either remove this duplicate comment (since the same comment appears at line 243-247 right before the field definition) or move it to immediately precede the CUSTOM_HEARTBEAT_INTERVAL field definition.

Suggested change
/*
* Scylla CDC Source Connector relies on heartbeats to move the offset,
* because the offset determines if the generation ended, therefore HEARTBEAT_INTERVAL
* should be positive (0 would disable heartbeats) and a default value is changed
* (previously 0).
*/

Copilot uses AI. Check for mistakes.
Comment on lines 301 to 304
deletedKeys.forEach(
(key) -> {
addedElements.put(key, null);
});
Copy link

Copilot AI Dec 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The lambda expression can be simplified. The explicit parameter parentheses and braces are unnecessary for a single-statement lambda with a single parameter. Consider simplifying to: deletedKeys.forEach(key -> addedElements.put(key, null));

Suggested change
deletedKeys.forEach(
(key) -> {
addedElements.put(key, null);
});
deletedKeys.forEach(key -> addedElements.put(key, null));

Copilot uses AI. Check for mistakes.
Comment on lines +330 to +333
Short index = -1;
for (Map.Entry<String, Field> element : elementsMap.entrySet()) {
index++;
if ((!element.getValue().isNull()) || deletedKeys.contains(index)) {
Copy link

Copilot AI Dec 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The initialization of index to -1 followed by immediate pre-increment appears confusing. Consider initializing to 0 and using post-increment, or add a comment explaining why -1 is used (e.g., "Start at -1 because we increment before processing each element").

Suggested change
Short index = -1;
for (Map.Entry<String, Field> element : elementsMap.entrySet()) {
index++;
if ((!element.getValue().isNull()) || deletedKeys.contains(index)) {
Short index = 0;
for (Map.Entry<String, Field> element : elementsMap.entrySet()) {
if ((!element.getValue().isNull()) || deletedKeys.contains(index++)) {

Copilot uses AI. Check for mistakes.
README.md Outdated
Each non-frozen collection column is represented as a struct, with fields `mode` and `elements`. This struct will be stored in "Cell" described previously.
`mode` can be:
- `MODIFY` - elements were added or deleted.
- `OVERWRITE` - whole content of collection was removed, and new elements were added. If no elements were added (meaning the collection was just removed), this mode won't be used - instead, whole struct (stored in `field` value of "Cell" struct, as mentioned previously) will be null.
Copy link

Copilot AI Dec 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo: "field" should be "value" for consistency. The text reads "stored in field value of 'Cell' struct" but should be "stored in value field of 'Cell' struct" to match the actual field name and maintain consistency with other references.

Suggested change
- `OVERWRITE` - whole content of collection was removed, and new elements were added. If no elements were added (meaning the collection was just removed), this mode won't be used - instead, whole struct (stored in `field` value of "Cell" struct, as mentioned previously) will be null.
- `OVERWRITE` - whole content of collection was removed, and new elements were added. If no elements were added (meaning the collection was just removed), this mode won't be used - instead, whole struct (stored in `value` field of "Cell" struct, as mentioned previously) will be null.

Copilot uses AI. Check for mistakes.
Comment on lines +67 to +68
* Parses a Debezium-style JSON envelope and extracts a collection of strings from the given field
* under the {@code after} section. Expects the structure:
Copy link

Copilot AI Dec 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Javadoc is misleading. It states "extracts a collection of strings" but the method is generic and uses a Function<JsonNode, T> mapper to extract elements of any type T, not just strings. Update the documentation to reflect this, for example: "Parses a Debezium-style JSON envelope and extracts a list of elements from the given field under the {@code after} section, applying the provided mapper function to each element."

Suggested change
* Parses a Debezium-style JSON envelope and extracts a collection of strings from the given field
* under the {@code after} section. Expects the structure:
* Parses a Debezium-style JSON envelope and extracts a list of elements from the given field
* under the {@code after} section, applying the provided mapper function to each element.
* Expects the structure:

Copilot uses AI. Check for mistakes.
if (keyStruct != null) {
keyStruct.put(cdef.getColumnName(), value);
}
keyStruct.put(cdef.getColumnName(), value);
Copy link

Copilot AI Dec 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Variable keyStruct may be null at this access because of this null argument.
Variable keyStruct may be null at this access because of this null argument.

Suggested change
keyStruct.put(cdef.getColumnName(), value);
if (keyStruct != null) {
keyStruct.put(cdef.getColumnName(), value);
}

Copilot uses AI. Check for mistakes.
avelanarius and others added 8 commits December 29, 2025 18:45
Add support for including frozen lists in generated changes. Made
necessary changes to support nested data types.
Add support for including frozen sets in generated changes.
Add support for including frozen maps in generated changes.
Add support for including tuples in generated changes. For a tuple,
a Kafka Connect struct is created with "tuple_member_*" for each member
of a tuple (as they can have different data types inside).
Add support for including frozen UDTs in generated changes.
@nikagra nikagra force-pushed the feat/non-frozen-collections branch from 6f80092 to 428e5c7 Compare December 29, 2025 17:45
…ns and update integration tests for list, set, and map types
@nikagra nikagra force-pushed the feat/non-frozen-collections branch from 0606283 to 3949238 Compare December 30, 2025 17:57

protected static boolean isSupportedColumnSchema(ChangeSchema.ColumnDefinition cdef) {
protected static boolean isSupportedColumnSchema(
ChangeSchema changeSchema, ChangeSchema.ColumnDefinition cdef) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't seemse like changeSchema is used at all

package com.scylladb.cdc.debezium.connector;

public enum CollectionsMode {
DELTA,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It needs some explanation.
Also I feel like we should not have it, and instead have one configuration that does it for whole data.
For instance, we already have experimental.preimages.enabled that makes it create full data record if pre-mage is turned on, so probably we should follow that one, instead of having a sperate config.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants