|
19 | 19 | #include "logging/logging.h"
|
20 | 20 | #include "port/port.h"
|
21 | 21 | #include "util/autovector.h"
|
| 22 | +#include "util/defer.h" |
22 | 23 |
|
23 | 24 | namespace ROCKSDB_NAMESPACE {
|
24 | 25 |
|
@@ -252,6 +253,22 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
|
252 | 253 | job_context->blob_delete_files);
|
253 | 254 | }
|
254 | 255 |
|
| 256 | + // Before potentially releasing mutex and waiting on condvar, increment |
| 257 | + // pending_purge_obsolete_files_ so that another thread executing |
| 258 | + // `GetSortedWals` will wait until this thread finishes execution since the |
| 259 | + // other thread will be waiting for `pending_purge_obsolete_files_`. |
| 260 | + // pending_purge_obsolete_files_ MUST be decremented if there is nothing to |
| 261 | + // delete. |
| 262 | + ++pending_purge_obsolete_files_; |
| 263 | + |
| 264 | + Defer cleanup([job_context, this]() { |
| 265 | + assert(job_context != nullptr); |
| 266 | + if (!job_context->HaveSomethingToDelete()) { |
| 267 | + mutex_.AssertHeld(); |
| 268 | + --pending_purge_obsolete_files_; |
| 269 | + } |
| 270 | + }); |
| 271 | + |
255 | 272 | // logs_ is empty when called during recovery, in which case there can't yet
|
256 | 273 | // be any tracked obsolete logs
|
257 | 274 | if (!alive_log_files_.empty() && !logs_.empty()) {
|
@@ -308,9 +325,6 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
|
308 | 325 | job_context->logs_to_free = logs_to_free_;
|
309 | 326 | job_context->log_recycle_files.assign(log_recycle_files_.begin(),
|
310 | 327 | log_recycle_files_.end());
|
311 |
| - if (job_context->HaveSomethingToDelete()) { |
312 |
| - ++pending_purge_obsolete_files_; |
313 |
| - } |
314 | 328 | logs_to_free_.clear();
|
315 | 329 | }
|
316 | 330 |
|
|
0 commit comments