Skip to content

Commit

Permalink
do not hold mutex for SyncClosedLogs
Browse files Browse the repository at this point in the history
Signed-off-by: Little-Wallace <[email protected]>
  • Loading branch information
Little-Wallace committed Feb 11, 2022
1 parent ab6edb4 commit 5fe832a
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 28 deletions.
41 changes: 24 additions & 17 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1398,19 +1398,37 @@ Status DBImpl::SyncWAL() {
TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:2");

TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:1");
VersionEdit synced_wals;
{
InstrumentedMutexLock l(&log_write_mutex_);
if (status.ok()) {
status = MarkLogsSynced(current_log_number, need_log_dir_sync);
MarkLogsSynced(current_log_number, need_log_dir_sync, &synced_wals);
} else {
MarkLogsNotSynced(current_log_number);
}
}
if (status.ok() && synced_wals.IsWalAddition()) {
InstrumentedMutexLock l(&mutex_);
status = ApplyWALToManifest(&synced_wals);
}

TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:2");

return status;
}

Status DBImpl::ApplyWALToManifest(VersionEdit* synced_wals) {
// not empty, write to MANIFEST.
mutex_.AssertHeld();
Status status =
versions_->LogAndApplyToDefaultColumnFamily(synced_wals, &mutex_);
if (!status.ok() && versions_->io_status().IsIOError()) {
status = error_handler_.SetBGError(versions_->io_status(),
BackgroundErrorReason::kManifestWrite);
}
return status;
}

Status DBImpl::LockWAL() {
log_write_mutex_.Lock();
auto cur_log_writer = logs_.back().writer;
Expand All @@ -1430,20 +1448,20 @@ Status DBImpl::UnlockWAL() {
return Status::OK();
}

Status DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir) {
void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir,
VersionEdit* synced_wals) {
log_write_mutex_.AssertHeld();
if (synced_dir && logfile_number_ == up_to) {
log_dir_synced_ = true;
}
VersionEdit synced_wals;
for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;) {
auto& wal = *it;
assert(wal.getting_synced);
if (logs_.size() > 1) {
if (immutable_db_options_.track_and_verify_wals_in_manifest &&
wal.writer->file()->GetFileSize() > 0) {
synced_wals.AddWal(wal.number,
WalMetadata(wal.writer->file()->GetFileSize()));
synced_wals->AddWal(wal.number,
WalMetadata(wal.writer->file()->GetFileSize()));
}
logs_to_free_.push_back(wal.ReleaseWriter());
it = logs_.erase(it);
Expand All @@ -1454,22 +1472,11 @@ Status DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir) {
}
assert(logs_.empty() || logs_[0].number > up_to ||
(logs_.size() == 1 && !logs_[0].getting_synced));

Status s;
if (synced_wals.IsWalAddition()) {
// not empty, write to MANIFEST.
s = versions_->LogAndApplyToDefaultColumnFamily(&synced_wals, &mutex_);
if (!s.ok() && versions_->io_status().IsIOError()) {
s = error_handler_.SetBGError(versions_->io_status(),
BackgroundErrorReason::kManifestWrite);
}
}
log_sync_cv_.SignalAll();
return s;
}

void DBImpl::MarkLogsNotSynced(uint64_t up_to) {
mutex_.AssertHeld();
log_write_mutex_.AssertHeld();
for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;
++it) {
auto& wal = *it;
Expand Down
5 changes: 3 additions & 2 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1606,7 +1606,7 @@ class DBImpl : public DB {
void ReleaseFileNumberFromPendingOutputs(
std::unique_ptr<std::list<uint64_t>::iterator>& v);

IOStatus SyncClosedLogs(JobContext* job_context);
IOStatus SyncClosedLogs(JobContext* job_context, VersionEdit* synced_wals);

// Flush the in-memory write buffer to storage. Switches to a new
// log-file/memtable and writes a new descriptor iff successful. Then
Expand Down Expand Up @@ -1877,7 +1877,8 @@ class DBImpl : public DB {
std::unique_ptr<TaskLimiterToken>* token, LogBuffer* log_buffer);

// helper function to call after some of the logs_ were synced
Status MarkLogsSynced(uint64_t up_to, bool synced_dir);
void MarkLogsSynced(uint64_t up_to, bool synced_dir, VersionEdit* edit);
Status ApplyWALToManifest(VersionEdit* edit);
// WALs with log number up to up_to are not synced successfully.
void MarkLogsNotSynced(uint64_t up_to);

Expand Down
25 changes: 20 additions & 5 deletions db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ bool DBImpl::RequestCompactionToken(ColumnFamilyData* cfd, bool force,
return false;
}

IOStatus DBImpl::SyncClosedLogs(JobContext* job_context) {
IOStatus DBImpl::SyncClosedLogs(JobContext* job_context,
VersionEdit* synced_wals) {
TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Start");
InstrumentedMutexLock l(&log_write_mutex_);
autovector<log::Writer*, 1> logs_to_sync;
Expand Down Expand Up @@ -134,7 +135,7 @@ IOStatus DBImpl::SyncClosedLogs(JobContext* job_context) {
// "number <= current_log_number - 1" is equivalent to
// "number < current_log_number".
if (io_s.ok()) {
io_s = status_to_io_status(MarkLogsSynced(current_log_number - 1, true));
MarkLogsSynced(current_log_number - 1, true, synced_wals);
} else {
MarkLogsNotSynced(current_log_number - 1);
}
Expand Down Expand Up @@ -201,8 +202,15 @@ Status DBImpl::FlushMemTableToOutputFile(
bool need_cancel = false;
IOStatus log_io_s = IOStatus::OK();
if (needs_to_sync_closed_wals) {
// SyncClosedLogs() may unlock and re-lock the db_mutex.
log_io_s = SyncClosedLogs(job_context);
// SyncClosedLogs() may unlock and re-lock the db_log_write_mutex.
VersionEdit synced_wals;
mutex_.Unlock();
log_io_s = SyncClosedLogs(job_context, &synced_wals);
mutex_.Lock();
if (log_io_s.ok() && synced_wals.IsWalAddition()) {
log_io_s = status_to_io_status(ApplyWALToManifest(&synced_wals));
}

if (!log_io_s.ok() && !log_io_s.IsShutdownInProgress() &&
!log_io_s.IsColumnFamilyDropped()) {
error_handler_.SetBGError(log_io_s, BackgroundErrorReason::kFlush);
Expand Down Expand Up @@ -459,7 +467,14 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
if (logfile_number_ > 0) {
// TODO (yanqin) investigate whether we should sync the closed logs for
// single column family case.
log_io_s = SyncClosedLogs(job_context);
VersionEdit synced_wals;
mutex_.Unlock();
log_io_s = SyncClosedLogs(job_context, &synced_wals);
mutex_.Lock();
if (log_io_s.ok() && synced_wals.IsWalAddition()) {
log_io_s = status_to_io_status(ApplyWALToManifest(&synced_wals));
}

if (!log_io_s.ok() && !log_io_s.IsShutdownInProgress() &&
!log_io_s.IsColumnFamilyDropped()) {
if (total_log_size_ > 0) {
Expand Down
20 changes: 16 additions & 4 deletions db/db_impl/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -491,13 +491,20 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
}

if (log_context.need_log_sync) {
VersionEdit synced_wals;
log_write_mutex_.Lock();
if (status.ok()) {
status = MarkLogsSynced(logfile_number_, log_context.need_log_dir_sync);
MarkLogsSynced(logfile_number_, log_context.need_log_dir_sync,
&synced_wals);
} else {
MarkLogsNotSynced(logfile_number_);
}
log_write_mutex_.Unlock();
if (status.ok() && synced_wals.IsWalAddition()) {
InstrumentedMutexLock l(&mutex_);
status = ApplyWALToManifest(&synced_wals);
}

// Requesting sync with two_write_queues_ is expected to be very rare. We
// hence provide a simple implementation that is not necessarily efficient.
if (two_write_queues_) {
Expand Down Expand Up @@ -631,16 +638,20 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
}
}

VersionEdit synced_wals;
if (log_context.need_log_sync) {
InstrumentedMutexLock l(&log_write_mutex_);
if (w.status.ok()) {
w.status =
MarkLogsSynced(logfile_number_, log_context.need_log_dir_sync);
MarkLogsSynced(logfile_number_, log_context.need_log_dir_sync,
&synced_wals);
} else {
MarkLogsNotSynced(logfile_number_);
}
}

if (w.status.ok() && synced_wals.IsWalAddition()) {
InstrumentedMutexLock l(&mutex_);
w.status = ApplyWALToManifest(&synced_wals);
}
write_thread_.ExitAsBatchGroupLeader(wal_write_group, w.status);
}

Expand Down Expand Up @@ -1064,6 +1075,7 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
if (write_options.no_slowdown) {
status = Status::Incomplete("Write stall");
} else {
InstrumentedMutexLock l(&mutex_);
WriteBufferManagerStallWrites();
}
}
Expand Down

0 comments on commit 5fe832a

Please sign in to comment.