diff --git a/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java b/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java index 1c986e53ea..16e43862cd 100644 --- a/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java +++ b/core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java @@ -140,11 +140,12 @@ public void start() { config.networkBaselineBandwidth() - (long) networkInboundRate.derive( TimeUnit.NANOSECONDS.toSeconds(System.nanoTime()), NetworkStats.getInstance().networkInboundUsageTotal().get())); // Use a larger token pool for outbound traffic to avoid spikes caused by Upload WAL affecting tail-reading performance. + long outboundBaselineBandwidth = config.networkBaselineBandwidth() * 5; GlobalNetworkBandwidthLimiters.instance().setup(AsyncNetworkBandwidthLimiter.Type.OUTBOUND, - refillToken, config.refillPeriodMs(), config.networkBaselineBandwidth() * 5); + refillToken * 5, config.refillPeriodMs(), outboundBaselineBandwidth); networkOutboundLimiter = GlobalNetworkBandwidthLimiters.instance().get(AsyncNetworkBandwidthLimiter.Type.OUTBOUND); S3StreamMetricsManager.registerNetworkAvailableBandwidthSupplier(AsyncNetworkBandwidthLimiter.Type.OUTBOUND, () -> - config.networkBaselineBandwidth() - (long) networkOutboundRate.derive( + outboundBaselineBandwidth - (long) networkOutboundRate.derive( TimeUnit.NANOSECONDS.toSeconds(System.nanoTime()), NetworkStats.getInstance().networkOutboundUsageTotal().get())); this.localIndexCache = LocalStreamRangeIndexCache.create();