Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package kafka.server.streamaspect

import com.automq.stream.api.exceptions.FastReadFailFastException
import com.automq.stream.s3.metrics.stats.NetworkStats
import com.automq.stream.s3.metrics.{MetricsLevel, TimerUtil}
import com.automq.stream.s3.network.{AsyncNetworkBandwidthLimiter, GlobalNetworkBandwidthLimiters, ThrottleStrategy}
import com.automq.stream.utils.{FutureUtil, Systems}
Expand Down Expand Up @@ -980,11 +979,8 @@ class ElasticReplicaManager(
}

private def acquireNetworkOutPermit(size: Int, throttleStrategy: ThrottleStrategy): Unit = {
val start = time.nanoseconds()
GlobalNetworkBandwidthLimiters.instance().get(AsyncNetworkBandwidthLimiter.Type.OUTBOUND)
.consume(throttleStrategy, size).join()
val networkStats = NetworkStats.getInstance()
networkStats.networkLimiterQueueTimeStats(AsyncNetworkBandwidthLimiter.Type.OUTBOUND, throttleStrategy).record(time.nanoseconds() - start)
}

def handlePartitionFailure(partitionDir: String): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,18 @@ public Map<Long, Pair<Counter, Counter>> allStreamReadBytesStats() {
}

public HistogramMetric networkLimiterQueueTimeStats(AsyncNetworkBandwidthLimiter.Type type, ThrottleStrategy strategy) {
return type == AsyncNetworkBandwidthLimiter.Type.INBOUND
? networkInboundLimiterQueueTimeStatsMap.computeIfAbsent(strategy, k -> S3StreamMetricsManager.buildNetworkInboundLimiterQueueTimeMetric(MetricsLevel.INFO, strategy))
: networkOutboundLimiterQueueTimeStatsMap.computeIfAbsent(strategy, k -> S3StreamMetricsManager.buildNetworkOutboundLimiterQueueTimeMetric(MetricsLevel.INFO, strategy));
HistogramMetric metric;
if (type == AsyncNetworkBandwidthLimiter.Type.INBOUND) {
metric = networkInboundLimiterQueueTimeStatsMap.get(strategy);
if (metric == null) {
metric = networkInboundLimiterQueueTimeStatsMap.computeIfAbsent(strategy, k -> S3StreamMetricsManager.buildNetworkInboundLimiterQueueTimeMetric(MetricsLevel.INFO, strategy));
}
} else {
metric = networkOutboundLimiterQueueTimeStatsMap.get(strategy);
if (metric == null) {
metric = networkOutboundLimiterQueueTimeStatsMap.computeIfAbsent(strategy, k -> S3StreamMetricsManager.buildNetworkOutboundLimiterQueueTimeMetric(MetricsLevel.INFO, strategy));
}
}
return metric;
Comment on lines +99 to +111
Copy link

Copilot AI Nov 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The double-check pattern here is redundant. computeIfAbsent() already handles the null case atomically and will only compute the value if the key is absent. The explicit null check before calling computeIfAbsent() provides no additional benefit and adds unnecessary complexity.

Consider simplifying to:

public HistogramMetric networkLimiterQueueTimeStats(AsyncNetworkBandwidthLimiter.Type type, ThrottleStrategy strategy) {
    return type == AsyncNetworkBandwidthLimiter.Type.INBOUND
            ? networkInboundLimiterQueueTimeStatsMap.computeIfAbsent(strategy, k -> S3StreamMetricsManager.buildNetworkInboundLimiterQueueTimeMetric(MetricsLevel.INFO, strategy))
            : networkOutboundLimiterQueueTimeStatsMap.computeIfAbsent(strategy, k -> S3StreamMetricsManager.buildNetworkOutboundLimiterQueueTimeMetric(MetricsLevel.INFO, strategy));
}
Suggested change
HistogramMetric metric;
if (type == AsyncNetworkBandwidthLimiter.Type.INBOUND) {
metric = networkInboundLimiterQueueTimeStatsMap.get(strategy);
if (metric == null) {
metric = networkInboundLimiterQueueTimeStatsMap.computeIfAbsent(strategy, k -> S3StreamMetricsManager.buildNetworkInboundLimiterQueueTimeMetric(MetricsLevel.INFO, strategy));
}
} else {
metric = networkOutboundLimiterQueueTimeStatsMap.get(strategy);
if (metric == null) {
metric = networkOutboundLimiterQueueTimeStatsMap.computeIfAbsent(strategy, k -> S3StreamMetricsManager.buildNetworkOutboundLimiterQueueTimeMetric(MetricsLevel.INFO, strategy));
}
}
return metric;
return type == AsyncNetworkBandwidthLimiter.Type.INBOUND
? networkInboundLimiterQueueTimeStatsMap.computeIfAbsent(strategy, k -> S3StreamMetricsManager.buildNetworkInboundLimiterQueueTimeMetric(MetricsLevel.INFO, strategy))
: networkOutboundLimiterQueueTimeStatsMap.computeIfAbsent(strategy, k -> S3StreamMetricsManager.buildNetworkOutboundLimiterQueueTimeMetric(MetricsLevel.INFO, strategy));

Copilot uses AI. Check for mistakes.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public String getName() {
}
}

private static class BucketItem implements Comparable<BucketItem> {
private class BucketItem implements Comparable<BucketItem> {
private final ThrottleStrategy strategy;
private final CompletableFuture<Void> cf;
private final long timestamp;
Expand All @@ -212,6 +212,7 @@ public boolean complete(long completeSize, ExecutorService executor) {
size -= completeSize;
if (size <= 0) {
executor.submit(() -> cf.complete(null));
NetworkStats.getInstance().networkLimiterQueueTimeStats(type, strategy).record(System.nanoTime() - timestamp);
return true;
}
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,8 @@
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.TimerUtil;
import com.automq.stream.s3.metrics.operations.S3Operation;
import com.automq.stream.s3.metrics.stats.NetworkStats;
import com.automq.stream.s3.metrics.stats.S3OperationStats;
import com.automq.stream.s3.metrics.stats.StorageOperationStats;
import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter;
import com.automq.stream.s3.network.NetworkBandwidthLimiter;
import com.automq.stream.s3.network.ThrottleStrategy;
import com.automq.stream.s3.objects.ObjectAttributes;
Expand Down Expand Up @@ -227,15 +225,7 @@ public CompletableFuture<ByteBuf> rangeRead(ReadOptions options, String objectPa
}

BiFunction<ThrottleStrategy, Long, CompletableFuture<Void>> networkInboundBandwidthLimiterFunction =
(throttleStrategy, size) -> {
long startTime = System.nanoTime();
return networkInboundBandwidthLimiter.consume(throttleStrategy, size)
.whenComplete((v, ex) ->
NetworkStats.getInstance()
.networkLimiterQueueTimeStats(AsyncNetworkBandwidthLimiter.Type.INBOUND, throttleStrategy)
.record(TimerUtil.timeElapsedSince(startTime, TimeUnit.NANOSECONDS)));

};
networkInboundBandwidthLimiter::consume;

long acquiredSize = end - start;

Expand Down Expand Up @@ -277,12 +267,9 @@ public CompletableFuture<WriteResult> write(WriteOptions options, String objectP
data.release();
return retCf;
}
TimerUtil timerUtil = new TimerUtil();
networkOutboundBandwidthLimiter
.consume(options.throttleStrategy(), data.readableBytes())
.whenCompleteAsync((v, ex) -> {
NetworkStats.getInstance().networkLimiterQueueTimeStats(AsyncNetworkBandwidthLimiter.Type.OUTBOUND, options.throttleStrategy())
.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
if (ex != null) {
cf.completeExceptionally(ex);
data.release();
Expand Down
Loading