-
Notifications
You must be signed in to change notification settings - Fork 522
chore(metrics): move queue time record to AsyncNetworkBandwidthLimiter #3010
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Signed-off-by: Robin Han <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR refactors queue time metric recording by centralizing it within the AsyncNetworkBandwidthLimiter class, removing duplicate metric collection code from multiple call sites.
- Removed queue time recording logic from
AbstractObjectStorageandElasticReplicaManager - Added queue time recording directly in
AsyncNetworkBandwidthLimiter.BucketItem.complete() - Changed
BucketItemfrom static to instance inner class to access the limiter'stypefield
Reviewed Changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| s3stream/src/main/java/com/automq/stream/s3/operator/AbstractObjectStorage.java | Removed queue time metric recording and unused imports, simplified network limiter function to direct method reference |
| s3stream/src/main/java/com/automq/stream/s3/network/AsyncNetworkBandwidthLimiter.java | Changed BucketItem to instance inner class and added queue time metric recording in the complete() method |
| s3stream/src/main/java/com/automq/stream/s3/metrics/stats/NetworkStats.java | Refactored networkLimiterQueueTimeStats() method with explicit null checks before computeIfAbsent() |
| core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala | Removed queue time metric recording from acquireNetworkOutPermit() method |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| size -= completeSize; | ||
| if (size <= 0) { | ||
| executor.submit(() -> cf.complete(null)); | ||
| NetworkStats.getInstance().networkLimiterQueueTimeStats(type, strategy).record(System.nanoTime() - timestamp); |
Copilot
AI
Nov 14, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The queue time metric is only recorded when items are dequeued from the queue. However, there are cases where the CompletableFuture completes immediately without being queued:
- When
throttleStrategyisBYPASS(line 151-153) - When tokens are available and the queue is empty (line 160-162)
These immediate completions should also record queue time (which would be near-zero) for accurate metrics. Consider also recording the metric in the consume method for these cases.
| HistogramMetric metric; | ||
| if (type == AsyncNetworkBandwidthLimiter.Type.INBOUND) { | ||
| metric = networkInboundLimiterQueueTimeStatsMap.get(strategy); | ||
| if (metric == null) { | ||
| metric = networkInboundLimiterQueueTimeStatsMap.computeIfAbsent(strategy, k -> S3StreamMetricsManager.buildNetworkInboundLimiterQueueTimeMetric(MetricsLevel.INFO, strategy)); | ||
| } | ||
| } else { | ||
| metric = networkOutboundLimiterQueueTimeStatsMap.get(strategy); | ||
| if (metric == null) { | ||
| metric = networkOutboundLimiterQueueTimeStatsMap.computeIfAbsent(strategy, k -> S3StreamMetricsManager.buildNetworkOutboundLimiterQueueTimeMetric(MetricsLevel.INFO, strategy)); | ||
| } | ||
| } | ||
| return metric; |
Copilot
AI
Nov 14, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The get() call before computeIfAbsent() is redundant and can impact performance. The computeIfAbsent() method already handles the null check and only computes the value if absent. Consider simplifying to:
public HistogramMetric networkLimiterQueueTimeStats(AsyncNetworkBandwidthLimiter.Type type, ThrottleStrategy strategy) {
return type == AsyncNetworkBandwidthLimiter.Type.INBOUND
? networkInboundLimiterQueueTimeStatsMap.computeIfAbsent(strategy, k -> S3StreamMetricsManager.buildNetworkInboundLimiterQueueTimeMetric(MetricsLevel.INFO, strategy))
: networkOutboundLimiterQueueTimeStatsMap.computeIfAbsent(strategy, k -> S3StreamMetricsManager.buildNetworkOutboundLimiterQueueTimeMetric(MetricsLevel.INFO, strategy));
}| HistogramMetric metric; | |
| if (type == AsyncNetworkBandwidthLimiter.Type.INBOUND) { | |
| metric = networkInboundLimiterQueueTimeStatsMap.get(strategy); | |
| if (metric == null) { | |
| metric = networkInboundLimiterQueueTimeStatsMap.computeIfAbsent(strategy, k -> S3StreamMetricsManager.buildNetworkInboundLimiterQueueTimeMetric(MetricsLevel.INFO, strategy)); | |
| } | |
| } else { | |
| metric = networkOutboundLimiterQueueTimeStatsMap.get(strategy); | |
| if (metric == null) { | |
| metric = networkOutboundLimiterQueueTimeStatsMap.computeIfAbsent(strategy, k -> S3StreamMetricsManager.buildNetworkOutboundLimiterQueueTimeMetric(MetricsLevel.INFO, strategy)); | |
| } | |
| } | |
| return metric; | |
| return type == AsyncNetworkBandwidthLimiter.Type.INBOUND | |
| ? networkInboundLimiterQueueTimeStatsMap.computeIfAbsent(strategy, k -> S3StreamMetricsManager.buildNetworkInboundLimiterQueueTimeMetric(MetricsLevel.INFO, strategy)) | |
| : networkOutboundLimiterQueueTimeStatsMap.computeIfAbsent(strategy, k -> S3StreamMetricsManager.buildNetworkOutboundLimiterQueueTimeMetric(MetricsLevel.INFO, strategy)); |
No description provided.