Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Split Write operation for raft-concurrent-apply #260

Open
wants to merge 5 commits into
base: 6.4.tikv
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 20 additions & 15 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,14 @@ class Directories {
std::unique_ptr<Directory> wal_dir_;
};

struct DBWriter {
DBWriter(const WriteOptions& options, std::vector<WriteBatch*>&& updates)
: writer(options, std::move(updates), nullptr, 0) {}
WriteThread::Writer writer;

bool IsReady() const { return writer.IsReady(); }
};

// While DB is the public interface of RocksDB, and DBImpl is the actual
// class implementing it. It's the entrance of the core RocksdB engine.
// All other DB implementations, e.g. TransactionDB, BlobDB, etc, wrap a
Expand Down Expand Up @@ -153,6 +161,10 @@ class DBImpl : public DB {
using DB::Write;
virtual Status Write(const WriteOptions& options,
WriteBatch* updates) override;
using DB::Prepare;
virtual void Prepare(DBWriter* writer) override;
using DB::Submit;
virtual Status Submit(const WriteOptions& options, DBWriter* writer) override;

using DB::MultiBatchWrite;
virtual Status MultiBatchWrite(const WriteOptions& options,
Expand Down Expand Up @@ -1029,16 +1041,10 @@ class DBImpl : public DB {
PreReleaseCallback* pre_release_callback = nullptr);

Status MultiBatchWriteImpl(const WriteOptions& write_options,
std::vector<WriteBatch*>&& my_batch,
WriteCallback* callback,
uint64_t* log_used = nullptr, uint64_t log_ref = 0,
uint64_t* seq_used = nullptr);
WriteThread::Writer* w, uint64_t* log_used);

Status PipelinedWriteImpl(const WriteOptions& options, WriteBatch* updates,
WriteCallback* callback = nullptr,
uint64_t* log_used = nullptr, uint64_t log_ref = 0,
bool disable_memtable = false,
uint64_t* seq_used = nullptr);
Status PipelinedWriteImpl(const WriteOptions& write_options,
WriteThread::Writer* w, uint64_t* log_used);

// Write only to memtables without joining any write queue
Status UnorderedWriteMemtable(const WriteOptions& write_options,
Expand All @@ -1060,12 +1066,11 @@ class DBImpl : public DB {
// of the write batch that does not have duplicate keys. When seq_per_batch is
// not set, each key is a separate sub_batch. Otherwise each duplicate key
// marks start of a new sub-batch.
Status WriteImplWALOnly(
WriteThread* write_thread, const WriteOptions& options,
WriteBatch* updates, WriteCallback* callback, uint64_t* log_used,
const uint64_t log_ref, uint64_t* seq_used, const size_t sub_batch_cnt,
PreReleaseCallback* pre_release_callback, const AssignOrder assign_order,
const PublishLastSeq publish_last_seq, const bool disable_memtable);
Status WriteImplWALOnly(WriteThread* write_thread,
const WriteOptions& options, WriteThread::Writer* w,
uint64_t* log_used, uint64_t* seq_used,
const AssignOrder assign_order,
const PublishLastSeq publish_last_seq);

// write cached_recoverable_state_ to memtable if it is not empty
// The writer must be the leader in write_thread_ and holding mutex_
Expand Down
Loading