Skip to content

Commit 17d3e5f

Browse files
authored
chore(metrics): move queue time record to AsyncNetworkBandwidthLimiter (#3011)
Signed-off-by: Robin Han <[email protected]>
1 parent dd70821 commit 17d3e5f

File tree

4 files changed

+16
-22
lines changed

4 files changed

+16
-22
lines changed

core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package kafka.server.streamaspect
22

33
import com.automq.stream.api.exceptions.FastReadFailFastException
4-
import com.automq.stream.s3.metrics.stats.NetworkStats
54
import com.automq.stream.s3.metrics.{MetricsLevel, TimerUtil}
65
import com.automq.stream.s3.network.{AsyncNetworkBandwidthLimiter, GlobalNetworkBandwidthLimiters, ThrottleStrategy}
76
import com.automq.stream.utils.{FutureUtil, Systems}
@@ -980,11 +979,8 @@ class ElasticReplicaManager(
980979
}
981980

982981
private def acquireNetworkOutPermit(size: Int, throttleStrategy: ThrottleStrategy): Unit = {
983-
val start = time.nanoseconds()
984982
GlobalNetworkBandwidthLimiters.instance().get(AsyncNetworkBandwidthLimiter.Type.OUTBOUND)
985983
.consume(throttleStrategy, size).join()
986-
val networkStats = NetworkStats.getInstance()
987-
networkStats.networkLimiterQueueTimeStats(AsyncNetworkBandwidthLimiter.Type.OUTBOUND, throttleStrategy).record(time.nanoseconds() - start)
988984
}
989985

990986
def handlePartitionFailure(partitionDir: String): Unit = {

s3stream/src/main/java/com/automq/stream/s3/metrics/stats/NetworkStats.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,18 @@ public Map<Long, Pair<Counter, Counter>> allStreamReadBytesStats() {
9696
}
9797

9898
public HistogramMetric networkLimiterQueueTimeStats(AsyncNetworkBandwidthLimiter.Type type, ThrottleStrategy strategy) {
99-
return type == AsyncNetworkBandwidthLimiter.Type.INBOUND
100-
? networkInboundLimiterQueueTimeStatsMap.computeIfAbsent(strategy, k -> S3StreamMetricsManager.buildNetworkInboundLimiterQueueTimeMetric(MetricsLevel.INFO, strategy))
101-
: networkOutboundLimiterQueueTimeStatsMap.computeIfAbsent(strategy, k -> S3StreamMetricsManager.buildNetworkOutboundLimiterQueueTimeMetric(MetricsLevel.INFO, strategy));
99+
HistogramMetric metric;
100+
if (type == AsyncNetworkBandwidthLimiter.Type.INBOUND) {
101+
metric = networkInboundLimiterQueueTimeStatsMap.get(strategy);
102+
if (metric == null) {
103+
metric = networkInboundLimiterQueueTimeStatsMap.computeIfAbsent(strategy, k -> S3StreamMetricsManager.buildNetworkInboundLimiterQueueTimeMetric(MetricsLevel.INFO, strategy));
104+
}
105+
} else {
106+
metric = networkOutboundLimiterQueueTimeStatsMap.get(strategy);
107+
if (metric == null) {
108+
metric = networkOutboundLimiterQueueTimeStatsMap.computeIfAbsent(strategy, k -> S3StreamMetricsManager.buildNetworkOutboundLimiterQueueTimeMetric(MetricsLevel.INFO, strategy));
109+
}
110+
}
111+
return metric;
102112
}
103113
}

s3stream/src/main/java/com/automq/stream/s3/network/AsyncNetworkBandwidthLimiter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ public String getName() {
187187
}
188188
}
189189

190-
private static class BucketItem implements Comparable<BucketItem> {
190+
private class BucketItem implements Comparable<BucketItem> {
191191
private final ThrottleStrategy strategy;
192192
private final CompletableFuture<Void> cf;
193193
private final long timestamp;
@@ -212,6 +212,7 @@ public boolean complete(long completeSize, ExecutorService executor) {
212212
size -= completeSize;
213213
if (size <= 0) {
214214
executor.submit(() -> cf.complete(null));
215+
NetworkStats.getInstance().networkLimiterQueueTimeStats(type, strategy).record(System.nanoTime() - timestamp);
215216
return true;
216217
}
217218
return false;

s3stream/src/main/java/com/automq/stream/s3/operator/AbstractObjectStorage.java

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,8 @@
2424
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
2525
import com.automq.stream.s3.metrics.TimerUtil;
2626
import com.automq.stream.s3.metrics.operations.S3Operation;
27-
import com.automq.stream.s3.metrics.stats.NetworkStats;
2827
import com.automq.stream.s3.metrics.stats.S3OperationStats;
2928
import com.automq.stream.s3.metrics.stats.StorageOperationStats;
30-
import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter;
3129
import com.automq.stream.s3.network.NetworkBandwidthLimiter;
3230
import com.automq.stream.s3.network.ThrottleStrategy;
3331
import com.automq.stream.s3.objects.ObjectAttributes;
@@ -227,15 +225,7 @@ public CompletableFuture<ByteBuf> rangeRead(ReadOptions options, String objectPa
227225
}
228226

229227
BiFunction<ThrottleStrategy, Long, CompletableFuture<Void>> networkInboundBandwidthLimiterFunction =
230-
(throttleStrategy, size) -> {
231-
long startTime = System.nanoTime();
232-
return networkInboundBandwidthLimiter.consume(throttleStrategy, size)
233-
.whenComplete((v, ex) ->
234-
NetworkStats.getInstance()
235-
.networkLimiterQueueTimeStats(AsyncNetworkBandwidthLimiter.Type.INBOUND, throttleStrategy)
236-
.record(TimerUtil.timeElapsedSince(startTime, TimeUnit.NANOSECONDS)));
237-
238-
};
228+
networkInboundBandwidthLimiter::consume;
239229

240230
long acquiredSize = end - start;
241231

@@ -277,12 +267,9 @@ public CompletableFuture<WriteResult> write(WriteOptions options, String objectP
277267
data.release();
278268
return retCf;
279269
}
280-
TimerUtil timerUtil = new TimerUtil();
281270
networkOutboundBandwidthLimiter
282271
.consume(options.throttleStrategy(), data.readableBytes())
283272
.whenCompleteAsync((v, ex) -> {
284-
NetworkStats.getInstance().networkLimiterQueueTimeStats(AsyncNetworkBandwidthLimiter.Type.OUTBOUND, options.throttleStrategy())
285-
.record(timerUtil.elapsedAs(TimeUnit.NANOSECONDS));
286273
if (ex != null) {
287274
cf.completeExceptionally(ex);
288275
data.release();

0 commit comments

Comments
 (0)