@@ -189,7 +189,11 @@ void WriteAmpBasedRateLimiter::Request(int64_t bytes, const Env::IOPriority pri,
          (!queue_[Env::IO_LOW].empty() && &r == queue_[Env::IO_LOW].front()))) {
       leader_ = &r;
       int64_t delta = next_refill_us_ - NowMicrosMonotonic(env_);
-      delta = delta > 0 ? delta : 0;
+      // Clamp delta between 0 and refill_period_us_:
+      // (1) set negative values to 0
+      // (2) cap maximum wait time to refill_period_us_ to prevent excessive
+      //     delays that could occur due to clock skew.
+      delta = delta > 0 ? std::min(delta, refill_period_us_) : 0;
       if (delta == 0) {
         timedout = true;
       } else {
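For reference, the clamp added in the hunk above can be read as a small standalone helper. The sketch below is only an illustration with hypothetical names (ClampedWaitUs, plain int64_t parameters instead of the limiter's members), not code from the patch: the computed wait is never negative and never exceeds one refill period, so a clock jump cannot stall a request longer than refill_period_us.

// Illustrative sketch only; mirrors the clamp above with hypothetical
// standalone parameters.
#include <algorithm>
#include <cstdint>

int64_t ClampedWaitUs(int64_t next_refill_us, int64_t now_us,
                      int64_t refill_period_us) {
  int64_t delta = next_refill_us - now_us;
  // Negative deltas (refill already due) become 0; oversized deltas caused
  // by clock skew are capped at one refill period.
  return delta > 0 ? std::min(delta, refill_period_us) : 0;
}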
@@ -325,20 +329,35 @@ Status WriteAmpBasedRateLimiter::Tune() {
   // lower bound for write amplification estimation
   const int kRatioLower = 10;
   const int kPercentDeltaMax = 6;
+  const auto millis_per_tune = 1000 * secs_per_tune_;
+  // Define the max tick duration limit to handle clock skew.
+  const auto max_tune_tick_duration_limit =
+      std::chrono::microseconds(secs_per_tune_ * 1000 * 1000) * 7 /
+      4;  // 1.75x multiplier

-  std::chrono::microseconds prev_tuned_time = tuned_time_;
+  const std::chrono::microseconds prev_tuned_time = tuned_time_;
   tuned_time_ = std::chrono::microseconds(NowMicrosMonotonic(env_));
-  auto duration = tuned_time_ - prev_tuned_time;
-  auto duration_ms =
-      std::chrono::duration_cast<std::chrono::milliseconds>(duration).count();
+  // Validate tuning interval to detect system anomalies:
+  // (1) tuned_time_ < prev_tuned_time: Clock moved backwards (clock skew)
+  // (2) Interval > max_tune_tick_duration_limit: System stall or severe clock
+  //     skew
+  if (tuned_time_ <= prev_tuned_time ||
+      tuned_time_ >= prev_tuned_time + max_tune_tick_duration_limit) {
+    // Fall back to max rate limiter for safety if duration is invalid or
+    // exceeds max limit.
+    SetActualBytesPerSecond(max_bytes_per_sec_.load(std::memory_order_relaxed));
+    return Status::Aborted();
+  }
+  auto duration_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
+                         tuned_time_ - prev_tuned_time)
+                         .count();

   int64_t prev_bytes_per_sec = GetBytesPerSecond();
-
   // This function can be called less frequent than we anticipate when
   // compaction rate is low. Loop through the actual time slice to correct
   // the estimation.
-  auto millis_per_tune = 1000 * secs_per_tune_;
-  for (uint32_t i = 0; i < duration_ms / millis_per_tune; i++) {
+  auto sampling_count = duration_ms / millis_per_tune;
+  for (uint32_t i = 0; i < sampling_count; i++) {
     bytes_sampler_.AddSample(duration_bytes_through_ * 1000 / duration_ms);
     highpri_bytes_sampler_.AddSample(duration_highpri_bytes_through_ * 1000 /
                                      duration_ms);
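The interval validation added in the hunk above can be summarized as a predicate. The sketch below is an assumption-laden illustration (hypothetical name IsTuneIntervalValid, std::chrono parameters), not part of the patch: a tuning tick is only used when time moved forward and the elapsed interval stayed under 1.75x the tuning period; otherwise the limiter falls back to the maximum rate and aborts the tune.

// Illustrative sketch only; expresses the validity check performed before
// duration_ms is computed, over hypothetical chrono parameters.
#include <chrono>

bool IsTuneIntervalValid(std::chrono::microseconds prev_tuned_time,
                         std::chrono::microseconds tuned_time,
                         std::chrono::microseconds tune_period) {
  // 1.75x multiplier, matching max_tune_tick_duration_limit in the patch.
  const auto limit = tune_period * 7 / 4;
  return tuned_time > prev_tuned_time &&
         tuned_time < prev_tuned_time + limit;
}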
@@ -407,8 +426,8 @@ void WriteAmpBasedRateLimiter::PaceUp(bool critical) {
 }

 RateLimiter* NewWriteAmpBasedRateLimiter(
-    int64_t rate_bytes_per_sec, int64_t refill_period_us /* = 100 * 1000 */,
-    int32_t fairness /* = 10 */,
+    int64_t rate_bytes_per_sec /* = 10GiB */,
+    int64_t refill_period_us /* = 100 * 1000 */, int32_t fairness /* = 10 */,
     RateLimiter::Mode mode /* = RateLimiter::Mode::kWritesOnly */,
     bool auto_tuned /* = false */, int tune_per_sec /* = 1 */,
     size_t smooth_window_size /* = 300 */,