Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package kafka.server.streamaspect

import com.automq.stream.api.exceptions.FastReadFailFastException
import com.automq.stream.s3.metrics.stats.NetworkStats
import com.automq.stream.s3.metrics.{MetricsLevel, TimerUtil}
import com.automq.stream.s3.network.{AsyncNetworkBandwidthLimiter, GlobalNetworkBandwidthLimiters, ThrottleStrategy}
import com.automq.stream.utils.{FutureUtil, Systems}
Expand Down Expand Up @@ -980,11 +979,8 @@ class ElasticReplicaManager(
}

private def acquireNetworkOutPermit(size: Int, throttleStrategy: ThrottleStrategy): Unit = {
val start = time.nanoseconds()
GlobalNetworkBandwidthLimiters.instance().get(AsyncNetworkBandwidthLimiter.Type.OUTBOUND)
.consume(throttleStrategy, size).join()
val networkStats = NetworkStats.getInstance()
networkStats.networkLimiterQueueTimeStats(AsyncNetworkBandwidthLimiter.Type.OUTBOUND, throttleStrategy).record(time.nanoseconds() - start)
}

def handlePartitionFailure(partitionDir: String): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,18 @@ public Map<Long, Pair<Counter, Counter>> allStreamReadBytesStats() {
}

public HistogramMetric networkLimiterQueueTimeStats(AsyncNetworkBandwidthLimiter.Type type, ThrottleStrategy strategy) {
return type == AsyncNetworkBandwidthLimiter.Type.INBOUND
? networkInboundLimiterQueueTimeStatsMap.computeIfAbsent(strategy, k -> S3StreamMetricsManager.buildNetworkInboundLimiterQueueTimeMetric(MetricsLevel.INFO, strategy))
: networkOutboundLimiterQueueTimeStatsMap.computeIfAbsent(strategy, k -> S3StreamMetricsManager.buildNetworkOutboundLimiterQueueTimeMetric(MetricsLevel.INFO, strategy));
HistogramMetric metric;
if (type == AsyncNetworkBandwidthLimiter.Type.INBOUND) {
metric = networkInboundLimiterQueueTimeStatsMap.get(strategy);
if (metric == null) {
metric = networkInboundLimiterQueueTimeStatsMap.computeIfAbsent(strategy, k -> S3StreamMetricsManager.buildNetworkInboundLimiterQueueTimeMetric(MetricsLevel.INFO, strategy));
}
} else {
metric = networkOutboundLimiterQueueTimeStatsMap.get(strategy);
if (metric == null) {
metric = networkOutboundLimiterQueueTimeStatsMap.computeIfAbsent(strategy, k -> S3StreamMetricsManager.buildNetworkOutboundLimiterQueueTimeMetric(MetricsLevel.INFO, strategy));
}
}
return metric;
Comment on lines +99 to +111
Copy link

Copilot AI Nov 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The get() call before computeIfAbsent() is redundant and can impact performance. The computeIfAbsent() method already handles the null check and only computes the value if absent. Consider simplifying to:

public HistogramMetric networkLimiterQueueTimeStats(AsyncNetworkBandwidthLimiter.Type type, ThrottleStrategy strategy) {
    return type == AsyncNetworkBandwidthLimiter.Type.INBOUND
        ? networkInboundLimiterQueueTimeStatsMap.computeIfAbsent(strategy, k -> S3StreamMetricsManager.buildNetworkInboundLimiterQueueTimeMetric(MetricsLevel.INFO, strategy))
        : networkOutboundLimiterQueueTimeStatsMap.computeIfAbsent(strategy, k -> S3StreamMetricsManager.buildNetworkOutboundLimiterQueueTimeMetric(MetricsLevel.INFO, strategy));
}
Suggested change
HistogramMetric metric;
if (type == AsyncNetworkBandwidthLimiter.Type.INBOUND) {
metric = networkInboundLimiterQueueTimeStatsMap.get(strategy);
if (metric == null) {
metric = networkInboundLimiterQueueTimeStatsMap.computeIfAbsent(strategy, k -> S3StreamMetricsManager.buildNetworkInboundLimiterQueueTimeMetric(MetricsLevel.INFO, strategy));
}
} else {
metric = networkOutboundLimiterQueueTimeStatsMap.get(strategy);
if (metric == null) {
metric = networkOutboundLimiterQueueTimeStatsMap.computeIfAbsent(strategy, k -> S3StreamMetricsManager.buildNetworkOutboundLimiterQueueTimeMetric(MetricsLevel.INFO, strategy));
}
}
return metric;
return type == AsyncNetworkBandwidthLimiter.Type.INBOUND
? networkInboundLimiterQueueTimeStatsMap.computeIfAbsent(strategy, k -> S3StreamMetricsManager.buildNetworkInboundLimiterQueueTimeMetric(MetricsLevel.INFO, strategy))
: networkOutboundLimiterQueueTimeStatsMap.computeIfAbsent(strategy, k -> S3StreamMetricsManager.buildNetworkOutboundLimiterQueueTimeMetric(MetricsLevel.INFO, strategy));

Copilot uses AI. Check for mistakes.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public String getName() {
}
}

private static class BucketItem implements Comparable<BucketItem> {
private class BucketItem implements Comparable<BucketItem> {
private final ThrottleStrategy strategy;
private final CompletableFuture<Void> cf;
private final long timestamp;
Expand All @@ -212,6 +212,7 @@ public boolean complete(long completeSize, ExecutorService executor) {
size -= completeSize;
if (size <= 0) {
executor.submit(() -> cf.complete(null));
NetworkStats.getInstance().networkLimiterQueueTimeStats(type, strategy).record(System.nanoTime() - timestamp);
Copy link

Copilot AI Nov 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The queue time metric is only recorded when items are dequeued from the queue. However, there are cases where the CompletableFuture completes immediately without being queued:

  1. When throttleStrategy is BYPASS (line 151-153)
  2. When tokens are available and the queue is empty (line 160-162)

These immediate completions should also record queue time (which would be near-zero) for accurate metrics. Consider also recording the metric in the consume method for these cases.

Copilot uses AI. Check for mistakes.
return true;
}
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,8 @@
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.operations.S3Operation;
import com.automq.stream.s3.metrics.stats.NetworkStats;
import com.automq.stream.s3.metrics.stats.S3OperationStats;
import com.automq.stream.s3.metrics.stats.StorageOperationStats;
import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter;
import com.automq.stream.s3.network.NetworkBandwidthLimiter;
import com.automq.stream.s3.network.ThrottleStrategy;
import com.automq.stream.s3.objects.ObjectAttributes;
Expand Down Expand Up @@ -227,15 +225,7 @@ public CompletableFuture<ByteBuf> rangeRead(ReadOptions options, String objectPa
}

BiFunction<ThrottleStrategy, Long, CompletableFuture<Void>> networkInboundBandwidthLimiterFunction =
(throttleStrategy, size) -> {
long startTime = System.nanoTime();
return networkInboundBandwidthLimiter.consume(throttleStrategy, size)
.whenComplete((v, ex) ->
NetworkStats.getInstance()
.networkLimiterQueueTimeStats(AsyncNetworkBandwidthLimiter.Type.INBOUND, throttleStrategy)
.record(TimerUtil.timeElapsedSince(startTime, TimeUnit.NANOSECONDS)));

};
networkInboundBandwidthLimiter::consume;

long acquiredSize = end - start;

Expand Down Expand Up @@ -277,12 +267,9 @@ public CompletableFuture<WriteResult> write(WriteOptions options, String objectP
data.release();
return retCf;
}
TimerUtil timerUtil = new TimerUtil();
networkOutboundBandwidthLimiter
.consume(options.throttleStrategy(), data.readableBytes())
.whenCompleteAsync((v, ex) -> {
NetworkStats.getInstance().networkLimiterQueueTimeStats(AsyncNetworkBandwidthLimiter.Type.OUTBOUND, options.throttleStrategy())
.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
if (ex != null) {
cf.completeExceptionally(ex);
data.release();
Expand Down
Loading