Add CacheEnabled and LoggingEnabled properties to Kafka Streams binder for enhanced KTable control #3094

Open
gihong2012 opened this issue Mar 7, 2025 · 5 comments

Comments

@gihong2012

Describe the issue
Currently, I need to disable caching for a KTable, but the Kafka Streams binder in Spring Cloud Stream does not provide this functionality natively. To address this, I propose adding a CacheEnabled property to allow enabling/disabling caching for KTables, similar to how materialized-as provides control over KTable materialization. For consistency and flexibility, I also suggest adding a LoggingEnabled property to toggle detailed logging, offering the same level of configurability.

These properties would enhance control over KTable behavior, addressing limitations in performance optimization (via caching) and debugging (via logging).

To Reproduce
This is a feature request, so reproduction steps are not applicable. However, the limitation can be observed when attempting to disable caching for a KTable in the current Kafka Streams binder, which lacks a direct configuration option for this.

Version of the framework - currently I use
spring-cloud-stream-binder-kafka-streams: 4.1.3
kafka-streams: 3.7.1
Expected behavior
CacheEnabled:
  • true: enables caching for the KTable (the default behavior).
  • false: disables caching, allowing finer performance tuning.
LoggingEnabled:
  • true: creates a changelog topic for the KTable's state store.
  • false: does not create a changelog topic.

These properties would mirror the flexibility provided by existing options like materialized-as, extending it to caching and logging control.

Additional context
Adding these properties would benefit applications requiring fine-grained control over KTable behavior, such as:

Disabling caching to reduce memory usage or latency in specific use cases.
Enabling detailed logging for troubleshooting complex stream processing pipelines.
What are your thoughts on adding these properties to the Kafka Streams binder?
Alternatively, I need some other way to add a stream processor; right now the functional model is the only way to create one.

@sobychacko
Contributor

sobychacko commented Mar 26, 2025

@gihong2012 That sounds like a good new feature. If you are up for it, you are more than welcome to work on adding this feature and send a PR. In that case, let us come up with a design first. Why don't you start with some ideas, and then we can iterate on them? What are your thoughts?

@gihong-park

gihong-park commented Mar 27, 2025

Sorry for the change of account; it's due to a personal reason.

Now that I've re-read my previous comment on this issue, I realize some points might have been unclear because English isn't my first language. I'd like to state my points more clearly now.

Using the standard Kafka Streams API (without Spring), you can directly control state store caching and logging when materializing a KTable, like this:

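// Example using plain Kafka Streams API
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

// streamsBuilder is an existing org.apache.kafka.streams.StreamsBuilder
streamsBuilder.<String, String>table(
    "example_topic",
    Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("example_table")
        .withCachingDisabled() // Directly controls caching
        .withLoggingDisabled() // Directly controls changelog topic creation
);
// You can use .withCachingEnabled() or .withLoggingEnabled(config) as needed.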

However, when using Spring Cloud Stream with the functional programming model (e.g., defining Function<KTable, KStream>, Consumer, etc.), there appears to be no straightforward way to apply these specific configurations (withCachingDisabled/Enabled, withLoggingDisabled/Enabled) to the KTables or GlobalKTables that Spring Cloud Stream creates internally.

My objective is to propose a way to address this limitation. I suggest introducing configuration properties at the binder level to control these behaviors for state stores materialized within the functional context. For example:

  • A property (perhaps named something like spring.cloud.stream.kafka.streams.bindings.${FUNCTION_NAME}-in-x.consumer.caching-enabled) could control whether caching is enabled or disabled, corresponding to the withCachingEnabled()/Disabled() methods.

  • Similarly, another property (e.g., spring.cloud.stream.kafka.streams.bindings.${FUNCTION_NAME}-in-x.consumer.logging-enabled) could control whether logging is enabled or disabled, corresponding to withLoggingEnabled()/Disabled(). This is crucial as it determines whether a changelog topic is created for the state store's fault tolerance.

Introducing these properties would allow users of the functional programming model in Spring Cloud Stream to have the same level of control over state store materialization (caching and logging/changelog creation) as is available through the standard Kafka Streams API.
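To make the intended semantics concrete, here is a minimal sketch of how such per-binding flags might be resolved. It is not the binder's actual implementation: the class name StateStoreFlags, the helper methods, and the exact property keys are assumptions for illustration only. It assumes both flags default to true when unset, matching the Kafka Streams defaults, and it uses only java.util.Properties so that it runs without Kafka on the classpath.

```java
import java.util.Properties;

// Hypothetical helper (not part of Spring Cloud Stream): resolves the two
// proposed flags for one input binding, defaulting both to true.
final class StateStoreFlags {
    final boolean cachingEnabled;
    final boolean loggingEnabled;

    StateStoreFlags(boolean cachingEnabled, boolean loggingEnabled) {
        this.cachingEnabled = cachingEnabled;
        this.loggingEnabled = loggingEnabled;
    }

    // bindingName is e.g. "process-in-0"; the key layout is the one proposed
    // in this comment and is an assumption, not an existing binder property.
    static StateStoreFlags fromProperties(Properties props, String bindingName) {
        String prefix = "spring.cloud.stream.kafka.streams.bindings."
                + bindingName + ".consumer.";
        boolean caching = Boolean.parseBoolean(
                props.getProperty(prefix + "caching-enabled", "true"));
        boolean logging = Boolean.parseBoolean(
                props.getProperty(prefix + "logging-enabled", "true"));
        return new StateStoreFlags(caching, logging);
    }

    // Names the Materialized calls the binder would make for these flags.
    String describe() {
        return (cachingEnabled ? "withCachingEnabled()" : "withCachingDisabled()")
                + " + "
                + (loggingEnabled ? "withLoggingEnabled(config)" : "withLoggingDisabled()");
    }
}
```

With only caching-enabled=false set for a binding, describe() names withCachingDisabled() together with withLoggingEnabled(config), so the changelog topic would still be created.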

@gihong-park

However, it's important to clarify that the property-based approach described above is primarily suggested as a practical and potentially more immediately achievable solution for these common configuration needs (caching and logging).

To achieve full parity and enable the same level of fine-grained control over all Materialized parameters as offered by the standard Kafka Streams API, a more comprehensive mechanism would likely be required. This might involve something analogous to Spring Kafka's StreamsBuilderFactoryBean.setInfrastructureCustomizer, or perhaps a dedicated Spring bean customizer interface. Such a mechanism would allow for deeper configuration of the underlying Kafka Streams components (like the Materialized instance) within the Spring Cloud Stream framework, going beyond simple property settings.

Ultimately, the goal is to leverage such an advanced customization mechanism to achieve the best of both worlds: maintaining the advantages and simplicity of the functional programming style within Spring Cloud Stream, while simultaneously gaining the ability to exercise complete, fine-grained control over all aspects of the underlying Kafka Streams topology and components (like Materialized), just as one can with the native Kafka Streams API.
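As a rough illustration of the customizer idea, the sketch below registers one callback per input binding and applies it to a default settings object. Everything here is hypothetical: StoreSettings is a stand-in for the real Materialized instance, and StoreCustomizers is not an existing Spring Cloud Stream or Spring Kafka type. It only shows the shape such a hook could take.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for the Materialized instance the binder builds.
final class StoreSettings {
    boolean cachingEnabled = true;  // assumed default
    boolean loggingEnabled = true;  // assumed default
}

// Hypothetical registry holding one customizer per input binding name.
final class StoreCustomizers {
    private final Map<String, UnaryOperator<StoreSettings>> byBinding =
            new LinkedHashMap<>();

    void register(String bindingName, UnaryOperator<StoreSettings> customizer) {
        byBinding.put(bindingName, customizer);
    }

    // Starts from defaults and applies the binding's customizer, if any.
    StoreSettings settingsFor(String bindingName) {
        StoreSettings defaults = new StoreSettings();
        return byBinding
                .getOrDefault(bindingName, UnaryOperator.identity())
                .apply(defaults);
    }
}
```

A user could then register a callback such as s -> { s.cachingEnabled = false; return s; } for "process-in-0" while every other binding keeps the defaults. In a real design the callback would receive the actual Materialized object, so any of its options could be set, not just these two flags.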

However, as a practical first step, aimed primarily at users like me who want a simpler way to control common configurations, the immediate proposal is to introduce binder-level configuration properties for essential features like caching and logging.

Thank you for your response.

@sobychacko
Contributor

@gihong-park Introducing properties for that problem sounds like a good top-level approach. Are you open to making this change yourself and sending a PR? If so, please let us know. Otherwise, I can take a look later.

@gihong-park

@sobychacko Sounds good! Yes, I'm happy to take this on. I'll get started on it right away and will send over a PR once it's ready.
