From 3182da977cc23968f069e0f7706fcb667a00be49 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Tue, 19 Oct 2021 17:07:07 +0800 Subject: [PATCH 1/5] support joinbatch group no blocking Signed-off-by: Little-Wallace --- db/db_impl/db_impl.h | 33 +-- db/db_impl/db_impl_write.cc | 297 ++++++++++++++--------- db/db_test.cc | 8 + db/write_thread.cc | 16 ++ db/write_thread.h | 2 + include/rocksdb/db.h | 3 + include/rocksdb/utilities/stackable_db.h | 7 + utilities/blob_db/blob_db_impl.h | 7 + 8 files changed, 248 insertions(+), 125 deletions(-) diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 8b5381357d5..65fdf3279a9 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -110,6 +110,12 @@ class Directories { std::unique_ptr wal_dir_; }; +struct DBWriter { + DBWriter(const WriteOptions& options, std::vector&& updates) + : w(options, std::move(updates), nullptr, 0) {} + WriteThread::Writer w; +}; + // While DB is the public interface of RocksDB, and DBImpl is the actual // class implementing it. It's the entrance of the core RocksdB engine. // All other DB implementations, e.g. TransactionDB, BlobDB, etc, wrap a @@ -153,6 +159,10 @@ class DBImpl : public DB { using DB::Write; virtual Status Write(const WriteOptions& options, WriteBatch* updates) override; + using DB::Prepare; + virtual void Prepare(const WriteOptions& options, DBWriter* writer) override; + using DB::Submit; + virtual Status Submit(const WriteOptions& options, DBWriter* writer) override; using DB::MultiBatchWrite; virtual Status MultiBatchWrite(const WriteOptions& options, @@ -1029,16 +1039,10 @@ class DBImpl : public DB { PreReleaseCallback* pre_release_callback = nullptr); Status MultiBatchWriteImpl(const WriteOptions& write_options, - std::vector&& my_batch, - WriteCallback* callback, - uint64_t* log_used = nullptr, uint64_t log_ref = 0, - uint64_t* seq_used = nullptr); + WriteThread::Writer* w, uint64_t* log_used); - Status PipelinedWriteImpl(const WriteOptions& options, WriteBatch* updates, - WriteCallback* callback = nullptr, - uint64_t* log_used = nullptr, uint64_t log_ref = 0, - bool disable_memtable = false, - uint64_t* seq_used = nullptr); + Status PipelinedWriteImpl(const WriteOptions& write_options, + WriteThread::Writer* w, uint64_t* log_used); // Write only to memtables without joining any write queue Status UnorderedWriteMemtable(const WriteOptions& write_options, @@ -1060,12 +1064,11 @@ class DBImpl : public DB { // of the write batch that does not have duplicate keys. When seq_per_batch is // not set, each key is a separate sub_batch. Otherwise each duplicate key // marks start of a new sub-batch. 
- Status WriteImplWALOnly( - WriteThread* write_thread, const WriteOptions& options, - WriteBatch* updates, WriteCallback* callback, uint64_t* log_used, - const uint64_t log_ref, uint64_t* seq_used, const size_t sub_batch_cnt, - PreReleaseCallback* pre_release_callback, const AssignOrder assign_order, - const PublishLastSeq publish_last_seq, const bool disable_memtable); + Status WriteImplWALOnly(WriteThread* write_thread, + const WriteOptions& options, WriteThread::Writer* w, + uint64_t* log_used, uint64_t* seq_used, + const AssignOrder assign_order, + const PublishLastSeq publish_last_seq); // write cached_recoverable_state_ to memtable if it is not empty // The writer must be the leader in write_thread_ and holding mutex_ diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 0c0210aca90..92f057958ec 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -58,6 +58,79 @@ Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) { return WriteImpl(write_options, my_batch, nullptr, nullptr); } +void DBImpl::Prepare(const WriteOptions& write_options, DBWriter* writer) { + write_thread_.JoinBatchGroupNoBlocking(&writer->w); +} + +Status DBImpl::Submit(const WriteOptions& write_options, DBWriter* writer) { + if (writer == nullptr || writer->w.batches.empty()) { + return Status::Corruption("Batch is nullptr!"); + } + if (tracer_) { + InstrumentedMutexLock lock(&trace_mutex_); + if (tracer_) { + tracer_->Write(writer->w.batches[0]); + } + } + if (write_options.sync && write_options.disableWAL) { + return Status::InvalidArgument("Sync writes has to enable WAL."); + } + if (two_write_queues_ && immutable_db_options_.enable_pipelined_write) { + return Status::NotSupported( + "pipelined_writes is not compatible with concurrent prepares"); + } + if (seq_per_batch_ && immutable_db_options_.enable_pipelined_write) { + // TODO(yiwu): update pipeline write with seq_per_batch and batch_cnt + return Status::NotSupported( + "pipelined_writes is not compatible with seq_per_batch"); + } + if (immutable_db_options_.unordered_write && + immutable_db_options_.enable_pipelined_write) { + return Status::NotSupported( + "pipelined_writes is not compatible with unordered_write"); + } + + Status status; + if (write_options.low_pri) { + status = ThrottleLowPriWritesIfNeeded(write_options, writer->w.batches[0]); + if (!status.ok()) { + return status; + } + } + + if (two_write_queues_) { + return Status::NotSupported( + "two_write_queues_ is not compatible with Submit"); + } + + if (immutable_db_options_.unordered_write) { + const size_t sub_batch_cnt = + WriteBatchInternal::Count(writer->w.batches[0]); + uint64_t seq; + write_thread_.AwaitStateForGroupLeader(&writer->w); + status = WriteImplWALOnly(&write_thread_, write_options, &writer->w, + nullptr, &seq, kDoAssignOrder, kDoPublishLastSeq); + TEST_SYNC_POINT("DBImpl::WriteImpl:UnorderedWriteAfterWriteWAL"); + if (!status.ok()) { + return status; + } + TEST_SYNC_POINT("DBImpl::WriteImpl:BeforeUnorderedWriteMemtable"); + status = UnorderedWriteMemtable(write_options, writer->w.batches[0], + nullptr, 0, seq, sub_batch_cnt); + return status; + } + + if (immutable_db_options_.enable_multi_thread_write) { + return MultiBatchWriteImpl(write_options, &writer->w, nullptr); + } + + if (immutable_db_options_.enable_pipelined_write) { + return PipelinedWriteImpl(write_options, &writer->w, nullptr); + } + + return Status::OK(); +} + #ifndef ROCKSDB_LITE Status DBImpl::WriteWithCallback(const WriteOptions& 
write_options, WriteBatch* my_batch, @@ -69,26 +142,25 @@ Status DBImpl::WriteWithCallback(const WriteOptions& write_options, Status DBImpl::MultiBatchWrite(const WriteOptions& options, std::vector&& updates) { if (immutable_db_options_.enable_multi_thread_write) { - return MultiBatchWriteImpl(options, std::move(updates), nullptr, nullptr); + WriteThread::Writer writer(options, std::move(updates), nullptr, 0); + write_thread_.JoinBatchGroupNoBlocking(&writer); + return MultiBatchWriteImpl(options, &writer, nullptr); } else { return Status::NotSupported(); } } Status DBImpl::MultiBatchWriteImpl(const WriteOptions& write_options, - std::vector&& my_batch, - WriteCallback* callback, uint64_t* log_used, - uint64_t log_ref, uint64_t* seq_used) { + WriteThread::Writer* writer, + uint64_t* log_used) { PERF_TIMER_GUARD(write_pre_and_post_process_time); StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE); - WriteThread::Writer writer(write_options, std::move(my_batch), callback, - log_ref); - write_thread_.JoinBatchGroup(&writer); + write_thread_.AwaitStateForGroupLeader(writer); WriteContext write_context; bool ignore_missing_faimly = write_options.ignore_missing_column_families; - if (writer.state == WriteThread::STATE_GROUP_LEADER) { - if (writer.callback && !writer.callback->AllowWriteBatching()) { + if (writer->state == WriteThread::STATE_GROUP_LEADER) { + if (writer->callback && !writer->callback->AllowWriteBatching()) { write_thread_.WaitForMemTableWriters(); } WriteThread::WriteGroup wal_write_group; @@ -96,7 +168,7 @@ Status DBImpl::MultiBatchWriteImpl(const WriteOptions& write_options, bool need_log_sync = !write_options.disableWAL && write_options.sync; bool need_log_dir_sync = need_log_sync && !log_dir_synced_; PERF_TIMER_STOP(write_pre_and_post_process_time); - writer.status = + writer->status = PreprocessWrite(write_options, &need_log_sync, &write_context); PERF_TIMER_START(write_pre_and_post_process_time); log::Writer* log_writer = logs_.back().writer; @@ -104,14 +176,14 @@ Status DBImpl::MultiBatchWriteImpl(const WriteOptions& write_options, // This can set non-OK status if callback fail. 
last_batch_group_size_ = - write_thread_.EnterAsBatchGroupLeader(&writer, &wal_write_group); + write_thread_.EnterAsBatchGroupLeader(writer, &wal_write_group); const SequenceNumber current_sequence = write_thread_.UpdateLastSequence(versions_->LastSequence()) + 1; size_t total_count = 0; size_t total_byte_size = 0; size_t valid_batches = 0; auto stats = default_cf_internal_stats_; - if (writer.status.ok()) { + if (writer->status.ok()) { SequenceNumber next_sequence = current_sequence; for (auto w : wal_write_group) { if (w->CheckCallback(this)) { @@ -126,7 +198,7 @@ Status DBImpl::MultiBatchWriteImpl(const WriteOptions& write_options, total_byte_size, WriteBatchInternal::ByteSize(w->batches)); } } - if (writer.disable_wal) { + if (writer->disable_wal) { has_unpersisted_data_.store(true, std::memory_order_relaxed); } write_thread_.UpdateLastSequence(current_sequence + total_count - 1); @@ -147,28 +219,28 @@ Status DBImpl::MultiBatchWriteImpl(const WriteOptions& write_options, RecordTick(stats_, WRITE_DONE_BY_OTHER, wal_write_group.size - 1); } - writer.status = + writer->status = WriteToWAL(wal_write_group, log_writer, log_used, need_log_sync, need_log_dir_sync, current_sequence); } } - if (!writer.CallbackFailed()) { - WriteStatusCheck(writer.status); + if (!writer->CallbackFailed()) { + WriteStatusCheck(writer->status); } if (need_log_sync) { mutex_.Lock(); - MarkLogsSynced(logfile_number_, need_log_dir_sync, writer.status); + MarkLogsSynced(logfile_number_, need_log_dir_sync, writer->status); mutex_.Unlock(); } - write_thread_.ExitAsBatchGroupLeader(wal_write_group, writer.status); + write_thread_.ExitAsBatchGroupLeader(wal_write_group, writer->status); } bool is_leader_thread = false; WriteThread::WriteGroup memtable_write_group; - if (writer.state == WriteThread::STATE_MEMTABLE_WRITER_LEADER) { + if (writer->state == WriteThread::STATE_MEMTABLE_WRITER_LEADER) { PERF_TIMER_GUARD(write_memtable_time); - assert(writer.ShouldWriteToMemtable()); - write_thread_.EnterAsMemTableWriter(&writer, &memtable_write_group); + assert(writer->ShouldWriteToMemtable()); + write_thread_.EnterAsMemTableWriter(writer, &memtable_write_group); assert(immutable_db_options_.allow_concurrent_memtable_write); if (memtable_write_group.size > 1) { is_leader_thread = true; @@ -192,19 +264,19 @@ Status DBImpl::MultiBatchWriteImpl(const WriteOptions& write_options, } MemTableInsertStatusCheck(memtable_write_group.status); versions_->SetLastSequence(memtable_write_group.last_sequence); - write_thread_.ExitAsMemTableWriter(&writer, memtable_write_group); + write_thread_.ExitAsMemTableWriter(writer, memtable_write_group); } } - if (writer.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) { - assert(writer.ShouldWriteToMemtable()); + if (writer->state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) { + assert(writer->ShouldWriteToMemtable()); auto version_set = versions_->GetColumnFamilySet(); WriteBatchInternal::AsyncInsertInto( - &writer, writer.sequence, version_set, &flush_scheduler_, + writer, writer->sequence, version_set, &flush_scheduler_, ignore_missing_faimly, this, &write_thread_.write_queue_); // Because `LaunchParallelMemTableWriters` has add `write_group->size` to `running`, // the value of `running` is always larger than one if the leader thread does not // call `CompleteParallelMemTableWriter`. 
- while (writer.write_group->running.load(std::memory_order_acquire) > 1) { + while (writer->write_group->running.load(std::memory_order_acquire) > 1) { // Write thread could exit and block itself if it is not a leader thread. if (!write_thread_.write_queue_.RunFunc() && !is_leader_thread) { break; @@ -215,22 +287,21 @@ Status DBImpl::MultiBatchWriteImpl(const WriteOptions& write_options, // and it would not notify the threads in this WriteGroup. So we must make someone in // this WriteGroup to complete it and leader thread is easy to be decided. if (is_leader_thread) { - if (!write_thread_.CompleteParallelMemTableWriter(&writer)) { - return Status::Aborted("Leader thread must complete at last and exit as memtable writer."); + if (!write_thread_.CompleteParallelMemTableWriter(writer)) { + return Status::Aborted( + "Leader thread must complete at last and exit as memtable " + "writer."); } - MemTableInsertStatusCheck(writer.status); - versions_->SetLastSequence(writer.write_group->last_sequence); - write_thread_.ExitAsMemTableWriter(&writer, *writer.write_group); + MemTableInsertStatusCheck(writer->status); + versions_->SetLastSequence(writer->write_group->last_sequence); + write_thread_.ExitAsMemTableWriter(writer, *writer->write_group); } else { - write_thread_.CompleteParallelMemTableWriter(&writer); + write_thread_.CompleteParallelMemTableWriter(writer); } } - if (seq_used != nullptr) { - *seq_used = writer.sequence; - } - assert(writer.state == WriteThread::STATE_COMPLETED); - return writer.status; + assert(writer->state == WriteThread::STATE_COMPLETED); + return writer->status; } // The main write queue. This is the only write queue that updates LastSequence. @@ -286,10 +357,12 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, seq_per_batch_ ? kDoAssignOrder : kDontAssignOrder; // Otherwise it is WAL-only Prepare batches in WriteCommitted policy and // they don't consume sequence.
- return WriteImplWALOnly(&nonmem_write_thread_, write_options, my_batch, - callback, log_used, log_ref, seq_used, batch_cnt, - pre_release_callback, assign_order, - kDontPublishLastSeq, disable_memtable); + WriteThread::Writer w(write_options, my_batch, callback, log_ref, + disable_memtable, batch_cnt, pre_release_callback); + write_thread_.JoinBatchGroupNoBlocking(&w); + write_thread_.AwaitStateForGroupLeader(&w); + return WriteImplWALOnly(&nonmem_write_thread_, write_options, &w, log_used, + seq_used, assign_order, kDontPublishLastSeq); } if (immutable_db_options_.unordered_write) { @@ -300,10 +373,12 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, uint64_t seq; // Use a write thread to i) optimize for WAL write, ii) publish last // sequence in in increasing order, iii) call pre_release_callback serially - status = WriteImplWALOnly(&write_thread_, write_options, my_batch, callback, - log_used, log_ref, &seq, sub_batch_cnt, - pre_release_callback, kDoAssignOrder, - kDoPublishLastSeq, disable_memtable); + WriteThread::Writer w(write_options, my_batch, callback, log_ref, + disable_memtable, batch_cnt, pre_release_callback); + write_thread_.JoinBatchGroupNoBlocking(&w); + write_thread_.AwaitStateForGroupLeader(&w); + status = WriteImplWALOnly(&write_thread_, write_options, &w, log_used, &seq, + kDoAssignOrder, kDoPublishLastSeq); TEST_SYNC_POINT("DBImpl::WriteImpl:UnorderedWriteAfterWriteWAL"); if (!status.ok()) { return status; @@ -322,13 +397,24 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, if (immutable_db_options_.enable_multi_thread_write) { std::vector updates(1); updates[0] = my_batch; - return MultiBatchWriteImpl(write_options, std::move(updates), callback, - log_used, log_ref, seq_used); + WriteThread::Writer w(write_options, std::move(updates), callback, log_ref); + write_thread_.JoinBatchGroupNoBlocking(&w); + status = MultiBatchWriteImpl(write_options, &w, log_used); + if (seq_used != nullptr) { + *seq_used = w.sequence; + } + return status; } if (immutable_db_options_.enable_pipelined_write) { - return PipelinedWriteImpl(write_options, my_batch, callback, log_used, - log_ref, disable_memtable, seq_used); + WriteThread::Writer w(write_options, my_batch, callback, log_ref, + disable_memtable); + write_thread_.JoinBatchGroupNoBlocking(&w); + status = PipelinedWriteImpl(write_options, &w, log_used); + if (seq_used != nullptr) { + *seq_used = w.sequence; + } + return status; } PERF_TIMER_GUARD(write_pre_and_post_process_time); @@ -341,7 +427,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE); - write_thread_.JoinBatchGroup(&w); + write_thread_.JoinBatchGroupNoBlocking(&w); + write_thread_.AwaitStateForGroupLeader(&w); if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) { // we are a non-leader in a parallel group @@ -628,20 +715,15 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, } Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, - WriteBatch* my_batch, WriteCallback* callback, - uint64_t* log_used, uint64_t log_ref, - bool disable_memtable, uint64_t* seq_used) { + WriteThread::Writer* w, uint64_t* log_used) { PERF_TIMER_GUARD(write_pre_and_post_process_time); StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE); WriteContext write_context; - - WriteThread::Writer w(write_options, my_batch, callback, log_ref, - disable_memtable); - write_thread_.JoinBatchGroup(&w); - if (w.state == 
WriteThread::STATE_GROUP_LEADER) { + write_thread_.AwaitStateForGroupLeader(w); + if (w->state == WriteThread::STATE_GROUP_LEADER) { WriteThread::WriteGroup wal_write_group; - if (w.callback && !w.callback->AllowWriteBatching()) { + if (w->callback && !w->callback->AllowWriteBatching()) { write_thread_.WaitForMemTableWriters(); } mutex_.Lock(); @@ -649,20 +731,20 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, bool need_log_dir_sync = need_log_sync && !log_dir_synced_; // PreprocessWrite does its own perf timing. PERF_TIMER_STOP(write_pre_and_post_process_time); - w.status = PreprocessWrite(write_options, &need_log_sync, &write_context); + w->status = PreprocessWrite(write_options, &need_log_sync, &write_context); PERF_TIMER_START(write_pre_and_post_process_time); log::Writer* log_writer = logs_.back().writer; mutex_.Unlock(); // This can set non-OK status if callback fail. last_batch_group_size_ = - write_thread_.EnterAsBatchGroupLeader(&w, &wal_write_group); + write_thread_.EnterAsBatchGroupLeader(w, &wal_write_group); const SequenceNumber current_sequence = write_thread_.UpdateLastSequence(versions_->LastSequence()) + 1; size_t total_count = 0; size_t total_byte_size = 0; - if (w.status.ok()) { + if (w->status.ok()) { SequenceNumber next_sequence = current_sequence; for (auto writer : wal_write_group) { if (writer->CheckCallback(this)) { @@ -676,7 +758,7 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, total_byte_size, WriteBatchInternal::ByteSize(writer->batch)); } } - if (w.disable_wal) { + if (w->disable_wal) { has_unpersisted_data_.store(true, std::memory_order_relaxed); } write_thread_.UpdateLastSequence(current_sequence + total_count - 1); @@ -691,7 +773,7 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, PERF_TIMER_STOP(write_pre_and_post_process_time); - if (w.status.ok() && !write_options.disableWAL) { + if (w->status.ok() && !write_options.disableWAL) { PERF_TIMER_GUARD(write_wal_time); stats->AddDBStats(InternalStats::kIntStatsWriteDoneBySelf, 1); RecordTick(stats_, WRITE_DONE_BY_SELF, 1); @@ -700,62 +782,60 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, wal_write_group.size - 1); RecordTick(stats_, WRITE_DONE_BY_OTHER, wal_write_group.size - 1); } - w.status = WriteToWAL(wal_write_group, log_writer, log_used, - need_log_sync, need_log_dir_sync, current_sequence); + w->status = + WriteToWAL(wal_write_group, log_writer, log_used, need_log_sync, + need_log_dir_sync, current_sequence); } - if (!w.CallbackFailed()) { - WriteStatusCheck(w.status); + if (!w->CallbackFailed()) { + WriteStatusCheck(w->status); } if (need_log_sync) { mutex_.Lock(); - MarkLogsSynced(logfile_number_, need_log_dir_sync, w.status); + MarkLogsSynced(logfile_number_, need_log_dir_sync, w->status); mutex_.Unlock(); } - write_thread_.ExitAsBatchGroupLeader(wal_write_group, w.status); + write_thread_.ExitAsBatchGroupLeader(wal_write_group, w->status); } WriteThread::WriteGroup memtable_write_group; - if (w.state == WriteThread::STATE_MEMTABLE_WRITER_LEADER) { + if (w->state == WriteThread::STATE_MEMTABLE_WRITER_LEADER) { PERF_TIMER_GUARD(write_memtable_time); - assert(w.ShouldWriteToMemtable()); - write_thread_.EnterAsMemTableWriter(&w, &memtable_write_group); + assert(w->ShouldWriteToMemtable()); + write_thread_.EnterAsMemTableWriter(w, &memtable_write_group); if (memtable_write_group.size > 1 && immutable_db_options_.allow_concurrent_memtable_write) { 
write_thread_.LaunchParallelMemTableWriters(&memtable_write_group); } else { memtable_write_group.status = WriteBatchInternal::InsertInto( - memtable_write_group, w.sequence, column_family_memtables_.get(), + memtable_write_group, w->sequence, column_family_memtables_.get(), &flush_scheduler_, write_options.ignore_missing_column_families, 0 /*log_number*/, this, false /*concurrent_memtable_writes*/, seq_per_batch_, batch_per_txn_); versions_->SetLastSequence(memtable_write_group.last_sequence); - write_thread_.ExitAsMemTableWriter(&w, memtable_write_group); + write_thread_.ExitAsMemTableWriter(w, memtable_write_group); } } - if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) { - assert(w.ShouldWriteToMemtable()); + if (w->state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) { + assert(w->ShouldWriteToMemtable()); ColumnFamilyMemTablesImpl column_family_memtables( versions_->GetColumnFamilySet()); - w.status = WriteBatchInternal::InsertInto( - &w, w.sequence, &column_family_memtables, &flush_scheduler_, + w->status = WriteBatchInternal::InsertInto( + w, w->sequence, &column_family_memtables, &flush_scheduler_, write_options.ignore_missing_column_families, 0 /*log_number*/, this, true /*concurrent_memtable_writes*/); - if (write_thread_.CompleteParallelMemTableWriter(&w)) { - MemTableInsertStatusCheck(w.status); - versions_->SetLastSequence(w.write_group->last_sequence); - write_thread_.ExitAsMemTableWriter(&w, *w.write_group); + if (write_thread_.CompleteParallelMemTableWriter(w)) { + MemTableInsertStatusCheck(w->status); + versions_->SetLastSequence(w->write_group->last_sequence); + write_thread_.ExitAsMemTableWriter(w, *w->write_group); } } - if (seq_used != nullptr) { - *seq_used = w.sequence; - } - assert(w.state == WriteThread::STATE_COMPLETED); - return w.FinalStatus(); + assert(w->state == WriteThread::STATE_COMPLETED); + return w->FinalStatus(); } Status DBImpl::UnorderedWriteMemtable(const WriteOptions& write_options, @@ -808,32 +888,29 @@ Status DBImpl::UnorderedWriteMemtable(const WriteOptions& write_options, // The 2nd write queue. If enabled it will be used only for WAL-only writes. // This is the only queue that updates LastPublishedSequence which is only // applicable in a two-queue setting. 
-Status DBImpl::WriteImplWALOnly( - WriteThread* write_thread, const WriteOptions& write_options, - WriteBatch* my_batch, WriteCallback* callback, uint64_t* log_used, - const uint64_t log_ref, uint64_t* seq_used, const size_t sub_batch_cnt, - PreReleaseCallback* pre_release_callback, const AssignOrder assign_order, - const PublishLastSeq publish_last_seq, const bool disable_memtable) { +Status DBImpl::WriteImplWALOnly(WriteThread* write_thread, + const WriteOptions& write_options, + WriteThread::Writer* w, uint64_t* log_used, + uint64_t* seq_used, + const AssignOrder assign_order, + const PublishLastSeq publish_last_seq) { Status status; PERF_TIMER_GUARD(write_pre_and_post_process_time); - WriteThread::Writer w(write_options, my_batch, callback, log_ref, - disable_memtable, sub_batch_cnt, pre_release_callback); RecordTick(stats_, WRITE_WITH_WAL); StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE); - write_thread->JoinBatchGroup(&w); - assert(w.state != WriteThread::STATE_PARALLEL_MEMTABLE_WRITER); - if (w.state == WriteThread::STATE_COMPLETED) { + assert(w->state != WriteThread::STATE_PARALLEL_MEMTABLE_WRITER); + if (w->state == WriteThread::STATE_COMPLETED) { if (log_used != nullptr) { - *log_used = w.log_used; + *log_used = w->log_used; } if (seq_used != nullptr) { - *seq_used = w.sequence; + *seq_used = w->sequence; } - return w.FinalStatus(); + return w->FinalStatus(); } // else we are the leader of the write batch group - assert(w.state == WriteThread::STATE_GROUP_LEADER); + assert(w->state == WriteThread::STATE_GROUP_LEADER); if (publish_last_seq == kDoPublishLastSeq) { // Currently we only use kDoPublishLastSeq in unordered_write @@ -852,7 +929,7 @@ Status DBImpl::WriteImplWALOnly( } if (!status.ok()) { WriteThread::WriteGroup write_group; - write_thread->EnterAsBatchGroupLeader(&w, &write_group); + write_thread->EnterAsBatchGroupLeader(w, &write_group); write_thread->ExitAsBatchGroupLeader(write_group, status); return status; } @@ -860,7 +937,7 @@ Status DBImpl::WriteImplWALOnly( WriteThread::WriteGroup write_group; uint64_t last_sequence; - write_thread->EnterAsBatchGroupLeader(&w, &write_group); + write_thread->EnterAsBatchGroupLeader(w, &write_group); // Note: no need to update last_batch_group_size_ here since the batch writes // to WAL only @@ -948,7 +1025,7 @@ Status DBImpl::WriteImplWALOnly( } PERF_TIMER_START(write_pre_and_post_process_time); - if (!w.CallbackFailed()) { + if (!w->CallbackFailed()) { WriteStatusCheck(status); } if (status.ok()) { @@ -957,8 +1034,8 @@ Status DBImpl::WriteImplWALOnly( if (!writer->CallbackFailed() && writer->pre_release_callback) { assert(writer->sequence != kMaxSequenceNumber); Status ws = writer->pre_release_callback->Callback( - writer->sequence, disable_memtable, writer->log_used, index++, - pre_release_callback_cnt); + writer->sequence, writer->disable_memtable, writer->log_used, + index++, pre_release_callback_cnt); if (!ws.ok()) { status = ws; break; @@ -976,10 +1053,10 @@ Status DBImpl::WriteImplWALOnly( } write_thread->ExitAsBatchGroupLeader(write_group, status); if (status.ok()) { - status = w.FinalStatus(); + status = w->FinalStatus(); } if (seq_used != nullptr) { - *seq_used = w.sequence; + *seq_used = w->sequence; } return status; } diff --git a/db/db_test.cc b/db/db_test.cc index 16ac9f79173..7ef6774429c 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2422,6 +2422,8 @@ namespace { typedef std::map KVMap; } +struct DBWriter; + class ModelDB : public DB { public: class ModelSnapshot : public Snapshot { 
@@ -2581,6 +2583,12 @@ class ModelDB : public DB { handler.map_ = &map_; return batch->Iterate(&handler); } + using DB::Prepare; + void Prepare(const WriteOptions& options, DBWriter* writer) override {} + using DB::Submit; + Status Submit(const WriteOptions& options, DBWriter* writer) override { + return Status::NotSupported("Not implemented"); + } using DB::GetProperty; bool GetProperty(ColumnFamilyHandle* /*column_family*/, diff --git a/db/write_thread.cc b/db/write_thread.cc index 6f3db234bd5..f9e31097bff 100644 --- a/db/write_thread.cc +++ b/db/write_thread.cc @@ -422,6 +422,22 @@ void WriteThread::JoinBatchGroup(Writer* w) { TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:DoneWaiting", w); } } +void WriteThread::JoinBatchGroupNoBlocking(Writer* w) { + assert(!w->batches.empty()); + bool linked_as_leader = LinkOne(w, &newest_writer_); + if (linked_as_leader) { + SetState(w, STATE_GROUP_LEADER); + } +} + +void WriteThread::AwaitStateForGroupLeader(Writer* w) { + if (w->state.load(std::memory_order_relaxed) != STATE_GROUP_LEADER) { + AwaitState(w, + STATE_GROUP_LEADER | STATE_MEMTABLE_WRITER_LEADER | + STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED, + &jbg_ctx); + } +} size_t WriteThread::EnterAsBatchGroupLeader(Writer* leader, WriteGroup* write_group) { diff --git a/db/write_thread.h b/db/write_thread.h index 8a5882dfb3d..17f75872b02 100644 --- a/db/write_thread.h +++ b/db/write_thread.h @@ -302,6 +302,8 @@ class WriteThread { // // Writer* w: Writer to be executed as part of a batch group void JoinBatchGroup(Writer* w); + void JoinBatchGroupNoBlocking(Writer* w); + void AwaitStateForGroupLeader(Writer* w); // Constructs a write batch group led by leader, which should be a // Writer passed to JoinBatchGroup on the current thread. diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index cfdf059f648..b0204787ae3 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -54,6 +54,7 @@ class Env; class EventListener; class StatsHistoryIterator; class TraceWriter; +struct DBWriter; #ifdef ROCKSDB_LITE class CompactionJobInfo; #endif @@ -378,6 +379,8 @@ class DB { // Returns OK on success, non-OK on failure. // Note: consider setting options.sync = true. virtual Status Write(const WriteOptions& options, WriteBatch* updates) = 0; + virtual void Prepare(const WriteOptions& options, DBWriter* writer) = 0; + virtual Status Submit(const WriteOptions& options, DBWriter* writer) = 0; virtual Status MultiBatchWrite(const WriteOptions& /*options*/, std::vector&& /*updates*/) { diff --git a/include/rocksdb/utilities/stackable_db.h b/include/rocksdb/utilities/stackable_db.h index 67bf4e2fa6b..a37bc2e4c36 100644 --- a/include/rocksdb/utilities/stackable_db.h +++ b/include/rocksdb/utilities/stackable_db.h @@ -15,6 +15,7 @@ #endif namespace rocksdb { +struct DBWriter; // This class contains APIs to stack rocksdb wrappers.Eg. 
Stack TTL over base d class StackableDB : public DB { @@ -164,6 +165,12 @@ class StackableDB : public DB { virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override { return db_->Write(opts, updates); } + virtual void Prepare(const WriteOptions& options, DBWriter* writer) override { + } + virtual Status Submit(const WriteOptions& options, + DBWriter* writer) override { + return Status::NotSupported("Not implemented"); + } using DB::NewIterator; virtual Iterator* NewIterator(const ReadOptions& opts, diff --git a/utilities/blob_db/blob_db_impl.h b/utilities/blob_db/blob_db_impl.h index 0a22c0acd9b..6c3929d591c 100644 --- a/utilities/blob_db/blob_db_impl.h +++ b/utilities/blob_db/blob_db_impl.h @@ -37,6 +37,7 @@ namespace rocksdb { class DBImpl; +struct DBWriter; class ColumnFamilyHandle; class ColumnFamilyData; struct FlushJobInfo; @@ -137,6 +138,12 @@ class BlobDBImpl : public BlobDB { std::vector* values) override; virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override; + virtual void Prepare(const WriteOptions& options, DBWriter* writer) override { + } + virtual Status Submit(const WriteOptions& options, + DBWriter* writer) override { + return Status::NotSupported("Not implemented"); + } virtual Status Close() override; From f4c68d1ccebf4c1e4b0e984888c982869f57da13 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Tue, 19 Oct 2021 17:12:57 +0800 Subject: [PATCH 2/5] remove JoinBatchGroup Signed-off-by: Little-Wallace --- db/write_thread.cc | 27 +++++---------------------- db/write_thread.h | 1 - 2 files changed, 5 insertions(+), 23 deletions(-) diff --git a/db/write_thread.cc b/db/write_thread.cc index f9e31097bff..a375e14a06e 100644 --- a/db/write_thread.cc +++ b/db/write_thread.cc @@ -389,19 +389,18 @@ void WriteThread::EndWriteStall() { } static WriteThread::AdaptationContext jbg_ctx("JoinBatchGroup"); -void WriteThread::JoinBatchGroup(Writer* w) { +void WriteThread::JoinBatchGroupNoBlocking(Writer* w) { TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Start", w); assert(!w->batches.empty()); - bool linked_as_leader = LinkOne(w, &newest_writer_); - if (linked_as_leader) { SetState(w, STATE_GROUP_LEADER); } - TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Wait", w); +} - if (!linked_as_leader) { +void WriteThread::AwaitStateForGroupLeader(Writer* w) { + if (w->state.load(std::memory_order_acquire) != STATE_GROUP_LEADER) { /** * Wait util: * 1) An existing leader pick us as the new leader when it finishes @@ -415,27 +414,11 @@ void WriteThread::JoinBatchGroup(Writer* w) { * 3.2) an existing memtable writer group leader tell us to finish memtable * writes in parallel. 
*/ - TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:BeganWaiting", w); - AwaitState(w, STATE_GROUP_LEADER | STATE_MEMTABLE_WRITER_LEADER | - STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED, - &jbg_ctx); - TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:DoneWaiting", w); - } -} -void WriteThread::JoinBatchGroupNoBlocking(Writer* w) { - assert(!w->batches.empty()); - bool linked_as_leader = LinkOne(w, &newest_writer_); - if (linked_as_leader) { - SetState(w, STATE_GROUP_LEADER); - } -} - -void WriteThread::AwaitStateForGroupLeader(Writer* w) { - if (w->state.load(std::memory_order_relaxed) != STATE_GROUP_LEADER) { AwaitState(w, STATE_GROUP_LEADER | STATE_MEMTABLE_WRITER_LEADER | STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED, &jbg_ctx); + TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:DoneWaiting", w); } } diff --git a/db/write_thread.h b/db/write_thread.h index 17f75872b02..627ce7ab612 100644 --- a/db/write_thread.h +++ b/db/write_thread.h @@ -301,7 +301,6 @@ class WriteThread { // it will block. // // Writer* w: Writer to be executed as part of a batch group - void JoinBatchGroup(Writer* w); void JoinBatchGroupNoBlocking(Writer* w); void AwaitStateForGroupLeader(Writer* w); From 0079de90fc91427b8b62ffebc0d841b611703230 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Tue, 19 Oct 2021 17:50:10 +0800 Subject: [PATCH 3/5] rename variable Signed-off-by: Little-Wallace --- db/db_impl/db_impl.h | 4 ++-- db/db_impl/db_impl_write.cc | 25 +++++++++++++------------ 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 65fdf3279a9..1e19ae4aa94 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -112,8 +112,8 @@ class Directories { struct DBWriter { DBWriter(const WriteOptions& options, std::vector&& updates) - : w(options, std::move(updates), nullptr, 0) {} - WriteThread::Writer w; + : writer(options, std::move(updates), nullptr, 0) {} + WriteThread::Writer writer; }; // While DB is the public interface of RocksDB, and DBImpl is the actual diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 92f057958ec..25b4c074b8b 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -59,17 +59,17 @@ Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) { } void DBImpl::Prepare(const WriteOptions& write_options, DBWriter* writer) { - write_thread_.JoinBatchGroupNoBlocking(&writer->w); + write_thread_.JoinBatchGroupNoBlocking(&writer->writer); } Status DBImpl::Submit(const WriteOptions& write_options, DBWriter* writer) { - if (writer == nullptr || writer->w.batches.empty()) { + if (writer == nullptr || writer->writer.batches.empty()) { return Status::Corruption("Batch is nullptr!"); } if (tracer_) { InstrumentedMutexLock lock(&trace_mutex_); if (tracer_) { - tracer_->Write(writer->w.batches[0]); + tracer_->Write(writer->writer.batches[0]); } } if (write_options.sync && write_options.disableWAL) { @@ -92,7 +92,8 @@ Status DBImpl::Submit(const WriteOptions& write_options, DBWriter* writer) { Status status; if (write_options.low_pri) { - status = ThrottleLowPriWritesIfNeeded(write_options, writer->w.batches[0]); + status = + ThrottleLowPriWritesIfNeeded(write_options, writer->writer.batches[0]); if (!status.ok()) { return status; } @@ -105,30 +106,30 @@ Status DBImpl::Submit(const WriteOptions& write_options, DBWriter* writer) { if (immutable_db_options_.unordered_write) { const size_t sub_batch_cnt = - WriteBatchInternal::Count(writer->w.batches[0]); 
+ WriteBatchInternal::Count(writer->writer.batches[0]); uint64_t seq; - write_thread_.AwaitStateForGroupLeader(&writer->w); - status = WriteImplWALOnly(&write_thread_, write_options, &writer->w, + write_thread_.AwaitStateForGroupLeader(&writer->writer); + status = WriteImplWALOnly(&write_thread_, write_options, &writer->writer, nullptr, &seq, kDoAssignOrder, kDoPublishLastSeq); TEST_SYNC_POINT("DBImpl::WriteImpl:UnorderedWriteAfterWriteWAL"); if (!status.ok()) { return status; } TEST_SYNC_POINT("DBImpl::WriteImpl:BeforeUnorderedWriteMemtable"); - status = UnorderedWriteMemtable(write_options, writer->w.batches[0], + status = UnorderedWriteMemtable(write_options, writer->writer.batches[0], nullptr, 0, seq, sub_batch_cnt); return status; } if (immutable_db_options_.enable_multi_thread_write) { - return MultiBatchWriteImpl(write_options, &writer->w, nullptr); + return MultiBatchWriteImpl(write_options, &writer->writer, nullptr); } if (immutable_db_options_.enable_pipelined_write) { - return PipelinedWriteImpl(write_options, &writer->w, nullptr); + return PipelinedWriteImpl(write_options, &writer->writer, nullptr); } - - return Status::OK(); + return Status::NotSupported( + "Do not support submit without pipelined_write or unordered_write"); } #ifndef ROCKSDB_LITE From c9a4a3c273d2f89f8128fc482ad6ac7ca2684423 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Wed, 20 Oct 2021 15:32:35 +0800 Subject: [PATCH 4/5] fix interface Signed-off-by: Little-Wallace --- db/db_impl/db_impl.h | 2 +- db/db_impl/db_impl_write.cc | 2 +- db/db_test.cc | 2 +- include/rocksdb/db.h | 2 +- include/rocksdb/utilities/stackable_db.h | 3 +-- utilities/blob_db/blob_db_impl.h | 3 +-- 6 files changed, 6 insertions(+), 8 deletions(-) diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 1e19ae4aa94..97bf129eb9c 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -160,7 +160,7 @@ class DBImpl : public DB { virtual Status Write(const WriteOptions& options, WriteBatch* updates) override; using DB::Prepare; - virtual void Prepare(const WriteOptions& options, DBWriter* writer) override; + virtual void Prepare(DBWriter* writer) override; using DB::Submit; virtual Status Submit(const WriteOptions& options, DBWriter* writer) override; diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 25b4c074b8b..3cda56a98c4 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -58,7 +58,7 @@ Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) { return WriteImpl(write_options, my_batch, nullptr, nullptr); } -void DBImpl::Prepare(const WriteOptions& write_options, DBWriter* writer) { +void DBImpl::Prepare(DBWriter* writer) { write_thread_.JoinBatchGroupNoBlocking(&writer->writer); } diff --git a/db/db_test.cc b/db/db_test.cc index 7ef6774429c..01044f392d0 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2584,7 +2584,7 @@ class ModelDB : public DB { return batch->Iterate(&handler); } using DB::Prepare; - void Prepare(const WriteOptions& options, DBWriter* writer) override {} + void Prepare(DBWriter* writer) override {} using DB::Submit; Status Submit(const WriteOptions& options, DBWriter* writer) override { return Status::NotSupported("Not implemented"); diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index b0204787ae3..d6e6818e04b 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -379,7 +379,7 @@ class DB { // Returns OK on success, non-OK on failure. // Note: consider setting options.sync = true. 
virtual Status Write(const WriteOptions& options, WriteBatch* updates) = 0; - virtual void Prepare(const WriteOptions& options, DBWriter* writer) = 0; + virtual void Prepare(DBWriter* writer) = 0; virtual Status Submit(const WriteOptions& options, DBWriter* writer) = 0; virtual Status MultiBatchWrite(const WriteOptions& /*options*/, diff --git a/include/rocksdb/utilities/stackable_db.h b/include/rocksdb/utilities/stackable_db.h index a37bc2e4c36..77e4baaada1 100644 --- a/include/rocksdb/utilities/stackable_db.h +++ b/include/rocksdb/utilities/stackable_db.h @@ -165,8 +165,7 @@ class StackableDB : public DB { virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override { return db_->Write(opts, updates); } - virtual void Prepare(const WriteOptions& options, DBWriter* writer) override { - } + virtual void Prepare(DBWriter* writer) override {} virtual Status Submit(const WriteOptions& options, DBWriter* writer) override { return Status::NotSupported("Not implemented"); diff --git a/utilities/blob_db/blob_db_impl.h b/utilities/blob_db/blob_db_impl.h index 6c3929d591c..732d90ae2d4 100644 --- a/utilities/blob_db/blob_db_impl.h +++ b/utilities/blob_db/blob_db_impl.h @@ -138,8 +138,7 @@ class BlobDBImpl : public BlobDB { std::vector<std::string>* values) override; virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override; - virtual void Prepare(const WriteOptions& options, DBWriter* writer) override { - } + virtual void Prepare(DBWriter* writer) override {} virtual Status Submit(const WriteOptions& options, DBWriter* writer) override { return Status::NotSupported("Not implemented"); From a36ae5762f93ada774a2d6c7d684e69b9b34f0c5 Mon Sep 17 00:00:00 2001 From: Little-Wallace Date: Tue, 26 Oct 2021 15:35:06 +0800 Subject: [PATCH 5/5] support is ready interface Signed-off-by: Little-Wallace --- db/db_impl/db_impl.h | 2 ++ db/write_thread.h | 6 ++++++ 2 files changed, 8 insertions(+) diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 97bf129eb9c..10b4661f709 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -114,6 +114,8 @@ struct DBWriter { DBWriter(const WriteOptions& options, std::vector<WriteBatch*>&& updates) : writer(options, std::move(updates), nullptr, 0) {} WriteThread::Writer writer; + + bool IsReady() const { return writer.IsReady(); } }; // While DB is the public interface of RocksDB, and DBImpl is the actual diff --git a/db/write_thread.h b/db/write_thread.h index 627ce7ab612..54c73f094b7 100644 --- a/db/write_thread.h +++ b/db/write_thread.h @@ -258,6 +258,12 @@ class WriteThread { return status.ok() && !CallbackFailed() && !disable_wal; } + bool IsReady() const { + static uint8_t goal = STATE_GROUP_LEADER | STATE_MEMTABLE_WRITER_LEADER | + STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED; + return (state.load(std::memory_order_acquire) & goal) != 0; + } + // No other mutexes may be acquired while holding StateMutex(), it is // always last in the order std::mutex& StateMutex() {
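
A minimal usage sketch of the Prepare/Submit path added by this series, assuming: the DBWriter struct as defined above (it currently lives in the internal header db/db_impl/db_impl.h, so this only compiles inside the RocksDB tree), a DB opened with enable_multi_thread_write, enable_pipelined_write, or unordered_write (after PATCH 3/5, Submit returns NotSupported otherwise), and an illustrative helper name PrepareAndSubmit that is not part of the patch.

#include <utility>
#include <vector>

#include "db/db_impl/db_impl.h"  // DBWriter is defined here in this series
#include "rocksdb/db.h"

rocksdb::Status PrepareAndSubmit(rocksdb::DB* db, rocksdb::WriteBatch* batch) {
  rocksdb::WriteOptions opts;
  std::vector<rocksdb::WriteBatch*> updates{batch};
  rocksdb::DBWriter writer(opts, std::move(updates));

  // Non-blocking: links the writer into the write queue; it is promoted to
  // group leader immediately only if no other writer is queued ahead of it.
  db->Prepare(&writer);

  // Unrelated work can run here; writer.IsReady() (PATCH 5/5) can be polled
  // to check whether the writer has been picked up or completed.

  // Blocks as needed and performs the WAL and memtable writes through the
  // configured path (multi-thread, pipelined, or unordered write).
  return db->Submit(opts, &writer);
}

The split lets a caller overlap its own work between queueing the batch (Prepare) and waiting for the group write to finish (Submit), which is the point of making JoinBatchGroup non-blocking.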