From 5fe832a2f1874ef5e9a8e2fc73fbc243642a1b22 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Fri, 11 Feb 2022 23:40:08 +0800 Subject: [PATCH] do not hold mutex for SyncClosedLogs Signed-off-by: Little-Wallace --- db/db_impl/db_impl.cc | 41 +++++++++++++++----------- db/db_impl/db_impl.h | 5 ++-- db/db_impl/db_impl_compaction_flush.cc | 25 ++++++++++++---- db/db_impl/db_impl_write.cc | 20 ++++++++++--- 4 files changed, 63 insertions(+), 28 deletions(-) diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 32660057e19..3abd21b356b 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1398,19 +1398,37 @@ Status DBImpl::SyncWAL() { TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:2"); TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:1"); + VersionEdit synced_wals; { InstrumentedMutexLock l(&log_write_mutex_); if (status.ok()) { - status = MarkLogsSynced(current_log_number, need_log_dir_sync); + MarkLogsSynced(current_log_number, need_log_dir_sync, &synced_wals); } else { MarkLogsNotSynced(current_log_number); } } + if (status.ok() && synced_wals.IsWalAddition()) { + InstrumentedMutexLock l(&mutex_); + status = ApplyWALToManifest(&synced_wals); + } + TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:2"); return status; } +Status DBImpl::ApplyWALToManifest(VersionEdit* synced_wals) { + // not empty, write to MANIFEST. 
+ mutex_.AssertHeld(); + Status status = + versions_->LogAndApplyToDefaultColumnFamily(synced_wals, &mutex_); + if (!status.ok() && versions_->io_status().IsIOError()) { + status = error_handler_.SetBGError(versions_->io_status(), + BackgroundErrorReason::kManifestWrite); + } + return status; +} + Status DBImpl::LockWAL() { log_write_mutex_.Lock(); auto cur_log_writer = logs_.back().writer; @@ -1430,20 +1448,20 @@ Status DBImpl::UnlockWAL() { return Status::OK(); } -Status DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir) { +void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir, + VersionEdit* synced_wals) { log_write_mutex_.AssertHeld(); if (synced_dir && logfile_number_ == up_to) { log_dir_synced_ = true; } - VersionEdit synced_wals; for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;) { auto& wal = *it; assert(wal.getting_synced); if (logs_.size() > 1) { if (immutable_db_options_.track_and_verify_wals_in_manifest && wal.writer->file()->GetFileSize() > 0) { - synced_wals.AddWal(wal.number, - WalMetadata(wal.writer->file()->GetFileSize())); + synced_wals->AddWal(wal.number, + WalMetadata(wal.writer->file()->GetFileSize())); } logs_to_free_.push_back(wal.ReleaseWriter()); it = logs_.erase(it); @@ -1454,22 +1472,11 @@ Status DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir) { } assert(logs_.empty() || logs_[0].number > up_to || (logs_.size() == 1 && !logs_[0].getting_synced)); - - Status s; - if (synced_wals.IsWalAddition()) { - // not empty, write to MANIFEST. 
- s = versions_->LogAndApplyToDefaultColumnFamily(&synced_wals, &mutex_); - if (!s.ok() && versions_->io_status().IsIOError()) { - s = error_handler_.SetBGError(versions_->io_status(), - BackgroundErrorReason::kManifestWrite); - } - } log_sync_cv_.SignalAll(); - return s; } void DBImpl::MarkLogsNotSynced(uint64_t up_to) { - mutex_.AssertHeld(); + log_write_mutex_.AssertHeld(); for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to; ++it) { auto& wal = *it; diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index aaeb47dec89..472812a900a 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1606,7 +1606,7 @@ class DBImpl : public DB { void ReleaseFileNumberFromPendingOutputs( std::unique_ptr::iterator>& v); - IOStatus SyncClosedLogs(JobContext* job_context); + IOStatus SyncClosedLogs(JobContext* job_context, VersionEdit* synced_wals); // Flush the in-memory write buffer to storage. Switches to a new // log-file/memtable and writes a new descriptor iff successful. Then @@ -1877,7 +1877,8 @@ class DBImpl : public DB { std::unique_ptr* token, LogBuffer* log_buffer); // helper function to call after some of the logs_ were synced - Status MarkLogsSynced(uint64_t up_to, bool synced_dir); + void MarkLogsSynced(uint64_t up_to, bool synced_dir, VersionEdit* edit); + Status ApplyWALToManifest(VersionEdit* edit); // WALs with log number up to up_to are not synced successfully. 
void MarkLogsNotSynced(uint64_t up_to); diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index fcd237f6840..0c1bd183f9a 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -82,7 +82,8 @@ bool DBImpl::RequestCompactionToken(ColumnFamilyData* cfd, bool force, return false; } -IOStatus DBImpl::SyncClosedLogs(JobContext* job_context) { +IOStatus DBImpl::SyncClosedLogs(JobContext* job_context, + VersionEdit* synced_wals) { TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Start"); InstrumentedMutexLock l(&log_write_mutex_); autovector logs_to_sync; @@ -134,7 +135,7 @@ IOStatus DBImpl::SyncClosedLogs(JobContext* job_context) { // "number <= current_log_number - 1" is equivalent to // "number < current_log_number". if (io_s.ok()) { - io_s = status_to_io_status(MarkLogsSynced(current_log_number - 1, true)); + MarkLogsSynced(current_log_number - 1, true, synced_wals); } else { MarkLogsNotSynced(current_log_number - 1); } @@ -201,8 +202,15 @@ Status DBImpl::FlushMemTableToOutputFile( bool need_cancel = false; IOStatus log_io_s = IOStatus::OK(); if (needs_to_sync_closed_wals) { - // SyncClosedLogs() may unlock and re-lock the db_mutex. - log_io_s = SyncClosedLogs(job_context); + // SyncClosedLogs() may unlock and re-lock the log_write_mutex_. + VersionEdit synced_wals; + mutex_.Unlock(); + log_io_s = SyncClosedLogs(job_context, &synced_wals); + mutex_.Lock(); + if (log_io_s.ok() && synced_wals.IsWalAddition()) { + log_io_s = status_to_io_status(ApplyWALToManifest(&synced_wals)); + } + if (!log_io_s.ok() && !log_io_s.IsShutdownInProgress() && !log_io_s.IsColumnFamilyDropped()) { error_handler_.SetBGError(log_io_s, BackgroundErrorReason::kFlush); @@ -459,7 +467,14 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( if (logfile_number_ > 0) { // TODO (yanqin) investigate whether we should sync the closed logs for // single column family case. 
- log_io_s = SyncClosedLogs(job_context); + VersionEdit synced_wals; + mutex_.Unlock(); + log_io_s = SyncClosedLogs(job_context, &synced_wals); + mutex_.Lock(); + if (log_io_s.ok() && synced_wals.IsWalAddition()) { + log_io_s = status_to_io_status(ApplyWALToManifest(&synced_wals)); + } + if (!log_io_s.ok() && !log_io_s.IsShutdownInProgress() && !log_io_s.IsColumnFamilyDropped()) { if (total_log_size_ > 0) { diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 61c9d471edb..38ab5512fac 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -491,13 +491,20 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, } if (log_context.need_log_sync) { + VersionEdit synced_wals; log_write_mutex_.Lock(); if (status.ok()) { - status = MarkLogsSynced(logfile_number_, log_context.need_log_dir_sync); + MarkLogsSynced(logfile_number_, log_context.need_log_dir_sync, + &synced_wals); } else { MarkLogsNotSynced(logfile_number_); } log_write_mutex_.Unlock(); + if (status.ok() && synced_wals.IsWalAddition()) { + InstrumentedMutexLock l(&mutex_); + status = ApplyWALToManifest(&synced_wals); + } + // Requesting sync with two_write_queues_ is expected to be very rare. We // hence provide a simple implementation that is not necessarily efficient. 
if (two_write_queues_) { @@ -631,16 +638,20 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, } } + VersionEdit synced_wals; if (log_context.need_log_sync) { InstrumentedMutexLock l(&log_write_mutex_); if (w.status.ok()) { - w.status = - MarkLogsSynced(logfile_number_, log_context.need_log_dir_sync); + MarkLogsSynced(logfile_number_, log_context.need_log_dir_sync, + &synced_wals); } else { MarkLogsNotSynced(logfile_number_); } } - + if (w.status.ok() && synced_wals.IsWalAddition()) { + InstrumentedMutexLock l(&mutex_); + w.status = ApplyWALToManifest(&synced_wals); + } write_thread_.ExitAsBatchGroupLeader(wal_write_group, w.status); } @@ -1064,6 +1075,7 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options, if (write_options.no_slowdown) { status = Status::Incomplete("Write stall"); } else { + InstrumentedMutexLock l(&mutex_); WriteBufferManagerStallWrites(); } }