-
Notifications
You must be signed in to change notification settings - Fork 591
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Make startup more robust and prevent auto topic creation when using CruiseControlMetricsReporterSampler #2211
base: main
Are you sure you want to change the base?
Conversation
@@ -36,6 +37,10 @@ public class CruiseControlMetricsReporterSampler extends AbstractMetricSampler { | |||
// Configurations | |||
public static final String METRIC_REPORTER_SAMPLER_BOOTSTRAP_SERVERS = "metric.reporter.sampler.bootstrap.servers"; | |||
public static final String METRIC_REPORTER_TOPIC = "metric.reporter.topic"; | |||
public static final String METRIC_REPORTER_TOPIC_ASSERT_ATTEMPTS = "metric.reporter.topic.assert.attempts"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: we should call the config metric.reporter.sampler.topic.assert.attempts
as it is a config of the sampler and not the reporter.
Also please update the proper section of the Configurations.md file with your configuration. I think you should also mention that this uses exponential backoff and too big numbers will cause infrequent and long backoff between retries after a while.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thx, I have fixed it!
@@ -166,9 +184,16 @@ public void configure(Map<String, ?> configs) { | |||
.CRUISE_CONTROL_METRICS_REPORTER_LINGER_MS_CONFIG)); | |||
_metricConsumer = createMetricConsumer(configs, CONSUMER_CLIENT_ID_PREFIX); | |||
_currentPartitionAssignment = Collections.emptySet(); | |||
|
|||
LOG.info("Waiting for metrics reporter topic [{}] to be available in the target cluster.", _metricReporterTopic); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: calling the cluster "target" is a bit misleading as it is rather a source cluster (source of the metrics), but I think we should just say "...to be available in the Kafka cluster".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, I did not change that, this was the original form (with "target") as you can see:
Line 171 in 2b81fb1
+ _metricReporterTopic + " in the target cluster."); |
I reused that in the new exception message.
I am happy to change it if you think it is better to use the mentioned form.
LOG.info("Waiting for metrics reporter topic [{}] to be available in the target cluster.", _metricReporterTopic); | ||
if (!CruiseControlMetricsUtils.retry(()->!this.isMetricsTopicExists(), metricTopicAssertAttempts)) { | ||
throw new IllegalStateException("Cruise Control cannot find the metrics reporter topic that matches [" + _metricReporterTopic | ||
+ "] in the target cluster."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same thing about "target" as above.
throw new IllegalStateException("Cruise Control cannot find partitions for the metrics reporter that topic matches " | ||
+ _metricReporterTopic + " in the target cluster."); | ||
throw new IllegalStateException("Cruise Control cannot find partitions for the metrics reporter that topic matches [" | ||
+ _metricReporterTopic + "] in the target cluster."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another "target".
@@ -166,9 +184,16 @@ public void configure(Map<String, ?> configs) { | |||
.CRUISE_CONTROL_METRICS_REPORTER_LINGER_MS_CONFIG)); | |||
_metricConsumer = createMetricConsumer(configs, CONSUMER_CLIENT_ID_PREFIX); | |||
_currentPartitionAssignment = Collections.emptySet(); | |||
|
|||
LOG.info("Waiting for metrics reporter topic [{}] to be available in the target cluster.", _metricReporterTopic); | |||
if (!CruiseControlMetricsUtils.retry(()->!this.isMetricsTopicExists(), metricTopicAssertAttempts)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As I see it uses exponential backoff periods. At the 7th retry it is already bit more than 10 minutes which I think may be unreasonably long. And for the 8th retry it waits 21 minutes which may be too long between retries. I'm not sure if this is the good approach, at least not with the current parameters.
I think we should either use 1 as a base, so we retry every 5 second for n times, or something closer to 1, like 1.25. With this parameter we can try 18 times until we get to 22 minutes total. To round it up, 20 times will get you a 35 minute total retry time. I think it's more reasonable to try 20 times in 35 minutes than 6 as it allows quicker startup.
Another approach is to use constant retry intervals with a given timeout. I think that is a more user-friendly approach as it's easier to calculate with that compared to exponents, especially that listTopics()
is just a metadata call which is OK to do every 10 seconds or so with a single consumer. Overall I'm OK with the exponent approach if you or others agree on this but I favor the interval+timeout one for usability reasons. Hopefully if we set a good default, users won't have to change it too often and then it may matter less.
@mhratson would you please review this? |
Summary
cruise.control.metrics.topic.auto.create=true
) with the desired guaranties (replica count). If we (startup script/automation) did not wait enough with the CC start command, then it could fail.There is now retry logic in the
CruiseControlMetricsReporterSampler
initialization to handle this edge case.2 things could happen depending on the Kafka's
auto.create.topics.enable
configauto.create.topics.enable=false
CC startup fails with the following error after the unsuccessful
refreshPartitionAssignment()
as the topic does not exist:auto.create.topics.enable=true
During the
refreshPartitionAssignment()
the Kafka Consumer send atopicMetadata
request to the brokers. This request triggers the auto topic creation mechanism in Kafka. Based on which broker actually creates the topic, the topic creation can be sync or async, meaning therefreshPartitionAssignment()
could fail or succeed. In either case, the topic creation won't use the desired topic properties we defined in theCruiseControlMetricsReporter
configcruise.control.metrics.topic.num.partitions
,cruise.control.metrics.topic.replication.factor
. It will use a default topic configuration silently.If the
refreshPartitionAssignment()
succeed, we did not even notice anything.If the
refreshPartitionAssignment()
fails, the CC won't start. However, the topic auto creation got triggered, and the brokers will create the topic. If we check the logs, we will see the error above, but we will have a hard time to figure out why the error happened since the topic exist in Kafka (with the default topic config). If we start the CC again, then it starts without any issue, and will use the topic created by the topic auto creation mechanism.CruiseControlMetricsReporterSampler
configuration.Expected Behavior
Metrics topic got created with the defined configuration.
CC tolerate lagging, slow broker startup
Actual Behavior
Metrics topic got created with the default topic config
CC fails instantly if the topic does not exist during the startup
Steps to Reproduce
Known Workarounds
No
Categorization