Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove mutex for write #197

Open
wants to merge 17 commits into
base: 6.4.tikv
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 37 additions & 28 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
log_dir_synced_(false),
log_empty_(true),
persist_stats_cf_handle_(nullptr),
log_sync_cv_(&mutex_),
log_sync_cv_(&log_write_mutex_),
total_log_size_(0),
is_snapshot_supported_(true),
write_buffer_manager_(immutable_db_options_.write_buffer_manager.get()),
Expand Down Expand Up @@ -258,6 +258,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
// we won't drop any deletion markers until SetPreserveDeletesSequenceNumber()
// is called by client and this seqnum is advanced.
preserve_deletes_seqnum_.store(0);
max_total_wal_size_.store(mutable_db_options_.max_total_wal_size,
std::memory_order_relaxed);
}

Status DBImpl::Resume() {
Expand Down Expand Up @@ -526,25 +528,28 @@ Status DBImpl::CloseHelper() {
mutex_.Lock();
}

for (auto l : logs_to_free_) {
delete l;
}
for (auto& log : logs_) {
uint64_t log_number = log.writer->get_log_number();
Status s = log.ClearWriter();
if (!s.ok()) {
ROCKS_LOG_WARN(
immutable_db_options_.info_log,
"Unable to Sync WAL file %s with error -- %s",
LogFileName(immutable_db_options_.wal_dir, log_number).c_str(),
s.ToString().c_str());
// Retain the first error
if (ret.ok()) {
ret = s;
{
InstrumentedMutexLock lock(&log_write_mutex_);
for (auto l : logs_to_free_) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the benefit to make logs_to_free_ be protected by both log_write_mutex_ and mutex_

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It makes no sense. But i think unlock mutex_ during this function may cause other bad case.

delete l;
}
for (auto& log : logs_) {
uint64_t log_number = log.writer->get_log_number();
Status s = log.ClearWriter();
if (!s.ok()) {
ROCKS_LOG_WARN(
immutable_db_options_.info_log,
"Unable to Sync WAL file %s with error -- %s",
LogFileName(immutable_db_options_.wal_dir, log_number).c_str(),
s.ToString().c_str());
// Retain the first error
if (ret.ok()) {
ret = s;
}
}
}
logs_.clear();
}
logs_.clear();

// Table cache may have table handles holding blocks from the block cache.
// We need to release them before the block cache is destroyed. The block
Expand Down Expand Up @@ -1014,11 +1019,12 @@ Status DBImpl::SetDBOptions(
mutable_db_options_.max_background_jobs,
mutable_db_options_.base_background_compactions,
/* parallelize_compactions */ true);
const BGJobLimits new_bg_job_limits = GetBGJobLimits(
new_options.max_background_flushes,
new_options.max_background_compactions,
new_options.max_background_jobs,
new_options.base_background_compactions, /* parallelize_compactions */ true);
const BGJobLimits new_bg_job_limits =
GetBGJobLimits(new_options.max_background_flushes,
new_options.max_background_compactions,
new_options.max_background_jobs,
new_options.base_background_compactions,
/* parallelize_compactions */ true);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/* parallelize_compactions */ true);
true /* parallelize_compactions */);

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that all code of rocksdb place the comment before param

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're half right. It's a per-file styling and this file happens to use postposition which is rare within the whole codebase...


const bool max_flushes_increased =
new_bg_job_limits.max_flushes > current_bg_job_limits.max_flushes;
Expand Down Expand Up @@ -1072,6 +1078,11 @@ Status DBImpl::SetDBOptions(
thread_persist_stats_.reset();
}
}
if (new_options.max_total_wal_size !=
mutable_db_options_.max_total_wal_size) {
max_total_wal_size_.store(new_options.max_total_wal_size,
std::memory_order_release);
}
write_controller_.set_max_delayed_write_rate(
new_options.delayed_write_rate);
table_cache_.get()->SetCapacity(new_options.max_open_files == -1
Expand Down Expand Up @@ -1187,7 +1198,7 @@ Status DBImpl::SyncWAL() {
uint64_t current_log_number;

{
InstrumentedMutexLock l(&mutex_);
InstrumentedMutexLock l(&log_write_mutex_);
assert(!logs_.empty());

// This SyncWAL() call only cares about logs up to this number.
Expand Down Expand Up @@ -1235,7 +1246,7 @@ Status DBImpl::SyncWAL() {

TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:1");
{
InstrumentedMutexLock l(&mutex_);
InstrumentedMutexLock l(&log_write_mutex_);
MarkLogsSynced(current_log_number, need_log_dir_sync, status);
}
TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:2");
Expand Down Expand Up @@ -1264,7 +1275,7 @@ Status DBImpl::UnlockWAL() {

void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir,
const Status& status) {
mutex_.AssertHeld();
log_write_mutex_.AssertHeld();
if (synced_dir && logfile_number_ == up_to && status.ok()) {
log_dir_synced_ = true;
}
Expand All @@ -1273,8 +1284,6 @@ void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir,
assert(log.getting_synced);
if (status.ok() && logs_.size() > 1) {
logs_to_free_.push_back(log.ReleaseWriter());
// To modify logs_ both mutex_ and log_write_mutex_ must be held
InstrumentedMutexLock l(&log_write_mutex_);
it = logs_.erase(it);
} else {
log.getting_synced = false;
Expand Down Expand Up @@ -2117,13 +2126,13 @@ Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options,
write_thread_.ExitUnbatched(&w);
}
if (s.ok()) {
single_column_family_mode_.store(false, std::memory_order_release);
auto* cfd =
versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name);
assert(cfd != nullptr);
s = cfd->AddDirectories();
}
if (s.ok()) {
single_column_family_mode_ = false;
auto* cfd =
versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name);
assert(cfd != nullptr);
Expand Down
36 changes: 23 additions & 13 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,8 @@ class DBImpl : public DB {
void FindObsoleteFiles(JobContext* job_context, bool force,
bool no_full_scan = false);

void FindObsoleteLogFiles(JobContext* job_context);

// Diffs the files listed in filenames and those that do not
// belong to live files are possibly removed. Also, removes all the
// files in sst_delete_files and log_delete_files.
Expand Down Expand Up @@ -937,10 +939,12 @@ class DBImpl : public DB {

// only used for dynamically adjusting max_total_wal_size. it is a sum of
// [write_buffer_size * max_write_buffer_number] over all column families
uint64_t max_total_in_memory_state_;
std::atomic<uint64_t> max_total_in_memory_state_;

std::atomic<uint64_t> max_total_wal_size_;
// If true, we have only one (default) column family. We use this to optimize
// some code-paths
bool single_column_family_mode_;
std::atomic<bool> single_column_family_mode_;

// The options to access storage files
const EnvOptions env_options_;
Expand Down Expand Up @@ -1134,6 +1138,14 @@ class DBImpl : public DB {
}
};

struct LogContext {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add newlines around this struct definition.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

explicit LogContext(bool need_sync = false)
: need_log_sync(need_sync), need_log_dir_sync(need_sync) {}
bool need_log_sync;
bool need_log_dir_sync;
log::Writer* writer;
};

struct LogFileNumberSize {
explicit LogFileNumberSize(uint64_t _number) : number(_number) {}
void AddSize(uint64_t new_size) { size += new_size; }
Expand Down Expand Up @@ -1404,8 +1416,8 @@ class DBImpl : public DB {
Status HandleWriteBufferFull(WriteContext* write_context);

// REQUIRES: mutex locked
Status PreprocessWrite(const WriteOptions& write_options, bool* need_log_sync,
WriteContext* write_context);
Status PreprocessWrite(const WriteOptions& write_options,
LogContext* log_context, WriteContext* write_context);

WriteBatch* MergeBatch(const WriteThread::WriteGroup& write_group,
WriteBatch* tmp_batch, size_t* write_with_wal,
Expand Down Expand Up @@ -1610,12 +1622,11 @@ class DBImpl : public DB {
// Lock over the persistent DB state. Non-nullptr iff successfully acquired.
FileLock* db_lock_;

// In addition to mutex_, log_write_mutex_ protected writes to stats_history_
// In addition to mutex_, stats_history_mutex_ protected writes to stats_history_
InstrumentedMutex stats_history_mutex_;
// In addition to mutex_, log_write_mutex_ protected writes to logs_ and
// logfile_number_. With two_write_queues it also protects alive_log_files_,
// and log_empty_. Refer to the definition of each variable below for more
// details.
// In addition to mutex_, log_write_mutex_ protected access to logs_,
// logfile_number_, alive_log_files_ and log_empty_.
// Refer to the definition of each variable below for more details.
// Note: to avoid dealock, if needed to acquire both log_write_mutex_ and
// mutex_, the order should be first mutex_ and then log_write_mutex_.
InstrumentedMutex log_write_mutex_;
Expand Down Expand Up @@ -1664,11 +1675,10 @@ class DBImpl : public DB {
std::deque<LogFileNumberSize> alive_log_files_;
// Log files that aren't fully synced, and the current log file.
// Synchronization:
// - push_back() is done from write_thread_ with locked mutex_ and
// log_write_mutex_
// - push_back() is done from write_thread_ with locked log_write_mutex_
// - pop_front() is done from any thread with locked mutex_ and
// log_write_mutex_
// - reads are done with either locked mutex_ or log_write_mutex_
// - reads are done with locked log_write_mutex_
// - back() and items with getting_synced=true are not popped,
// - The same thread that sets getting_synced=true will reset it.
// - it follows that the object referred by back() can be safely read from
Expand All @@ -1690,7 +1700,7 @@ class DBImpl : public DB {
std::atomic<uint64_t> total_log_size_;

// If this is non-empty, we need to delete these log files in background
// threads. Protected by db mutex.
// threads. Protected by db log_write_mutex_.
autovector<log::Writer*> logs_to_free_;

bool is_snapshot_supported_;
Expand Down
19 changes: 13 additions & 6 deletions db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ bool DBImpl::RequestCompactionToken(ColumnFamilyData* cfd, bool force,

Status DBImpl::SyncClosedLogs(JobContext* job_context) {
TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Start");
mutex_.AssertHeld();
InstrumentedMutexLock l(&log_write_mutex_);
autovector<log::Writer*, 1> logs_to_sync;
uint64_t current_log_number = logfile_number_;
while (logs_.front().number < current_log_number &&
Expand All @@ -97,7 +97,7 @@ Status DBImpl::SyncClosedLogs(JobContext* job_context) {

Status s;
if (!logs_to_sync.empty()) {
mutex_.Unlock();
log_write_mutex_.Unlock();

for (log::Writer* log : logs_to_sync) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
Expand All @@ -119,13 +119,12 @@ Status DBImpl::SyncClosedLogs(JobContext* job_context) {
s = directories_.GetWalDir()->Fsync();
}

mutex_.Lock();
log_write_mutex_.Lock();

// "number <= current_log_number - 1" is equivalent to
// "number < current_log_number".
MarkLogsSynced(current_log_number - 1, true, s);
if (!s.ok()) {
error_handler_.SetBGError(s, BackgroundErrorReason::kFlush);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where do you set this error now?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be saved at "db/db_impl/db_impl_compaction_flush.cc" L549 and L210

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this function, we will not hold mutex so that we can not save BG error here.

TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Failed");
return s;
}
Expand Down Expand Up @@ -174,8 +173,13 @@ Status DBImpl::FlushMemTableToOutputFile(
// the host crashes after flushing and before WAL is persistent, the
// flushed SST may contain data from write batches whose updates to
// other column families are missing.
// SyncClosedLogs() may unlock and re-lock the db_mutex.
// We must release mutex_ before calling `SyncClosedLogs` because it
// may be blocked waiting other thread to complete the operation of
// synchronizing log file.
// SyncClosedLogs() may unlock and re-lock the log_write_mutex.
mutex_.Unlock();
s = SyncClosedLogs(job_context);
mutex_.Lock();
} else {
TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Skip");
}
Expand Down Expand Up @@ -357,7 +361,9 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
if (logfile_number_ > 0) {
// TODO (yanqin) investigate whether we should sync the closed logs for
// single column family case.
mutex_.Unlock();
s = SyncClosedLogs(job_context);
mutex_.Lock();
}

// exec_status stores the execution status of flush_jobs as
Expand Down Expand Up @@ -1988,7 +1994,8 @@ DBImpl::BGJobLimits DBImpl::GetBGJobLimits(int max_background_flushes,
}
if (!parallelize_compactions) {
// throttle background compactions until we deem necessary
res.max_compactions = std::max(1, std::min(base_background_compactions, res.max_compactions));
res.max_compactions =
std::max(1, std::min(base_background_compactions, res.max_compactions));
}
return res;
}
Expand Down
4 changes: 2 additions & 2 deletions db/db_impl/db_impl_debug.cc
Original file line number Diff line number Diff line change
Expand Up @@ -187,12 +187,12 @@ void DBImpl::TEST_EndWrite(void* w) {
}

size_t DBImpl::TEST_LogsToFreeSize() {
InstrumentedMutexLock l(&mutex_);
InstrumentedMutexLock l(&log_write_mutex_);
return logs_to_free_.size();
}

uint64_t DBImpl::TEST_LogfileNumber() {
InstrumentedMutexLock l(&mutex_);
InstrumentedMutexLock l(&log_write_mutex_);
return logfile_number_;
}

Expand Down
34 changes: 16 additions & 18 deletions db/db_impl/db_impl_files.cc
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,21 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
}
}
}
mutex_.Unlock();
FindObsoleteLogFiles(job_context);
mutex_.Lock();
if (job_context->HaveSomethingToDelete()) {
++pending_purge_obsolete_files_;
if (doing_the_full_scan) {
versions_->AddLiveFiles(&job_context->sst_live);
}
}
}

// logs_ is empty when called during recovery, in which case there can't yet
// be any tracked obsolete logs
void DBImpl::FindObsoleteLogFiles(JobContext* job_context) {
// logs_ is empty when called during recovery, in which case there can't
// yet be any tracked obsolete logs
InstrumentedMutexLock l(&log_write_mutex_);
if (!alive_log_files_.empty() && !logs_.empty()) {
uint64_t min_log_number = job_context->log_number;
size_t num_alive_log_files = alive_log_files_.size();
Expand All @@ -201,13 +213,8 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
}
job_context->size_log_to_delete += earliest.size;
total_log_size_ -= earliest.size;
if (two_write_queues_) {
log_write_mutex_.Lock();
}
alive_log_files_.pop_front();
if (two_write_queues_) {
log_write_mutex_.Unlock();
}

// Current log should always stay alive since it can't have
// number < MinLogNumber().
assert(alive_log_files_.size());
Expand All @@ -220,10 +227,7 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
continue;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the log_sync_cv_ may wait here while holding the mutex

Copy link
Member

@tabokie tabokie Feb 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think condvar::Wait will implicitly release mutex (a mutex is paired with it when constructed). Oh, you mean mutex_. That's a good catch.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, it will release the pairing mutex log_write_mutex_, but not mutex_

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch... I'm thinking about how to solve this problem....

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure whether it is still correctly to release mutex_ here because if some other thread calling FindObsoleteFiles but this thread is only finish half of its task.

Copy link
Member

@tabokie tabokie Feb 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mutex_ is released before, I don't see a problem here. But you can't simply re-acquire the mutex after Wait(). That will break the lock order constraint.

Maybe you can skip those unsync-ed log files, and clean them in the next round?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK I will unlock mutex_ before FindObsoleteLogFiles method. PTAL

}
logs_to_free_.push_back(log.ReleaseWriter());
{
InstrumentedMutexLock wl(&log_write_mutex_);
logs_.pop_front();
}
logs_.pop_front();
}
// Current log cannot be obsolete.
assert(!logs_.empty());
Expand All @@ -234,12 +238,6 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
job_context->logs_to_free = logs_to_free_;
job_context->log_recycle_files.assign(log_recycle_files_.begin(),
log_recycle_files_.end());
if (job_context->HaveSomethingToDelete()) {
++pending_purge_obsolete_files_;
if (doing_the_full_scan) {
versions_->AddLiveFiles(&job_context->sst_live);
}
}
logs_to_free_.clear();
}

Expand Down
Loading