Do not hold mutex when write keys if not necessary #7516

Closed
wants to merge 55 commits into from
Changes from 34 commits
Commits
c9589a8
remove mutex in all write method
Little-Wallace Oct 19, 2020
2b847d4
Merge branch 'master' into no-mutex
Little-Wallace Nov 12, 2020
f9b52f3
Merge branch 'master' into no-mutex
Little-Wallace Mar 25, 2021
8dab46e
Merge branch 'main' into no-mutex
Little-Wallace Feb 11, 2022
d399620
fix conflict
Little-Wallace Feb 11, 2022
ab6edb4
fix format
Little-Wallace Feb 11, 2022
5fe832a
do not hold mutex for SyncClosedLogs
Little-Wallace Feb 11, 2022
9f3dbe5
release mutex before wait sync
Little-Wallace Mar 2, 2022
551dbee
Merge remote-tracking branch 'upstream/main' into no-mutex
riversand963 Jun 3, 2022
e07daeb
Add log_file_number_size to LogContext
riversand963 Jun 3, 2022
db71ee4
Fix double mutex acquisition for secondary instance in FindObsoleteFi…
riversand963 Jun 3, 2022
e47e2ba
Merge remote-tracking branch 'upstream/main' into no-mutex
riversand963 Jun 3, 2022
1af777e
Merge remote-tracking branch 'upstream/main' into no-mutex
riversand963 Jun 3, 2022
91090c7
Fix data race in DBTest.TestLogCleanup, reported by TSAN
riversand963 Jun 3, 2022
e5caf57
Merge remote-tracking branch 'upstream/main' into no-mutex
riversand963 Jun 4, 2022
3370fe4
Fix compilation after merge
riversand963 Jun 4, 2022
d8cd64c
fix format
riversand963 Jun 4, 2022
a1d1631
Merge remote-tracking branch 'upstream/main' into no-mutex
riversand963 Jun 4, 2022
d26cb10
Wal syncing VersionEdits out-of-order
riversand963 Jun 4, 2022
9ca6f52
Fix atest
riversand963 Jun 4, 2022
8ce05c8
Merge remote-tracking branch 'upstream/main' into no-mutex
riversand963 Jun 4, 2022
84ebc8e
remove useless test
Little-Wallace Jun 5, 2022
c5efaee
Merge branch 'no-mutex' of github.com:Little-Wallace/rocksdb into no-…
Little-Wallace Jun 5, 2022
a109c81
add test print info
Little-Wallace Jun 5, 2022
4a115ef
Deflake DBCompactionTestWithParam.FixFileIngestionCompactionDeadlock
riversand963 Jun 5, 2022
b15e9da
Merge remote-tracking branch 'lwallace/no-mutex' into no-mutex
riversand963 Jun 5, 2022
9c16276
add more print test
Little-Wallace Jun 6, 2022
03a5500
Merge branch 'no-mutex' of github.com:Little-Wallace/rocksdb into no-…
Little-Wallace Jun 6, 2022
929955e
remove test print
Little-Wallace Jun 6, 2022
52b6f65
Merge branch 'main' into no-mutex
Little-Wallace Jun 6, 2022
247d8d5
Merge branch 'main' into no-mutex
Little-Wallace Jun 6, 2022
0c8250b
Merge branch 'main' into no-mutex
Little-Wallace Jun 8, 2022
d9e1723
fix delete log files
Little-Wallace Jun 8, 2022
2e6f238
address comment
Little-Wallace Jun 10, 2022
3484ee8
Merge branch 'main' into no-mutex
Little-Wallace Jun 17, 2022
e857492
Merge branch 'main' into no-mutex
Little-Wallace Jun 17, 2022
9667725
Merge remote-tracking branch 'upstream/main' into no-mutex
riversand963 Jun 18, 2022
40641e9
Merge remote-tracking branch 'upstream/main' into no-mutex
riversand963 Jun 22, 2022
1f7f6c8
Protect pending_purge_obsolete_files_ with mutex_
riversand963 Jun 23, 2022
83fc607
Merge remote-tracking branch 'upstream/main' into no-mutex
riversand963 Jun 24, 2022
4f1e104
Replace a compare func with lambda
riversand963 Jun 24, 2022
9f32780
Merge branch 'main' into no-mutex
Little-Wallace Jun 28, 2022
56d8e15
DBImpl::pending_obsolete_files_ does not have to be atomic
riversand963 Jul 1, 2022
644e0da
Rename a variable
riversand963 Jul 2, 2022
baeb309
Some minor changes that does not change actual behavior
riversand963 Jul 5, 2022
8678825
alive_log_files_ actually relies on db mutex
riversand963 Jul 6, 2022
88aaa15
Merge remote-tracking branch 'upstream/main' into no-mutex
riversand963 Jul 6, 2022
d8c8a95
Fix a typo caused by search/replace
riversand963 Jul 6, 2022
033500c
Update comment for alive_log_files_
riversand963 Jul 6, 2022
9bd4533
Update comment for DBImpl::logs_
riversand963 Jul 6, 2022
eb5b4d4
Update comment and improve readability
riversand963 Jul 7, 2022
c40e425
Merge remote-tracking branch 'upstream/main' into no-mutex
riversand963 Jul 8, 2022
9ccc811
Merge remote-tracking branch 'upstream/main' into no-mutex
riversand963 Jul 14, 2022
dd0fda9
Merge remote-tracking branch 'upstream/main' into no-mutex
riversand963 Jul 19, 2022
c16664a
Merge remote-tracking branch 'upstream/main' into no-mutex
riversand963 Jul 21, 2022
1 change: 1 addition & 0 deletions HISTORY.md
@@ -30,6 +30,7 @@

### Behavior changes
* DB::Open(), DB::OpenAsSecondary() will fail if a Logger cannot be created (#9984)
+* DB::Write does not hold the global `mutex_` if this DB instance does not need to switch the WAL and memtable.

## 7.3.0 (05/20/2022)
### Bug Fixes
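
To illustrate the behavior change recorded in the HISTORY.md entry above, here is a minimal, self-contained sketch of a write path that takes a global mutex only when it has to switch buffers. It is not RocksDB code: `TinyDb`, `memtable_limit_`, and the counters are hypothetical names, and a simple byte counter stands in for the real WAL and memtable switch conditions.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <mutex>

// Hypothetical miniature of a write path; none of these names are RocksDB's.
class TinyDb {
 public:
  explicit TinyDb(uint64_t memtable_limit) : memtable_limit_(memtable_limit) {}

  void Write(uint64_t bytes) {
    // Fast path: a cheap atomic check decides whether a switch is needed.
    if (memtable_bytes_.load(std::memory_order_acquire) + bytes >
        memtable_limit_) {
      // Slow path: only a buffer switch takes the global mutex.
      std::lock_guard<std::mutex> guard(db_mutex_);
      db_mutex_acquisitions_.fetch_add(1, std::memory_order_relaxed);
      // Re-check under the lock; another writer may have switched already.
      if (memtable_bytes_.load(std::memory_order_relaxed) + bytes >
          memtable_limit_) {
        switches_.fetch_add(1, std::memory_order_relaxed);
        memtable_bytes_.store(0, std::memory_order_release);
      }
    }
    memtable_bytes_.fetch_add(bytes, std::memory_order_relaxed);
  }

  uint64_t memtable_bytes() const { return memtable_bytes_.load(); }
  int db_mutex_acquisitions() const { return db_mutex_acquisitions_.load(); }
  int switches() const { return switches_.load(); }

 private:
  const uint64_t memtable_limit_;
  std::mutex db_mutex_;
  std::atomic<uint64_t> memtable_bytes_{0};
  std::atomic<int> db_mutex_acquisitions_{0};
  std::atomic<int> switches_{0};
};
```

With a limit of 100 bytes, two 40-byte writes never touch `db_mutex_`; the third one does, because it would overflow the buffer. The sketch ignores write grouping and ordering, which the real write path has to handle.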
14 changes: 3 additions & 11 deletions db/db_compaction_test.cc
@@ -5517,18 +5517,10 @@ TEST_P(DBCompactionTestWithParam, FixFileIngestionCompactionDeadlock) {
for (int j = 0; j != kNumKeysPerFile; ++j) {
ASSERT_OK(Put(Key(j), rnd.RandomString(990)));
}
-if (0 == i) {
-// When we reach here, the memtables have kNumKeysPerFile keys. Note that
-// flush is not yet triggered. We need to write an extra key so that the
-// write path will call PreprocessWrite and cause the previous key-value
-// pairs to be flushed. After that, there will be the newest key in the
-// memtable, and a bunch of L0 files. Since there is already one key in
-// the memtable, then for i = 1, 2, ..., we do not have to write this
-// extra key to trigger flush.
-ASSERT_OK(Put("", ""));
+if (i > 0) {
+ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+ASSERT_EQ(NumTableFilesAtLevel(0 /*level*/, 0 /*cf*/), i);
}
-ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
-ASSERT_EQ(NumTableFilesAtLevel(0 /*level*/, 0 /*cf*/), i + 1);
}
// When we reach this point, there will be level0_stop_writes_trigger L0
// files and one extra key (99) in memory, which overlaps with the external
94 changes: 54 additions & 40 deletions db/db_impl/db_impl.cc
@@ -180,7 +180,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
log_dir_synced_(false),
log_empty_(true),
persist_stats_cf_handle_(nullptr),
-log_sync_cv_(&mutex_),
+log_sync_cv_(&log_write_mutex_),
total_log_size_(0),
is_snapshot_supported_(true),
write_buffer_manager_(immutable_db_options_.write_buffer_manager.get()),
@@ -270,6 +270,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
mutable_db_options_.Dump(immutable_db_options_.info_log.get());
DumpSupportInfo(immutable_db_options_.info_log.get());

+max_total_wal_size_.store(mutable_db_options_.max_total_wal_size,
+std::memory_order_relaxed);
if (write_buffer_manager_) {
wbm_stall_.reset(new WBMStallInterface());
}
@@ -640,26 +642,28 @@ Status DBImpl::CloseHelper() {
job_context.Clean();
mutex_.Lock();
}
-
-for (auto l : logs_to_free_) {
-delete l;
-}
-for (auto& log : logs_) {
-uint64_t log_number = log.writer->get_log_number();
-Status s = log.ClearWriter();
-if (!s.ok()) {
-ROCKS_LOG_WARN(
-immutable_db_options_.info_log,
-"Unable to Sync WAL file %s with error -- %s",
-LogFileName(immutable_db_options_.GetWalDir(), log_number).c_str(),
-s.ToString().c_str());
-// Retain the first error
-if (ret.ok()) {
-ret = s;
+{
+InstrumentedMutexLock lock(&log_write_mutex_);
+for (auto l : logs_to_free_) {
+delete l;
+}
+for (auto& log : logs_) {
+uint64_t log_number = log.writer->get_log_number();
+Status s = log.ClearWriter();
+if (!s.ok()) {
+ROCKS_LOG_WARN(
+immutable_db_options_.info_log,
+"Unable to Sync WAL file %s with error -- %s",
+LogFileName(immutable_db_options_.GetWalDir(), log_number).c_str(),
+s.ToString().c_str());
+// Retain the first error
+if (ret.ok()) {
+ret = s;
+}
}
}
+logs_.clear();
}
-logs_.clear();

// Table cache may have table handles holding blocks from the block cache.
// We need to release them before the block cache is destroyed. The block
@@ -1228,6 +1232,11 @@ Status DBImpl::SetDBOptions(
new_options.stats_persist_period_sec);
mutex_.Lock();
}
+if (new_options.max_total_wal_size !=
+mutable_db_options_.max_total_wal_size) {
+max_total_wal_size_.store(new_options.max_total_wal_size,
+std::memory_order_release);
+}
write_controller_.set_max_delayed_write_rate(
new_options.delayed_write_rate);
table_cache_.get()->SetCapacity(new_options.max_open_files == -1
@@ -1347,7 +1356,7 @@ Status DBImpl::SyncWAL() {
uint64_t current_log_number;

{
-InstrumentedMutexLock l(&mutex_);
+InstrumentedMutexLock l(&log_write_mutex_);
assert(!logs_.empty());

// This SyncWAL() call only cares about logs up to this number.
@@ -1405,19 +1414,37 @@ Status DBImpl::SyncWAL() {
TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:2");

TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:1");
+VersionEdit synced_wals;
{
-InstrumentedMutexLock l(&mutex_);
+InstrumentedMutexLock l(&log_write_mutex_);
if (status.ok()) {
-status = MarkLogsSynced(current_log_number, need_log_dir_sync);
+MarkLogsSynced(current_log_number, need_log_dir_sync, &synced_wals);
} else {
MarkLogsNotSynced(current_log_number);
}
}
+if (status.ok() && synced_wals.IsWalAddition()) {
+InstrumentedMutexLock l(&mutex_);
+status = ApplyWALToManifest(&synced_wals);
+}
+
TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:2");

return status;
}

+Status DBImpl::ApplyWALToManifest(VersionEdit* synced_wals) {
+// not empty, write to MANIFEST.
+mutex_.AssertHeld();
+Status status =
+versions_->LogAndApplyToDefaultColumnFamily(synced_wals, &mutex_);
+if (!status.ok() && versions_->io_status().IsIOError()) {
+status = error_handler_.SetBGError(versions_->io_status(),
+BackgroundErrorReason::kManifestWrite);
+}
+return status;
+}
+
Status DBImpl::LockWAL() {
log_write_mutex_.Lock();
auto cur_log_writer = logs_.back().writer;
@@ -1437,25 +1464,23 @@ Status DBImpl::UnlockWAL() {
return Status::OK();
}

-Status DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir) {
-mutex_.AssertHeld();
+void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir,
+VersionEdit* synced_wals) {
+log_write_mutex_.AssertHeld();
if (synced_dir && logfile_number_ == up_to) {
log_dir_synced_ = true;
}
-VersionEdit synced_wals;
for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;) {
auto& wal = *it;
assert(wal.getting_synced);
if (immutable_db_options_.track_and_verify_wals_in_manifest &&
wal.writer->file()->GetFileSize() > 0) {
-synced_wals.AddWal(wal.number,
-WalMetadata(wal.writer->file()->GetFileSize()));
+synced_wals->AddWal(wal.number,
+WalMetadata(wal.writer->file()->GetFileSize()));
}

if (logs_.size() > 1) {
logs_to_free_.push_back(wal.ReleaseWriter());
-// To modify logs_ both mutex_ and log_write_mutex_ must be held
-InstrumentedMutexLock l(&log_write_mutex_);
it = logs_.erase(it);
} else {
wal.getting_synced = false;
@@ -1464,22 +1489,11 @@ Status DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir) {
}
assert(logs_.empty() || logs_[0].number > up_to ||
(logs_.size() == 1 && !logs_[0].getting_synced));
-
-Status s;
-if (synced_wals.IsWalAddition()) {
-// not empty, write to MANIFEST.
-s = versions_->LogAndApplyToDefaultColumnFamily(&synced_wals, &mutex_);
-if (!s.ok() && versions_->io_status().IsIOError()) {
-s = error_handler_.SetBGError(versions_->io_status(),
-BackgroundErrorReason::kManifestWrite);
-}
-}
log_sync_cv_.SignalAll();
-return s;
}

void DBImpl::MarkLogsNotSynced(uint64_t up_to) {
-mutex_.AssertHeld();
+log_write_mutex_.AssertHeld();
for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;
++it) {
auto& wal = *it;
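
The `SyncWAL()` and `MarkLogsSynced()` hunks above split one critical section in two: WAL bookkeeping runs under `log_write_mutex_` and only collects an edit, and the MANIFEST write runs afterwards under `mutex_`. The following is a simplified sketch of that two-phase shape using standard-library types. `TwoPhaseSync`, `WalEdit`, and the vector standing in for the MANIFEST are hypothetical, and the real code also tracks `getting_synced` and file sizes, which are omitted here.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <mutex>
#include <vector>

// Hypothetical stand-in for a VersionEdit that records synced WALs.
struct WalEdit {
  std::vector<uint64_t> synced;
  bool empty() const { return synced.empty(); }
};

class TwoPhaseSync {
 public:
  void AddLog(uint64_t number) {
    std::lock_guard<std::mutex> guard(log_mutex_);
    logs_.push_back(number);
  }

  void SyncUpTo(uint64_t up_to) {
    WalEdit edit;
    {
      // Phase 1: update the log list under the narrow log mutex and
      // collect what needs to be recorded.
      std::lock_guard<std::mutex> guard(log_mutex_);
      MarkSynced(up_to, &edit);
    }
    if (!edit.empty()) {
      // Phase 2: apply the collected edit under the DB-wide mutex.
      std::lock_guard<std::mutex> guard(db_mutex_);
      manifest_.insert(manifest_.end(), edit.synced.begin(),
                       edit.synced.end());
    }
  }

  std::vector<uint64_t> Manifest() {
    std::lock_guard<std::mutex> guard(db_mutex_);
    return manifest_;
  }

  std::size_t LiveLogs() {
    std::lock_guard<std::mutex> guard(log_mutex_);
    return logs_.size();
  }

 private:
  // REQUIRES: log_mutex_ held. Always keeps the newest log in the list.
  void MarkSynced(uint64_t up_to, WalEdit* edit) {
    while (logs_.size() > 1 && logs_.front() <= up_to) {
      edit->synced.push_back(logs_.front());
      logs_.pop_front();
    }
  }

  std::mutex log_mutex_;
  std::mutex db_mutex_;
  std::deque<uint64_t> logs_;
  std::vector<uint64_t> manifest_;
};
```

Because the two mutexes are never held at the same time in this sketch, no lock-ordering rule between them is needed; the cost is that the recorded state can briefly lag behind the log list.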
23 changes: 17 additions & 6 deletions db/db_impl/db_impl.h
@@ -1271,7 +1271,7 @@ class DBImpl : public DB {

// only used for dynamically adjusting max_total_wal_size. it is a sum of
// [write_buffer_size * max_write_buffer_number] over all column families
-uint64_t max_total_in_memory_state_;
+std::atomic<uint64_t> max_total_in_memory_state_;

// The options to access storage files
const FileOptions file_options_;
@@ -1582,6 +1582,15 @@ class DBImpl : public DB {
bool getting_synced = false;
};

+struct LogContext {
+explicit LogContext(bool need_sync = false)
+: need_log_sync(need_sync), need_log_dir_sync(need_sync) {}
+bool need_log_sync = false;
+bool need_log_dir_sync = false;
+log::Writer* writer = nullptr;
+LogFileNumberSize* log_file_number_size = nullptr;
+};
+
// PurgeFileInfo is a structure to hold information of files to be deleted in
// purge_files_
struct PurgeFileInfo {
@@ -1735,7 +1744,7 @@ class DBImpl : public DB {
void ReleaseFileNumberFromPendingOutputs(
std::unique_ptr<std::list<uint64_t>::iterator>& v);

-IOStatus SyncClosedLogs(JobContext* job_context);
+IOStatus SyncClosedLogs(JobContext* job_context, VersionEdit* synced_wals);

// Flush the in-memory write buffer to storage. Switches to a new
// log-file/memtable and writes a new descriptor iff successful. Then
@@ -1895,8 +1904,8 @@ class DBImpl : public DB {
Status HandleWriteBufferManagerFlush(WriteContext* write_context);

// REQUIRES: mutex locked
-Status PreprocessWrite(const WriteOptions& write_options, bool* need_log_sync,
-WriteContext* write_context);
+Status PreprocessWrite(const WriteOptions& write_options,
+LogContext* log_context, WriteContext* write_context);

WriteBatch* MergeBatch(const WriteThread::WriteGroup& write_group,
WriteBatch* tmp_batch, size_t* write_with_wal,
@@ -2032,7 +2041,8 @@ class DBImpl : public DB {
std::unique_ptr<TaskLimiterToken>* token, LogBuffer* log_buffer);

// helper function to call after some of the logs_ were synced
-Status MarkLogsSynced(uint64_t up_to, bool synced_dir);
+void MarkLogsSynced(uint64_t up_to, bool synced_dir, VersionEdit* edit);
+Status ApplyWALToManifest(VersionEdit* edit);
// WALs with log number up to up_to are not synced successfully.
void MarkLogsNotSynced(uint64_t up_to);

@@ -2388,7 +2398,7 @@ class DBImpl : public DB {

// Number of times FindObsoleteFiles has found deletable files and the
// corresponding call to PurgeObsoleteFiles has not yet finished.
-int pending_purge_obsolete_files_;
+std::atomic<int> pending_purge_obsolete_files_;

// last time when DeleteObsoleteFiles with full scan was executed. Originally
// initialized with startup time.
@@ -2509,6 +2519,7 @@ class DBImpl : public DB {
InstrumentedCondVar atomic_flush_install_cv_;

bool wal_in_db_path_;
+std::atomic<uint64_t> max_total_wal_size_;

BlobFileCompletionCallback blob_callback_;
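
The new `max_total_wal_size_` member mirrors a mutable option in an atomic so that a path not holding `mutex_` can still read it. A minimal sketch of that idea follows, assuming the option is changed under a mutex and read lock-free. `WalSizeLimit`, `SetOption`, and `ExceedsLimit` are hypothetical names, and pairing the release store with an acquire load is this sketch's choice rather than something the diff shows.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <mutex>

// Hypothetical holder for an option that is written rarely, read often.
class WalSizeLimit {
 public:
  explicit WalSizeLimit(uint64_t initial) : option_value_(initial) {
    // No other thread can see the object yet, so relaxed is enough here.
    limit_.store(initial, std::memory_order_relaxed);
  }

  // Writer side: the authoritative option is changed under the mutex and
  // the atomic mirror is republished only when the value actually changes.
  void SetOption(uint64_t new_value) {
    std::lock_guard<std::mutex> guard(mutex_);
    if (new_value != option_value_) {
      limit_.store(new_value, std::memory_order_release);
      option_value_ = new_value;
    }
  }

  // Reader side: no mutex, just an atomic load of the mirror.
  bool ExceedsLimit(uint64_t total_log_size) const {
    return total_log_size > limit_.load(std::memory_order_acquire);
  }

 private:
  std::mutex mutex_;
  uint64_t option_value_;          // guarded by mutex_
  std::atomic<uint64_t> limit_{0}; // lock-free mirror of option_value_
};
```

A reader may observe the old limit for a short time after `SetOption()` returns on another thread, which is usually acceptable for a size threshold that is only checked periodically.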

31 changes: 23 additions & 8 deletions db/db_impl/db_impl_compaction_flush.cc
@@ -82,9 +82,10 @@ bool DBImpl::RequestCompactionToken(ColumnFamilyData* cfd, bool force,
return false;
}

-IOStatus DBImpl::SyncClosedLogs(JobContext* job_context) {
+IOStatus DBImpl::SyncClosedLogs(JobContext* job_context,
+VersionEdit* synced_wals) {
TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Start");
-mutex_.AssertHeld();
+InstrumentedMutexLock l(&log_write_mutex_);
autovector<log::Writer*, 1> logs_to_sync;
uint64_t current_log_number = logfile_number_;
while (logs_.front().number < current_log_number &&
@@ -101,7 +102,7 @@ IOStatus DBImpl::SyncClosedLogs(JobContext* job_context) {

IOStatus io_s;
if (!logs_to_sync.empty()) {
-mutex_.Unlock();
+log_write_mutex_.Unlock();

assert(job_context);

@@ -129,12 +130,12 @@ IOStatus DBImpl::SyncClosedLogs(JobContext* job_context) {

TEST_SYNC_POINT_CALLBACK("DBImpl::SyncClosedLogs:BeforeReLock",
/*arg=*/nullptr);
-mutex_.Lock();
+log_write_mutex_.Lock();

// "number <= current_log_number - 1" is equivalent to
// "number < current_log_number".
if (io_s.ok()) {
-io_s = status_to_io_status(MarkLogsSynced(current_log_number - 1, true));
+MarkLogsSynced(current_log_number - 1, true, synced_wals);
} else {
MarkLogsNotSynced(current_log_number - 1);
}
@@ -221,8 +222,15 @@ Status DBImpl::FlushMemTableToOutputFile(
bool need_cancel = false;
IOStatus log_io_s = IOStatus::OK();
if (needs_to_sync_closed_wals) {
-// SyncClosedLogs() may unlock and re-lock the db_mutex.
-log_io_s = SyncClosedLogs(job_context);
+// SyncClosedLogs() may unlock and re-lock log_write_mutex_.
+VersionEdit synced_wals;
+mutex_.Unlock();
+log_io_s = SyncClosedLogs(job_context, &synced_wals);
+mutex_.Lock();
+if (log_io_s.ok() && synced_wals.IsWalAddition()) {
+log_io_s = status_to_io_status(ApplyWALToManifest(&synced_wals));
+}
+
if (!log_io_s.ok() && !log_io_s.IsShutdownInProgress() &&
!log_io_s.IsColumnFamilyDropped()) {
error_handler_.SetBGError(log_io_s, BackgroundErrorReason::kFlush);
@@ -475,7 +483,14 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
if (logfile_number_ > 0) {
// TODO (yanqin) investigate whether we should sync the closed logs for
// single column family case.
-log_io_s = SyncClosedLogs(job_context);
+VersionEdit synced_wals;
+mutex_.Unlock();
+log_io_s = SyncClosedLogs(job_context, &synced_wals);
+mutex_.Lock();
+if (log_io_s.ok() && synced_wals.IsWalAddition()) {
+log_io_s = status_to_io_status(ApplyWALToManifest(&synced_wals));
+}
+
if (!log_io_s.ok() && !log_io_s.IsShutdownInProgress() &&
!log_io_s.IsColumnFamilyDropped()) {
if (total_log_size_ > 0) {
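
`SyncClosedLogs()` in the hunks above follows a common shape: pick the work while holding the log mutex, drop the mutex for the slow sync, then re-acquire it to record the outcome. The sketch below shows only that shape, under simplifying assumptions. `ClosedLogSyncer`, `LogEntry`, and the `sync_fn` callback are hypothetical, and the condition-variable wait that the real function uses when another thread is already syncing a log is left out.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <functional>
#include <mutex>
#include <vector>

struct LogEntry {
  uint64_t number;
  bool getting_synced;
};

class ClosedLogSyncer {
 public:
  void AddLog(uint64_t number) {
    std::lock_guard<std::mutex> guard(log_mutex_);
    logs_.push_back(LogEntry{number, false});
  }

  // Syncs every log except the newest one. Returns false if a sync failed.
  bool SyncClosed(const std::function<bool(uint64_t)>& sync_fn) {
    std::unique_lock<std::mutex> lock(log_mutex_);
    std::vector<uint64_t> to_sync;
    for (std::size_t i = 0; i + 1 < logs_.size(); ++i) {
      logs_[i].getting_synced = true;
      to_sync.push_back(logs_[i].number);
    }
    if (to_sync.empty()) {
      return true;
    }

    lock.unlock();  // do not hold the mutex across slow I/O
    bool ok = true;
    for (uint64_t number : to_sync) {
      if (!sync_fn(number)) {
        ok = false;
        break;
      }
    }
    lock.lock();  // re-acquire before touching logs_ again

    if (ok) {
      // Drop the logs that were synced; newer ones may have been added.
      while (!logs_.empty() && logs_.front().number <= to_sync.back()) {
        logs_.pop_front();
      }
    } else {
      for (LogEntry& entry : logs_) {
        entry.getting_synced = false;
      }
    }
    return ok;
  }

  std::size_t NumLogs() {
    std::lock_guard<std::mutex> guard(log_mutex_);
    return logs_.size();
  }

 private:
  std::mutex log_mutex_;
  std::deque<LogEntry> logs_;
};
```

A failed sync leaves the log list untouched apart from clearing the flags, so a later call can retry the same logs.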
2 changes: 1 addition & 1 deletion db/db_impl/db_impl_debug.cc
@@ -223,7 +223,7 @@ void DBImpl::TEST_EndWrite(void* w) {
}

size_t DBImpl::TEST_LogsToFreeSize() {
-InstrumentedMutexLock l(&mutex_);
+InstrumentedMutexLock l(&log_write_mutex_);
return logs_to_free_.size();
}
