-
Notifications
You must be signed in to change notification settings - Fork 522
chore(metrics): move queue time record to AsyncNetworkBandwidthLimiter #3011
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Signed-off-by: Robin Han <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR refactors queue time metric recording for network bandwidth limiting by moving it from call sites into the AsyncNetworkBandwidthLimiter class. This centralizes the metric collection logic and reduces code duplication.
- Moved queue time metric recording from external call sites to inside
AsyncNetworkBandwidthLimiter.BucketItem.complete() - Removed manual timer tracking code from
AbstractObjectStorageandElasticReplicaManager - Changed
BucketItemfrom static to non-static inner class to access the limiter's type field
Reviewed Changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| s3stream/src/main/java/com/automq/stream/s3/operator/AbstractObjectStorage.java | Removed manual queue time metric recording and timer tracking for both inbound and outbound bandwidth limiting |
| s3stream/src/main/java/com/automq/stream/s3/network/AsyncNetworkBandwidthLimiter.java | Added queue time metric recording to BucketItem.complete() and changed BucketItem to non-static inner class |
| s3stream/src/main/java/com/automq/stream/s3/metrics/stats/NetworkStats.java | Refactored networkLimiterQueueTimeStats() with double-check pattern (though redundant) |
| core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala | Removed manual queue time metric recording from acquireNetworkOutPermit() |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| HistogramMetric metric; | ||
| if (type == AsyncNetworkBandwidthLimiter.Type.INBOUND) { | ||
| metric = networkInboundLimiterQueueTimeStatsMap.get(strategy); | ||
| if (metric == null) { | ||
| metric = networkInboundLimiterQueueTimeStatsMap.computeIfAbsent(strategy, k -> S3StreamMetricsManager.buildNetworkInboundLimiterQueueTimeMetric(MetricsLevel.INFO, strategy)); | ||
| } | ||
| } else { | ||
| metric = networkOutboundLimiterQueueTimeStatsMap.get(strategy); | ||
| if (metric == null) { | ||
| metric = networkOutboundLimiterQueueTimeStatsMap.computeIfAbsent(strategy, k -> S3StreamMetricsManager.buildNetworkOutboundLimiterQueueTimeMetric(MetricsLevel.INFO, strategy)); | ||
| } | ||
| } | ||
| return metric; |
Copilot
AI
Nov 14, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The double-check pattern here is redundant. computeIfAbsent() already handles the null case atomically and will only compute the value if the key is absent. The explicit null check before calling computeIfAbsent() provides no additional benefit and adds unnecessary complexity.
Consider simplifying to:
public HistogramMetric networkLimiterQueueTimeStats(AsyncNetworkBandwidthLimiter.Type type, ThrottleStrategy strategy) {
return type == AsyncNetworkBandwidthLimiter.Type.INBOUND
? networkInboundLimiterQueueTimeStatsMap.computeIfAbsent(strategy, k -> S3StreamMetricsManager.buildNetworkInboundLimiterQueueTimeMetric(MetricsLevel.INFO, strategy))
: networkOutboundLimiterQueueTimeStatsMap.computeIfAbsent(strategy, k -> S3StreamMetricsManager.buildNetworkOutboundLimiterQueueTimeMetric(MetricsLevel.INFO, strategy));
}| HistogramMetric metric; | |
| if (type == AsyncNetworkBandwidthLimiter.Type.INBOUND) { | |
| metric = networkInboundLimiterQueueTimeStatsMap.get(strategy); | |
| if (metric == null) { | |
| metric = networkInboundLimiterQueueTimeStatsMap.computeIfAbsent(strategy, k -> S3StreamMetricsManager.buildNetworkInboundLimiterQueueTimeMetric(MetricsLevel.INFO, strategy)); | |
| } | |
| } else { | |
| metric = networkOutboundLimiterQueueTimeStatsMap.get(strategy); | |
| if (metric == null) { | |
| metric = networkOutboundLimiterQueueTimeStatsMap.computeIfAbsent(strategy, k -> S3StreamMetricsManager.buildNetworkOutboundLimiterQueueTimeMetric(MetricsLevel.INFO, strategy)); | |
| } | |
| } | |
| return metric; | |
| return type == AsyncNetworkBandwidthLimiter.Type.INBOUND | |
| ? networkInboundLimiterQueueTimeStatsMap.computeIfAbsent(strategy, k -> S3StreamMetricsManager.buildNetworkInboundLimiterQueueTimeMetric(MetricsLevel.INFO, strategy)) | |
| : networkOutboundLimiterQueueTimeStatsMap.computeIfAbsent(strategy, k -> S3StreamMetricsManager.buildNetworkOutboundLimiterQueueTimeMetric(MetricsLevel.INFO, strategy)); |
cherry-pick #3010