@@ -195,7 +195,11 @@ void WriteAmpBasedRateLimiter::Request(int64_t bytes, const Env::IOPriority pri,
195
195
if (leader_ == nullptr && IsFrontOfOneQueue (&r)) {
196
196
leader_ = &r;
197
197
int64_t delta = next_refill_us_ - NowMicrosMonotonic (env_);
198
- delta = delta > 0 ? delta : 0 ;
198
+ // Clamp delta between 0 and refill_period_us_:
199
+ // (1) set negative values to 0
200
+ // (2) cap maximum wait time to refill_period_us_ to prevent excessive
201
+ // delays that could occur due to clock skew.
202
+ delta = delta > 0 ? std::min (delta, refill_period_us_) : 0 ;
199
203
if (delta == 0 ) {
200
204
timedout = true ;
201
205
} else {
@@ -359,20 +363,35 @@ Status WriteAmpBasedRateLimiter::Tune() {
359
363
// lower bound for write amplification estimation
360
364
const int kRatioLower = 10 ;
361
365
const int kPercentDeltaMax = 6 ;
366
+ const auto millis_per_tune = 1000 * secs_per_tune_;
367
+ // Maximum accepted interval between two tunes; used to detect clock skew.
368
+ const auto max_tune_tick_duration_limit =
369
+ std::chrono::microseconds (secs_per_tune_ * 1000 * 1000 ) * 7 /
370
+ 4 ; // 1.75x multiplier
362
371
363
- std::chrono::microseconds prev_tuned_time = tuned_time_;
372
+ const std::chrono::microseconds prev_tuned_time = tuned_time_;
364
373
tuned_time_ = std::chrono::microseconds (NowMicrosMonotonic (env_));
365
- auto duration = tuned_time_ - prev_tuned_time;
366
- auto duration_ms =
367
- std::chrono::duration_cast<std::chrono::milliseconds>(duration).count ();
374
+ // Validate tuning interval to detect system anomalies:
375
+ // (1) tuned_time_ <= prev_tuned_time: Clock stalled or moved backwards (clock skew)
376
+ // (2) Interval >= max_tune_tick_duration_limit: System stall or severe clock
377
+ // skew
378
+ if (tuned_time_ <= prev_tuned_time ||
379
+ tuned_time_ >= prev_tuned_time + max_tune_tick_duration_limit) {
380
+ // Fall back to max rate limiter for safety if duration is invalid or
381
+ // exceeds max limit.
382
+ SetActualBytesPerSecond (max_bytes_per_sec_.load (std::memory_order_relaxed));
383
+ return Status::Aborted ();
384
+ }
385
+ auto duration_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
386
+ tuned_time_ - prev_tuned_time)
387
+ .count ();
368
388
369
389
int64_t prev_bytes_per_sec = GetBytesPerSecond ();
370
-
371
390
// This function can be called less frequently than we anticipate when
372
391
// compaction rate is low. Loop through the actual time slice to correct
373
392
// the estimation.
374
- auto millis_per_tune = 1000 * secs_per_tune_ ;
375
- for (uint32_t i = 0 ; i < duration_ms / millis_per_tune ; i++) {
393
+ auto sampling_count = duration_ms / millis_per_tune ;
394
+ for (uint32_t i = 0 ; i < sampling_count ; i++) {
376
395
bytes_sampler_.AddSample (duration_bytes_through_ * 1000 / duration_ms);
377
396
highpri_bytes_sampler_.AddSample (duration_highpri_bytes_through_ * 1000 /
378
397
duration_ms);
@@ -441,8 +460,8 @@ void WriteAmpBasedRateLimiter::PaceUp(bool critical) {
441
460
}
442
461
443
462
RateLimiter* NewWriteAmpBasedRateLimiter (
444
- int64_t rate_bytes_per_sec, int64_t refill_period_us /* = 100 * 1000 */ ,
445
- int32_t fairness /* = 10 */ ,
463
+ int64_t rate_bytes_per_sec /* = 10GiB */ ,
464
+ int64_t refill_period_us /* = 100 * 1000 */ , int32_t fairness /* = 10 */ ,
446
465
RateLimiter::Mode mode /* = RateLimiter::Mode::kWritesOnly */ ,
447
466
bool auto_tuned /* = false */ , int tune_per_sec /* = 1 */ ,
448
467
size_t smooth_window_size /* = 300 */ ,
0 commit comments