@@ -192,7 +192,11 @@ void WriteAmpBasedRateLimiter::Request(int64_t bytes, const Env::IOPriority pri,
192
192
(!queue_[Env::IO_LOW].empty () && &r == queue_[Env::IO_LOW].front ()))) {
193
193
leader_ = &r;
194
194
int64_t delta = next_refill_us_ - NowMicrosMonotonic (env_);
195
- delta = delta > 0 ? delta : 0 ;
195
+ // Clamp delta between 0 and refill_period_us_:
196
+ // (1) set negative values to 0
197
+ // (2) cap maximum wait time to refill_period_us_ to prevent excessive
198
+ // delays that could occur due to clock skew.
199
+ delta = delta > 0 ? std::min (delta, refill_period_us_) : 0 ;
196
200
if (delta == 0 ) {
197
201
timedout = true ;
198
202
} else {
@@ -328,19 +332,34 @@ Status WriteAmpBasedRateLimiter::Tune() {
328
332
// lower bound for write amplification estimation
329
333
const int kRatioLower = 10 ;
330
334
const int kPercentDeltaMax = 6 ;
335
+ // Define the max limit of tick duration limits to handle clock skew.
336
+ const auto max_tune_tick_duration_limit =
337
+ std::chrono::microseconds (kSecondsPerTune * 1000 * 1000 ) * 7 /
338
+ 4 ; // 1.75x multiplier
331
339
332
- std::chrono::microseconds prev_tuned_time = tuned_time_;
340
+ const std::chrono::microseconds prev_tuned_time = tuned_time_;
333
341
tuned_time_ = std::chrono::microseconds (NowMicrosMonotonic (env_));
334
- auto duration = tuned_time_ - prev_tuned_time;
335
- auto duration_ms =
336
- std::chrono::duration_cast<std::chrono::milliseconds>(duration).count ();
342
+ // Validate tuning interval to detect system anomalies:
343
+ // (1) tuned_time_ < prev_tuned_time: Clock moved backwards (clock skew)
344
+ // (2) Interval > max_tune_tick_duration_limit: System stall or severe clock
345
+ // skew
346
+ if (tuned_time_ <= prev_tuned_time ||
347
+ tuned_time_ >= prev_tuned_time + max_tune_tick_duration_limit) {
348
+ // Fall back to max rate limiter for safety if duration is invalid or
349
+ // exceeds max limit.
350
+ SetActualBytesPerSecond (max_bytes_per_sec_.load (std::memory_order_relaxed));
351
+ return Status::Aborted ();
352
+ }
353
+ auto duration_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
354
+ tuned_time_ - prev_tuned_time)
355
+ .count ();
337
356
338
357
int64_t prev_bytes_per_sec = GetBytesPerSecond ();
339
-
340
358
// This function can be called less frequent than we anticipate when
341
359
// compaction rate is low. Loop through the actual time slice to correct
342
360
// the estimation.
343
- for (uint32_t i = 0 ; i < duration_ms / kMillisPerTune ; i++) {
361
+ auto sampling_count = duration_ms / kMillisPerTune ;
362
+ for (uint32_t i = 0 ; i < sampling_count; i++) {
344
363
bytes_sampler_.AddSample (duration_bytes_through_ * 1000 / duration_ms);
345
364
highpri_bytes_sampler_.AddSample (duration_highpri_bytes_through_ * 1000 /
346
365
duration_ms);
@@ -409,8 +428,8 @@ void WriteAmpBasedRateLimiter::PaceUp(bool critical) {
409
428
}
410
429
411
430
RateLimiter* NewWriteAmpBasedRateLimiter (
412
- int64_t rate_bytes_per_sec, int64_t refill_period_us /* = 100 * 1000 */ ,
413
- int32_t fairness /* = 10 */ ,
431
+ int64_t rate_bytes_per_sec /* = 10GiB */ ,
432
+ int64_t refill_period_us /* = 100 * 1000 */ , int32_t fairness /* = 10 */ ,
414
433
RateLimiter::Mode mode /* = RateLimiter::Mode::kWritesOnly */ ,
415
434
bool auto_tuned /* = false */ ) {
416
435
assert (rate_bytes_per_sec > 0 );
0 commit comments