Skip to content

Commit

Permalink
remove mutex in all write method
Browse files Browse the repository at this point in the history
Signed-off-by: Little-Wallace <[email protected]>
  • Loading branch information
Little-Wallace committed Oct 19, 2020
1 parent ed90e2a commit c9589a8
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 96 deletions.
54 changes: 31 additions & 23 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
log_dir_synced_(false),
log_empty_(true),
persist_stats_cf_handle_(nullptr),
log_sync_cv_(&mutex_),
log_sync_cv_(&log_write_mutex_),
total_log_size_(0),
is_snapshot_supported_(true),
write_buffer_manager_(immutable_db_options_.write_buffer_manager.get()),
Expand Down Expand Up @@ -267,6 +267,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
// we won't drop any deletion markers until SetPreserveDeletesSequenceNumber()
// is called by client and this seqnum is advanced.
preserve_deletes_seqnum_.store(0);
max_total_wal_size_.store(mutable_db_options_.max_total_wal_size,
std::memory_order_relaxed);
}

Status DBImpl::Resume() {
Expand Down Expand Up @@ -573,25 +575,28 @@ Status DBImpl::CloseHelper() {
mutex_.Lock();
}

for (auto l : logs_to_free_) {
delete l;
}
for (auto& log : logs_) {
uint64_t log_number = log.writer->get_log_number();
Status s = log.ClearWriter();
if (!s.ok()) {
ROCKS_LOG_WARN(
immutable_db_options_.info_log,
"Unable to Sync WAL file %s with error -- %s",
LogFileName(immutable_db_options_.wal_dir, log_number).c_str(),
s.ToString().c_str());
// Retain the first error
if (ret.ok()) {
ret = s;
{
InstrumentedMutexLock lock(&log_write_mutex_);
for (auto l : logs_to_free_) {
delete l;
}
for (auto& log : logs_) {
uint64_t log_number = log.writer->get_log_number();
Status s = log.ClearWriter();
if (!s.ok()) {
ROCKS_LOG_WARN(
immutable_db_options_.info_log,
"Unable to Sync WAL file %s with error -- %s",
LogFileName(immutable_db_options_.wal_dir, log_number).c_str(),
s.ToString().c_str());
// Retain the first error
if (ret.ok()) {
ret = s;
}
}
}
logs_.clear();
}
logs_.clear();

// Table cache may have table handles holding blocks from the block cache.
// We need to release them before the block cache is destroyed. The block
Expand Down Expand Up @@ -1105,6 +1110,11 @@ Status DBImpl::SetDBOptions(
new_options.stats_persist_period_sec);
mutex_.Lock();
}
if (new_options.max_total_wal_size !=
mutable_db_options_.max_total_wal_size) {
max_total_wal_size_.store(new_options.max_total_wal_size,
std::memory_order_release);
}
write_controller_.set_max_delayed_write_rate(
new_options.delayed_write_rate);
table_cache_.get()->SetCapacity(new_options.max_open_files == -1
Expand Down Expand Up @@ -1224,7 +1234,7 @@ Status DBImpl::SyncWAL() {
uint64_t current_log_number;

{
InstrumentedMutexLock l(&mutex_);
InstrumentedMutexLock l(&log_write_mutex_);
assert(!logs_.empty());

// This SyncWAL() call only cares about logs up to this number.
Expand Down Expand Up @@ -1281,7 +1291,7 @@ Status DBImpl::SyncWAL() {

TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:1");
{
InstrumentedMutexLock l(&mutex_);
InstrumentedMutexLock l(&log_write_mutex_);
MarkLogsSynced(current_log_number, need_log_dir_sync, status);
}
TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:2");
Expand Down Expand Up @@ -1310,7 +1320,7 @@ Status DBImpl::UnlockWAL() {

void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir,
const Status& status) {
mutex_.AssertHeld();
log_write_mutex_.AssertHeld();
if (synced_dir && logfile_number_ == up_to && status.ok()) {
log_dir_synced_ = true;
}
Expand All @@ -1319,8 +1329,6 @@ void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir,
assert(log.getting_synced);
if (status.ok() && logs_.size() > 1) {
logs_to_free_.push_back(log.ReleaseWriter());
// To modify logs_ both mutex_ and log_write_mutex_ must be held
InstrumentedMutexLock l(&log_write_mutex_);
it = logs_.erase(it);
} else {
log.getting_synced = false;
Expand Down Expand Up @@ -2582,7 +2590,6 @@ Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options,
s = cfd->AddDirectories(&dummy_created_dirs);
}
if (s.ok()) {
single_column_family_mode_ = false;
auto* cfd =
versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name);
assert(cfd != nullptr);
Expand All @@ -2599,6 +2606,7 @@ Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options,
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Created column family [%s] (ID %u)",
column_family_name.c_str(), (unsigned)cfd->GetID());
single_column_family_mode_.store(false, std::memory_order_release);
} else {
ROCKS_LOG_ERROR(immutable_db_options_.info_log,
"Creating column family [%s] FAILED -- %s",
Expand Down
17 changes: 12 additions & 5 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1050,10 +1050,10 @@ class DBImpl : public DB {

// only used for dynamically adjusting max_total_wal_size. it is a sum of
// [write_buffer_size * max_write_buffer_number] over all column families
uint64_t max_total_in_memory_state_;
std::atomic<uint64_t> max_total_in_memory_state_;
// If true, we have only one (default) column family. We use this to optimize
// some code-paths
bool single_column_family_mode_;
std::atomic<bool> single_column_family_mode_;

// The options to access storage files
const FileOptions file_options_;
Expand Down Expand Up @@ -1257,7 +1257,13 @@ class DBImpl : public DB {
}
}
};

struct LogContext {
explicit LogContext(bool need_sync = false)
: need_log_sync(need_sync), need_log_dir_sync(need_sync) {}
bool need_log_sync;
bool need_log_dir_sync;
log::Writer* writer;
};
struct LogFileNumberSize {
explicit LogFileNumberSize(uint64_t _number) : number(_number) {}
void AddSize(uint64_t new_size) { size += new_size; }
Expand Down Expand Up @@ -1551,8 +1557,8 @@ class DBImpl : public DB {
Status HandleWriteBufferFull(WriteContext* write_context);

// REQUIRES: mutex locked
Status PreprocessWrite(const WriteOptions& write_options, bool* need_log_sync,
WriteContext* write_context);
Status PreprocessWrite(const WriteOptions& write_options,
LogContext* log_context, WriteContext* write_context);

WriteBatch* MergeBatch(const WriteThread::WriteGroup& write_group,
WriteBatch* tmp_batch, size_t* write_with_wal,
Expand Down Expand Up @@ -2168,6 +2174,7 @@ class DBImpl : public DB {
InstrumentedCondVar atomic_flush_install_cv_;

bool wal_in_db_path_;
std::atomic<uint64_t> max_total_wal_size_;
};

extern Options SanitizeOptions(const std::string& db, const Options& src);
Expand Down
6 changes: 3 additions & 3 deletions db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ bool DBImpl::RequestCompactionToken(ColumnFamilyData* cfd, bool force,

IOStatus DBImpl::SyncClosedLogs(JobContext* job_context) {
TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Start");
mutex_.AssertHeld();
InstrumentedMutexLock l(&log_write_mutex_);
autovector<log::Writer*, 1> logs_to_sync;
uint64_t current_log_number = logfile_number_;
while (logs_.front().number < current_log_number &&
Expand All @@ -97,7 +97,7 @@ IOStatus DBImpl::SyncClosedLogs(JobContext* job_context) {

IOStatus io_s;
if (!logs_to_sync.empty()) {
mutex_.Unlock();
log_write_mutex_.Unlock();

for (log::Writer* log : logs_to_sync) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
Expand All @@ -119,7 +119,7 @@ IOStatus DBImpl::SyncClosedLogs(JobContext* job_context) {
io_s = directories_.GetWalDir()->Fsync(IOOptions(), nullptr);
}

mutex_.Lock();
log_write_mutex_.Lock();

// "number <= current_log_number - 1" is equivalent to
// "number < current_log_number".
Expand Down
13 changes: 3 additions & 10 deletions db/db_impl/db_impl_files.cc
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,

// logs_ is empty when called during recovery, in which case there can't yet
// be any tracked obsolete logs
InstrumentedMutexLock l(&log_write_mutex_);
if (!alive_log_files_.empty() && !logs_.empty()) {
uint64_t min_log_number = job_context->log_number;
size_t num_alive_log_files = alive_log_files_.size();
Expand All @@ -259,13 +260,8 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
}
job_context->size_log_to_delete += earliest.size;
total_log_size_ -= earliest.size;
if (two_write_queues_) {
log_write_mutex_.Lock();
}
alive_log_files_.pop_front();
if (two_write_queues_) {
log_write_mutex_.Unlock();
}

// Current log should always stay alive since it can't have
// number < MinLogNumber().
assert(alive_log_files_.size());
Expand All @@ -278,10 +274,7 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
continue;
}
logs_to_free_.push_back(log.ReleaseWriter());
{
InstrumentedMutexLock wl(&log_write_mutex_);
logs_.pop_front();
}
logs_.pop_front();
}
// Current log cannot be obsolete.
assert(!logs_.empty());
Expand Down
Loading

0 comments on commit c9589a8

Please sign in to comment.