ReplicationThrottleHelper: Add support for AdminClient bulk operations #2305
Conversation
Hi @il-kyun, let me know once you get the CI tests passing, I'll be happy to add a review if you would like!
kyguy left a comment
This is a really useful enhancement! Why not have this request batching implementation simply replace the existing non-batching implementation instead of having it be configurable? Is there any specific reason users would not want to have the AdminClient operations batched like this?
```java
/**
 * <code>bulk.replication.throttle.bulk.ops.enabled</code>
 */
public static final String BULK_REPLICATION_THROTTLE_BULK_OPS_ENABLED_CONFIG = "bulk.replication.throttle.bulk.ops.enabled";
```
In which scenarios would users not want to batch the alter/describe configs operations? Are there any drawbacks to enabling this by default?
@mimaison Thanks for bringing that up!
We actually discussed this earlier here; at the time, we kept _useBulkOps configurable mainly out of caution, since we hadn't yet fully validated that replacing the per-entity logic wouldn't introduce unexpected side effects.
Removes the _useBulkOps flag and makes the bulk path the default behavior to simplify the codebase and reduce configuration complexity.
# Conflicts:
#	cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelper.java
```java
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.server.config.QuotaConfigs;
```
This is not part of the Kafka public API, we should avoid using it.
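One way to drop the internal dependency (a sketch, not necessarily what the PR ended up doing) is to declare the config keys locally; the string values below are Kafka's documented dynamic throttle config names, and they match the constant names used elsewhere in this PR:

```java
// Sketch: local constants replacing the import of the internal
// org.apache.kafka.server.config.QuotaConfigs class.
static final String LEADER_REPLICATION_THROTTLED_RATE_CONFIG = "leader.replication.throttled.rate";
static final String FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG = "follower.replication.throttled.rate";
static final String LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG = "leader.replication.throttled.replicas";
static final String FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG = "follower.replication.throttled.replicas";
```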
There's something wrong with this file; it does not compile.
I rebased onto main and resolved the conflicts — it’s all good now.
9311e2d to d1b3c4c
kyguy left a comment
Thanks for the updates @il-kyun! I just had a quick pass, mostly minor comments so far, I'll have a closer look later!
I was wondering if it would make sense to batch the bulk operations to reduce memory footprint and improve concurrency control when working with large Kafka clusters. Left a comment concerning that below, let me know what you think!
```java
participatingBrokers.addAll(
    proposal.oldReplicas().stream().map(ReplicaPlacementInfo::brokerId).collect(Collectors.toSet()));
participatingBrokers.addAll(
    proposal.newReplicas().stream().map(ReplicaPlacementInfo::brokerId).collect(Collectors.toSet()));
```
Are these changes necessary? It looks like this is just changing the formatting.
I reverted it. 44251ab
```java
List<ConfigResource> resources = brokerIds.stream()
    .map(id -> new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(id)))
    .collect(Collectors.toList());
return _adminClient.describeConfigs(resources).all().get(CLIENT_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
```
For the cases where we are dealing with large Kafka clusters, would it make sense to batch the requests here? From what I understand it would help avoid memory and concurrency issues with the admin client. Alternatively, maybe it would make more sense to batch the set of brokerIds in the calling methods to reduce the memory impact of storing the broker configs as well. What do you think?
Yes — splitting the broker set before calling describeConfigs makes sense to reduce payload size, heap pressure, and timeout risk in the AdminClient.
Recomputing with ~100 KB per broker:
- 50 brokers ≈ 5 MB per call
- 40 brokers ≈ 4 MB per call
- 25 brokers ≈ 2.5 MB per call
Given this, I think 25 is a reasonable default batch size — it keeps each request lightweight while maintaining good throughput. The value should remain configurable so it can be tuned based on cluster size and network performance.
Would 25 per batch as a default (configurable) work for you?
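For illustration, a minimal sketch of the chunked describeConfigs call, assuming the class's existing _adminClient and CLIENT_REQUEST_TIMEOUT_MS fields from the snippet above; the constant and method names here are hypothetical, not the PR's actual identifiers:

```java
// Hypothetical batching sketch: split broker IDs into chunks of 25 (the
// proposed default) and issue one describeConfigs request per chunk, so each
// response stays around 2.5 MB instead of growing with cluster size.
private static final int DESCRIBE_CONFIGS_BATCH_SIZE = 25;

private Map<ConfigResource, Config> describeBrokerConfigsInBatches(List<Integer> brokerIds)
    throws InterruptedException, ExecutionException, TimeoutException {
  Map<ConfigResource, Config> allConfigs = new HashMap<>();
  for (int start = 0; start < brokerIds.size(); start += DESCRIBE_CONFIGS_BATCH_SIZE) {
    List<ConfigResource> batch = brokerIds
        .subList(start, Math.min(start + DESCRIBE_CONFIGS_BATCH_SIZE, brokerIds.size()))
        .stream()
        .map(id -> new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(id)))
        .collect(Collectors.toList());
    // One bounded request per chunk keeps the response payload small.
    allConfigs.putAll(
        _adminClient.describeConfigs(batch).all().get(CLIENT_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS));
  }
  return allConfigs;
}
```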
```java
private static final List<String> REPLICATION_THROTTLED_RATE_CONFIGS = Arrays.asList(
    LEADER_REPLICATION_THROTTLED_RATE_CONFIG,
    FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG);
```
Is this necessary? It appears we only use the value in one place.
I just changed it to use List.of() 96b6471
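For reference, a sketch of what the List.of() change would look like:

```java
// Arrays.asList replaced with the immutable List.of factory (Java 9+).
private static final List<String> REPLICATION_THROTTLED_RATE_CONFIGS = List.of(
    LEADER_REPLICATION_THROTTLED_RATE_CONFIG,
    FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG);
```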
```java
private static final List<String> REPLICATION_THROTTLED_REPLICAS_CONFIGS = Arrays.asList(
    LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG,
    FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG);
```
Is this necessary? It appears we only use the value in one place.
I just changed it to use List.of() 96b6471
```java
  }
}

void waitForConfigs(Map<ConfigResource, Collection<AlterConfigOp>> opsByResource) {
```
Is there any reason we are keeping the original waitForConfigs() method? Can we completely replace it with this implementation?
You're right - we can remove the single-resource version. 0fc26ab
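A rough sketch of what a single bulk waitForConfigs could look like, polling until every op is visible; the retry loop, deadline handling, and comparison logic here are assumptions, not the PR's actual code:

```java
// Assumed sketch: poll describeConfigs for all touched resources until every
// SET value is visible and every DELETE has taken effect. A real
// implementation may need to compare deleted entries against broker defaults
// rather than expecting a null value.
void waitForConfigs(Map<ConfigResource, Collection<AlterConfigOp>> opsByResource)
    throws InterruptedException, ExecutionException, TimeoutException {
  long deadline = System.currentTimeMillis() + CLIENT_REQUEST_TIMEOUT_MS;
  while (System.currentTimeMillis() < deadline) {
    Map<ConfigResource, Config> current = _adminClient
        .describeConfigs(new ArrayList<>(opsByResource.keySet()))
        .all()
        .get(CLIENT_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
    boolean allApplied = opsByResource.entrySet().stream().allMatch(entry ->
        entry.getValue().stream().allMatch(op -> {
          ConfigEntry observed = current.get(entry.getKey()).get(op.configEntry().name());
          return op.opType() == AlterConfigOp.OpType.DELETE
              ? observed == null || observed.value() == null
              : observed != null && Objects.equals(observed.value(), op.configEntry().value());
        }));
    if (allApplied) {
      return;
    }
    Thread.sleep(100); // brief pause between polls
  }
  throw new TimeoutException("Throttle configs were not applied within the timeout.");
}
```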
```java
  applyIncrementalAlterConfigsForBrokers(bulkOps);
}

private void applyIncrementalAlterConfigsForBrokers(Map<ConfigResource, Collection<AlterConfigOp>> bulkOps)
```
This method is meant to replace changeBrokerConfigs, right? If so, would it make sense to keep the method name as changeBrokerConfigs?
I've renamed it back to changeBrokerConfigs() to maintain consistency with the previous implementation and make the purpose clearer. 44251ab
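The renamed method's shape, roughly (a sketch with exception handling simplified relative to whatever the PR actually does):

```java
// Sketch: one incrementalAlterConfigs round trip carrying ops for all
// brokers, instead of one AdminClient call per broker.
private void changeBrokerConfigs(Map<ConfigResource, Collection<AlterConfigOp>> bulkOps)
    throws InterruptedException, ExecutionException, TimeoutException {
  if (bulkOps.isEmpty()) {
    return;
  }
  _adminClient.incrementalAlterConfigs(bulkOps).all().get(CLIENT_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
}
```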
```java
LOG.debug("Removing leader throttle rate: {} on broker {}", currLeaderThrottle, brokerId);
ops.add(new AlterConfigOp(new ConfigEntry(LEADER_REPLICATION_THROTTLED_RATE_CONFIG, null), AlterConfigOp.OpType.DELETE));
}

private void applyIncrementalAlterConfigsForTopics(Map<ConfigResource, Collection<AlterConfigOp>> bulkOps)
```
This method is meant to replace changeTopicConfigs, right? If so, would it make sense to keep the method name as changeTopicConfigs?
I've renamed it back to changeTopicConfigs() for consistency and clarity. 44251ab
@CCisGG I think I may have misunderstood earlier — this PR seems to be a more appropriate solution.
Just to confirm, are you saying #2304 is not a good path to go, and instead we use this PR (#2305) to address the issue?
@CCisGG |
Summary
The per-entity AdminClient operations (incrementalAlterConfigs/describeConfigs) scale poorly on large clusters and are fragile when topics are deleted mid-run. The new design applies and verifies broker and topic throttles in batches by default.
Expected Behavior
*and static broker configs (skips removal of static values).

Actual Behavior (Before Change)
Steps to Reproduce
Additional evidence
Categorization
This PR resolves #1972