Skip to content

Commit

Permalink
Do not hold mutex when writing keys if not necessary (facebook#7516)
Browse files Browse the repository at this point in the history
Summary:
RocksDB will acquire the global mutex of the db instance every time the user calls `Write`. When RocksDB schedules a lot of compaction jobs, they will compete for the mutex with the write thread, which hurts write performance.

I want to use log_write_mutex to replace the global mutex in most cases so that we do not acquire it in the write thread unless a write-stall event or a write-buffer-full event occurs.

Pull Request resolved: facebook#7516

Test Plan:
1. make check
2. CI
3. COMPILE_WITH_TSAN=1 make db_stress
make crash_test
make crash_test_with_multiops_wp_txn
make crash_test_with_multiops_wc_txn
make crash_test_with_atomic_flush

Reviewed By: siying

Differential Revision: D36908702

Pulled By: riversand963

fbshipit-source-id: 59b13881f4f5c0a58fd3ca79128a396d9cd98efe
Signed-off-by: tabokie <[email protected]>
  • Loading branch information
Little-Wallace authored and tabokie committed Jul 25, 2022
1 parent 1022df7 commit 98a80e9
Show file tree
Hide file tree
Showing 17 changed files with 357 additions and 220 deletions.
1 change: 1 addition & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

## Behavior Changes
* For track_and_verify_wals_in_manifest, revert to the original behavior before #10087: syncing of live WAL file is not tracked, and we track only the synced sizes of **closed** WALs. (PR #10330).
* DB::Write does not hold global `mutex_` if this db instance does not need to switch wal and mem-table (#7516).

## 6.29.5 (03/29/2022)
### Bug Fixes
Expand Down
14 changes: 3 additions & 11 deletions db/db_compaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5333,18 +5333,10 @@ TEST_P(DBCompactionTestWithParam, FixFileIngestionCompactionDeadlock) {
for (int j = 0; j != kNumKeysPerFile; ++j) {
ASSERT_OK(Put(Key(j), rnd.RandomString(990)));
}
if (0 == i) {
// When we reach here, the memtables have kNumKeysPerFile keys. Note that
// flush is not yet triggered. We need to write an extra key so that the
// write path will call PreprocessWrite and cause the previous key-value
// pairs to be flushed. After that, there will be the newest key in the
// memtable, and a bunch of L0 files. Since there is already one key in
// the memtable, then for i = 1, 2, ..., we do not have to write this
// extra key to trigger flush.
ASSERT_OK(Put("", ""));
if (i > 0) {
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
ASSERT_EQ(NumTableFilesAtLevel(0 /*level*/, 0 /*cf*/), i);
}
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
ASSERT_EQ(NumTableFilesAtLevel(0 /*level*/, 0 /*cf*/), i + 1);
}
// When we reach this point, there will be level0_stop_writes_trigger L0
// files and one extra key (99) in memory, which overlaps with the external
Expand Down
93 changes: 54 additions & 39 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
log_dir_synced_(false),
log_empty_(true),
persist_stats_cf_handle_(nullptr),
log_sync_cv_(&mutex_),
log_sync_cv_(&log_write_mutex_),
total_log_size_(0),
is_snapshot_supported_(true),
write_buffer_manager_(immutable_db_options_.write_buffer_manager.get()),
Expand Down Expand Up @@ -271,6 +271,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
mutable_db_options_.Dump(immutable_db_options_.info_log.get());
DumpSupportInfo(immutable_db_options_.info_log.get());

max_total_wal_size_.store(mutable_db_options_.max_total_wal_size,
std::memory_order_relaxed);
if (write_buffer_manager_) {
wbm_stall_.reset(new WBMStallInterface());
}
Expand Down Expand Up @@ -641,26 +643,28 @@ Status DBImpl::CloseHelper() {
job_context.Clean();
mutex_.Lock();
}

for (auto l : logs_to_free_) {
delete l;
}
for (auto& log : logs_) {
uint64_t log_number = log.writer->get_log_number();
Status s = log.ClearWriter();
if (!s.ok()) {
ROCKS_LOG_WARN(
immutable_db_options_.info_log,
"Unable to Sync WAL file %s with error -- %s",
LogFileName(immutable_db_options_.GetWalDir(), log_number).c_str(),
s.ToString().c_str());
// Retain the first error
if (ret.ok()) {
ret = s;
{
InstrumentedMutexLock lock(&log_write_mutex_);
for (auto l : logs_to_free_) {
delete l;
}
for (auto& log : logs_) {
uint64_t log_number = log.writer->get_log_number();
Status s = log.ClearWriter();
if (!s.ok()) {
ROCKS_LOG_WARN(
immutable_db_options_.info_log,
"Unable to Sync WAL file %s with error -- %s",
LogFileName(immutable_db_options_.GetWalDir(), log_number).c_str(),
s.ToString().c_str());
// Retain the first error
if (ret.ok()) {
ret = s;
}
}
}
logs_.clear();
}
logs_.clear();

// Table cache may have table handles holding blocks from the block cache.
// We need to release them before the block cache is destroyed. The block
Expand Down Expand Up @@ -1046,6 +1050,7 @@ Status DBImpl::TablesRangeTombstoneSummary(ColumnFamilyHandle* column_family,
}

void DBImpl::ScheduleBgLogWriterClose(JobContext* job_context) {
mutex_.AssertHeld();
if (!job_context->logs_to_free.empty()) {
for (auto l : job_context->logs_to_free) {
AddToLogsToFreeQueue(l);
Expand Down Expand Up @@ -1225,6 +1230,11 @@ Status DBImpl::SetDBOptions(
new_options.stats_persist_period_sec);
mutex_.Lock();
}
if (new_options.max_total_wal_size !=
mutable_db_options_.max_total_wal_size) {
max_total_wal_size_.store(new_options.max_total_wal_size,
std::memory_order_release);
}
write_controller_.set_max_delayed_write_rate(
new_options.delayed_write_rate);
table_cache_.get()->SetCapacity(new_options.max_open_files == -1
Expand Down Expand Up @@ -1345,7 +1355,7 @@ Status DBImpl::SyncWAL() {
uint64_t current_log_number;

{
InstrumentedMutexLock l(&mutex_);
InstrumentedMutexLock l(&log_write_mutex_);
assert(!logs_.empty());

// This SyncWAL() call only cares about logs up to this number.
Expand Down Expand Up @@ -1402,19 +1412,37 @@ Status DBImpl::SyncWAL() {
TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:2");

TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:1");
VersionEdit synced_wals;
{
InstrumentedMutexLock l(&mutex_);
InstrumentedMutexLock l(&log_write_mutex_);
if (status.ok()) {
status = MarkLogsSynced(current_log_number, need_log_dir_sync);
MarkLogsSynced(current_log_number, need_log_dir_sync, &synced_wals);
} else {
MarkLogsNotSynced(current_log_number);
}
}
if (status.ok() && synced_wals.IsWalAddition()) {
InstrumentedMutexLock l(&mutex_);
status = ApplyWALToManifest(&synced_wals);
}

TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:2");

return status;
}

Status DBImpl::ApplyWALToManifest(VersionEdit* synced_wals) {
  // Persist the synced-WAL edit (non-empty) to the MANIFEST via the default
  // column family. Caller must already hold the DB mutex.
  mutex_.AssertHeld();
  Status s = versions_->LogAndApplyToDefaultColumnFamily(synced_wals, &mutex_);
  if (!s.ok() && versions_->io_status().IsIOError()) {
    // A MANIFEST write IO error is escalated to a background error so that
    // the error handler can decide how the DB should proceed.
    s = error_handler_.SetBGError(versions_->io_status(),
                                  BackgroundErrorReason::kManifestWrite);
  }
  return s;
}

Status DBImpl::LockWAL() {
log_write_mutex_.Lock();
auto cur_log_writer = logs_.back().writer;
Expand All @@ -1434,24 +1462,22 @@ Status DBImpl::UnlockWAL() {
return Status::OK();
}

Status DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir) {
mutex_.AssertHeld();
void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir,
VersionEdit* synced_wals) {
log_write_mutex_.AssertHeld();
if (synced_dir && logfile_number_ == up_to) {
log_dir_synced_ = true;
}
VersionEdit synced_wals;
for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;) {
auto& wal = *it;
assert(wal.IsSyncing());

if (logs_.size() > 1) {
if (immutable_db_options_.track_and_verify_wals_in_manifest &&
wal.GetPreSyncSize() > 0) {
synced_wals.AddWal(wal.number, WalMetadata(wal.GetPreSyncSize()));
synced_wals->AddWal(wal.number, WalMetadata(wal.GetPreSyncSize()));
}
logs_to_free_.push_back(wal.ReleaseWriter());
// To modify logs_ both mutex_ and log_write_mutex_ must be held
InstrumentedMutexLock l(&log_write_mutex_);
it = logs_.erase(it);
} else {
wal.FinishSync();
Expand All @@ -1460,22 +1486,11 @@ Status DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir) {
}
assert(logs_.empty() || logs_[0].number > up_to ||
(logs_.size() == 1 && !logs_[0].IsSyncing()));

Status s;
if (synced_wals.IsWalAddition()) {
// not empty, write to MANIFEST.
s = versions_->LogAndApplyToDefaultColumnFamily(&synced_wals, &mutex_);
if (!s.ok() && versions_->io_status().IsIOError()) {
s = error_handler_.SetBGError(versions_->io_status(),
BackgroundErrorReason::kManifestWrite);
}
}
log_sync_cv_.SignalAll();
return s;
}

void DBImpl::MarkLogsNotSynced(uint64_t up_to) {
mutex_.AssertHeld();
log_write_mutex_.AssertHeld();
for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;
++it) {
auto& wal = *it;
Expand Down
Loading

0 comments on commit 98a80e9

Please sign in to comment.