MB-35458 [SR]: Move SyncWrite completion to bg DurabilityCompletionTask

Change how SyncWrites which are Resolved and awaiting Completion are
handled, by moving the final VBucket::commit() / abort() into a
background task - DurabilityCompletionTask.

+Background+

There are two reasons for making this change:

a) Performance - specifically latency of front-end worker threads.

By moving completion into a background task, we reduce the amount of
work done on the thread which actually detected the SyncWrite was
resolved - typically the front-end DCP threads when a DCP_SEQNO_ACK
is processed.
Given that we SEQNO_ACK at the end of a Snapshot, a single SEQNO_ACK
could result in committing multiple SyncWrites. Committing one
SyncWrite is similar to a normal front-end Set operation, so there is
potentially a non-trivial amount of work to be done when completing
SyncWrites, which could tie up the front-end thread (causing other
connections to have to wait) for a noticeable amount of time.

b) Simplification of lock management.

Doing completion in a background task simplifies lock management; for
example, we avoid lock inversions between locks acquired earlier during
dcpSeqnoAck and the later call to notifySeqnoAvailable, which was
previously made on the original thread.

+Problem+

While (a) was the first reason identified for making this change
(see MB-33092), (b) is the reason this change is being made now. During
testing the following lock-order-inversion was seen:

    WARNING: ThreadSanitizer: lock-order-inversion (potential deadlock)
    Cycle in lock order graph:

    Stream::streamMutex => StreamContainer::rwlock => Stream::streamMutex

The crux of the issue is the processing of DCP_SEQNO_ACKNOWLEDGED
messages by the DcpProducer - this acquires the Stream::streamMutex
before calling VBucket::seqnoAcknowledged(); however, that function
currently results in VBucket::commit() being called to synchronously
complete the SyncWrite, which in turn must notify all connected
replicas that a new seqno is available, requiring
StreamContainer::rwlock to be acquired:

  Mutex StreamContainer::rwlock acquired here while holding mutex Stream::streamMutex in thread T15:
    ...
    #6 StreamContainer<std::shared_ptr<Stream> >::rlock()
    #7 DcpProducer::notifySeqnoAvailable(Vbid, unsigned long)
    ...
    #13 VBucket::commit(...)
    #14 ActiveDurabilityMonitor::commit(...)
    #15 ActiveDurabilityMonitor::processCompletedSyncWriteQueue()
    #16 ActiveDurabilityMonitor::seqnoAckReceived(...)
    #17 VBucket::seqnoAcknowledged(...)
    #18 ActiveStream::seqnoAck(...)
    #19 DcpProducer::seqno_acknowledged(...)
    ...

  Mutex Stream::streamMutex previously acquired by the same thread here:
    ...
    #3 std::lock_guard<std::mutex>::lock_guard(std::mutex&)
    #4 ActiveStream::seqnoAck(...)
    #5 DcpProducer::seqno_acknowledged(...)
    ...

This conflicts with the ordering seen when sending items out on the
DCP connection - inside DcpProducer::step() where the
StreamContainer::rwlock is acquired first, then the ActiveStream::mutex
is acquired later:

  Mutex Stream::streamMutex acquired here while holding mutex StreamContainer::rwlock in thread T15:
    ...
    #3 std::lock_guard<std::mutex>::lock_guard(std::mutex&)
    #4 ActiveStream::next()
    #5 DcpProducer::getNextItem()
    #6 DcpProducer::step(dcp_message_producers*)
    ...

  Mutex StreamContainer::rwlock previously acquired by the same thread here:
    #0 pthread_rwlock_rdlock <null> (libtsan.so.0+0x00000002c98b)
    ...
    #4 std::shared_lock<cb::RWLock>::shared_lock(cb::RWLock&)
    #5 StreamContainer<>::ResumableIterationHandle::ResumableIterationHandle()
    #6 StreamContainer<>::startResumable()
    #7 DcpProducer::getNextItem()
    #8 DcpProducer::step(dcp_message_producers*)
    ...
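
For illustration, the conflict reduces to two threads taking the same
pair of locks in opposite orders. A minimal standalone sketch (not
kv_engine code - plain standard-library mutexes stand in for the
classes above; build with -fsanitize=thread to get the same style of
report):

    #include <mutex>
    #include <shared_mutex>
    #include <thread>

    std::mutex streamMutex;           // stands in for Stream::streamMutex
    std::shared_mutex containerLock;  // stands in for StreamContainer::rwlock

    // Models the DCP_SEQNO_ACKNOWLEDGED path: streamMutex is taken first,
    // then the synchronous commit needs the container lock to notify
    // replicas (streamMutex => containerLock).
    void seqnoAckPath() {
        std::lock_guard<std::mutex> sg(streamMutex);
        std::shared_lock<std::shared_mutex> cl(containerLock);
    }

    // Models DcpProducer::step(): the container lock is taken first, then
    // ActiveStream::next() takes the stream mutex (containerLock =>
    // streamMutex) - the opposite order, hence the potential deadlock.
    void producerStepPath() {
        std::shared_lock<std::shared_mutex> cl(containerLock);
        std::lock_guard<std::mutex> sg(streamMutex);
    }

    int main() {
        std::thread a(seqnoAckPath);
        std::thread b(producerStepPath);
        a.join();
        b.join();
    }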

+Solution+

The processing of resolved SyncWrites has been moved into a new
background task.
Instead of immediately processing them within
ActiveDM::seqnoAckReceived(), that function notifies the new NonIO
DurabilityCompletionTask that there are SyncWrites waiting for
completion.

DurabilityCompletionTask maintains a bool per vBucket indicating if
there are SyncWrites for that vBucket pending completion. When the
task runs, it calls VBucket::processResolvedSyncWrites() for each
vBucket whose flag is set.
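
In essence this is a per-vBucket atomic flag plus a de-duplicated
wake-up. A condensed sketch of the handshake (simplified names; the
real DurabilityCompletionTask in the diff below additionally yields
after a maximum chunk duration):

    #include <atomic>
    #include <cstddef>
    #include <vector>

    class CompletionTaskSketch {
    public:
        explicit CompletionTaskSketch(size_t maxVBuckets)
            : pending(maxVBuckets) {
        }

        // Called by whichever thread resolved the SyncWrites (e.g. on
        // seqno-ack). Only the false -> true transition needs a wake-up;
        // if the flag was already set, the task will pick the vBucket up
        // on its next run anyway.
        void notify(size_t vbid) {
            bool expected = false;
            if (pending[vbid].compare_exchange_strong(expected, true)) {
                wakeTask();
            }
        }

        // Executed on the background task's thread.
        void run() {
            for (size_t vbid = 0; vbid < pending.size(); ++vbid) {
                if (pending[vbid].exchange(false)) {
                    // The real task calls
                    // VBucket::processResolvedSyncWrites() here.
                }
            }
        }

    private:
        void wakeTask() {
            // ExecutorPool::get()->wake(getId()) in the real code.
        }

        std::vector<std::atomic<bool>> pending;
    };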

+Implementation Notes+

Currently there is just a single DurabilityCompletionTask (per Bucket);
this was chosen because 1 task per vBucket (i.e. 1024 per Bucket) would
be inefficient for our current background task scheduler (both in terms
of the latency to schedule each task for only one vBucket's worth of
work, and in terms of managing that many tasks in the future queue).

However, that does _potentially_ mean there are fewer resources
(threads) available to complete SyncWrites on - previously that work
could be done concurrently on all front-end threads (~O(num_cpus));
now the same work has only 1 thread available to run on (the single
DurabilityCompletionTask).

_If_ this becomes a bottleneck we could look at increasing the number
of DurabilityCompletionTasks - e.g. sharding all vBuckets across
multiple tasks, as is done for the flusher / bgfetcher.
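
A sketch of what that sharding could look like (hypothetical - not part
of this change; the helper name is illustrative):

    #include <cstddef>

    // Statically partition vBuckets across numTasks completion tasks,
    // mirroring the flusher / bgfetcher sharding scheme.
    inline size_t completionTaskForVBucket(size_t vbid, size_t numTasks) {
        return vbid % numTasks;
    }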

Change-Id: I87897af1e3fd0a57e5abc2cb1ba9f795a9d3c63e
Reviewed-on: http://review.couchbase.org/113141
Tested-by: Build Bot <[email protected]>
Reviewed-by: Ben Huddleston <[email protected]>
daverigby committed Aug 19, 2019
1 parent 96ed3eb commit 7fdb98a
Showing 35 changed files with 455 additions and 75 deletions.
1 change: 1 addition & 0 deletions engines/ep/CMakeLists.txt
@@ -227,6 +227,7 @@ ADD_LIBRARY(ep_objs OBJECT
src/defragmenter_visitor.cc
src/diskdockey.cc
src/durability/active_durability_monitor.cc
src/durability/durability_completion_task.cc
src/durability/durability_monitor.cc
src/durability/durability_monitor_impl.cc
src/durability/passive_durability_monitor.cc
1 change: 1 addition & 0 deletions engines/ep/benchmarks/defragmenter_bench.cc
@@ -56,6 +56,7 @@ class DefragmentBench : public benchmark::Fixture {
/*table*/ nullptr,
std::make_shared<DummyCB>(),
/*newSeqnoCb*/ nullptr,
[](Vbid) { return; },
NoopSyncWriteCompleteCb,
NoopSeqnoAckCb,
config,
1 change: 1 addition & 0 deletions engines/ep/benchmarks/item_compressor_bench.cc
@@ -55,6 +55,7 @@ class ItemCompressorBench : public benchmark::Fixture {
/*table*/ nullptr,
std::make_shared<DummyCB>(),
/*newSeqnoCb*/ nullptr,
[](Vbid) { return; },
NoopSyncWriteCompleteCb,
NoopSeqnoAckCb,
config,
22 changes: 13 additions & 9 deletions engines/ep/src/durability/active_durability_monitor.cc
@@ -544,7 +544,7 @@ void ActiveDurabilityMonitor::setReplicationTopology(
s->setReplicationTopology(topology, *resolvedQueue);
}

processCompletedSyncWriteQueue();
checkForResolvedSyncWrites();
}

int64_t ActiveDurabilityMonitor::getHighPreparedSeqno() const {
@@ -619,8 +619,9 @@ ENGINE_ERROR_CODE ActiveDurabilityMonitor::seqnoAckReceived(
seqnoAckReceivedPostProcessHook();
}

// Process the Completed Queue, committing all items and removing them.
processCompletedSyncWriteQueue();
// Check if there are now any resolved SyncWrites which should be
// completed.
checkForResolvedSyncWrites();

return ENGINE_SUCCESS;
}
@@ -639,7 +640,7 @@ void ActiveDurabilityMonitor::processTimeout(
// the correct locks).
state.wlock()->removeExpired(asOf, *resolvedQueue);

processCompletedSyncWriteQueue();
checkForResolvedSyncWrites();
}

void ActiveDurabilityMonitor::notifyLocalPersistence() {
@@ -728,6 +729,13 @@ void ActiveDurabilityMonitor::addStatsForChain(
}
}

void ActiveDurabilityMonitor::checkForResolvedSyncWrites() {
if (resolvedQueue->empty()) {
return;
}
vb.notifySyncWritesPendingCompletion();
}

void ActiveDurabilityMonitor::processCompletedSyncWriteQueue() {
std::lock_guard<ResolvedQueue::ConsumerLock> lock(
resolvedQueue->getConsumerLock());
@@ -1645,11 +1653,7 @@ void ActiveDurabilityMonitor::checkForCommit() {
// the resolvedQueue (under the correct locks).
state.wlock()->updateHighPreparedSeqno(*resolvedQueue);

// @todo: Consider to commit in a dedicated function for minimizing
// contention on front-end threads, as this function is supposed to
// execute under VBucket-level lock.

processCompletedSyncWriteQueue();
checkForResolvedSyncWrites();
}

template <class exception>
12 changes: 9 additions & 3 deletions engines/ep/src/durability/active_durability_monitor.h
@@ -282,6 +282,12 @@ class ActiveDurabilityMonitor : public DurabilityMonitor {
*/
void removedQueuedAck(const std::string& node);

/**
* For all items in the completedSWQueue, call VBucket::commit /
* VBucket::abort as appropriate, then remove the item from the queue.
*/
void processCompletedSyncWriteQueue();

/**
* @return all of the currently tracked writes
*/
@@ -363,10 +369,10 @@
const ReplicationChain& chain) const;

/**
* For all items in the completedSWQueue, call VBucket::commit /
* VBucket::abort as appropriate, then remove the item from the queue.
* Checks if the resolvedQueue contains any SyncWrites awaiting completion,
* and if so notifies the VBucket.
*/
void processCompletedSyncWriteQueue();
void checkForResolvedSyncWrites();

// The stats object for the owning Bucket
EPStats& stats;
92 changes: 92 additions & 0 deletions engines/ep/src/durability/durability_completion_task.cc
@@ -0,0 +1,92 @@
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
* Copyright 2019 Couchbase, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "durability_completion_task.h"

#include "ep_engine.h"
#include "executorpool.h"
#include "vbucket.h"

#include <climits>

using namespace std::chrono_literals;

DurabilityCompletionTask::DurabilityCompletionTask(
EventuallyPersistentEngine& engine)
: GlobalTask(&engine, TaskId::DurabilityCompletionTask),
pendingVBs(engine.getConfiguration().getMaxVbuckets()),
lastVb(pendingVBs.size() - 1) {
for (auto& vb : pendingVBs) {
vb.store(false);
}
}

bool DurabilityCompletionTask::run() {
if (engine->getEpStats().isShutdown) {
return false;
}

// Start by putting ourselves back to sleep once run() completes.
// If a new VB is notified (or a VB is re-notified after it is processed in
// the loop below) then that will cause the task to be re-awoken.
snooze(INT_MAX);
// Clear the wakeUpScheduled flag - that allows notifySyncWritesToComplete()
// to wake up (re-schedule) this task if new vBuckets have SyncWrites which
// need completing.
wakeUpScheduled.store(false);

const auto startTime = std::chrono::steady_clock::now();

// Loop for each vBucket, starting from where we previously left off.
int vbid = (lastVb + 1) % pendingVBs.size();
// For each vBucket, if the pending flag is set then clear it and
// process its resolved SyncWrites.
for (; vbid != lastVb; vbid = (vbid + 1) % pendingVBs.size()) {
if (pendingVBs[vbid].exchange(false)) {
engine->getVBucket(Vbid(vbid))->processResolvedSyncWrites();
}
// Yield back to scheduler if we have exceeded the maximum runtime
// for a single execution.
auto runtime = std::chrono::steady_clock::now() - startTime;
if (runtime > maxChunkDuration) {
wakeUp();
break;
}
}
lastVb = vbid;

return true;
}

void DurabilityCompletionTask::notifySyncWritesToComplete(Vbid vbid) {
bool expected = false;
if (pendingVBs[vbid.get()].compare_exchange_strong(expected, true)) {
// This VBucket transitioned from false -> true - wake ourselves up so
// we can start to process the SyncWrites.
expected = false;

// Performance: Only wake up the task once (and don't repeatedly try to
// wake if it's already scheduled to wake) - ExecutorPool::wake() isn't
// super cheap so avoid it if already pending.
if (wakeUpScheduled.compare_exchange_strong(expected, true)) {
ExecutorPool::get()->wake(getId());
}
}
}

const std::chrono::steady_clock::duration
DurabilityCompletionTask::maxChunkDuration = 25ms;
85 changes: 85 additions & 0 deletions engines/ep/src/durability/durability_completion_task.h
@@ -0,0 +1,85 @@
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
* Copyright 2019 Couchbase, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include "globaltask.h"
#include <memcached/vbucket.h>

/*
* This task is used to complete (commit or abort) all SyncWrites which have
* been resolved by each vbucket's ActiveDM.
*
* This is done in a separate task to reduce the amount of work done on
* the thread which actually detected the SyncWrite was resolved - typically
* the front-end DCP threads when a DCP_SEQNO_ACK is processed.
* Given that we SEQNO_ACK at the end of a Snapshot, a single SEQNO_ACK could
* result in committing multiple SyncWrites, and committing one SyncWrite is
* similar to a normal front-end Set operation, so we want to move this work
* to a background task.
*
* Additionally, by doing this in a background task it simplifies lock
* management; for example, we avoid lock inversions between locks acquired
* earlier during dcpSeqnoAck and the later call to notifySeqnoAvailable,
* which was previously made on the original thread.
*/
class DurabilityCompletionTask : public GlobalTask {
public:
DurabilityCompletionTask(EventuallyPersistentEngine& engine);

bool run() override;

std::string getDescription() override {
return "DurabilityCompletionTask";
}

std::chrono::microseconds maxExpectedDuration() override {
// Task shouldn't run much longer than maxChunkDuration, given we yield
// after that duration - however it _could_ exceed it a little, as we
// only check the duration after each vBucket. As such, add a 2x margin
// of error.
return std::chrono::duration_cast<std::chrono::microseconds>(
2 * maxChunkDuration);
}

/**
* Notifies the task that the given vBucket has SyncWrite(s) ready to
* be completed.
* If the given vBucket isn't already pending, then will wake up the task
* for it to run.
*/
void notifySyncWritesToComplete(Vbid vbid);

private:
/**
* A flag for each (possible) Vbid, set to true if there are SyncWrites
* which need to be resolved.
*/
std::vector<std::atomic_bool> pendingVBs;

/// The last vBucket which SyncWrites were completed for, and the vBucket
/// the next run() method will continue from.
int lastVb;

/**
* Flag which is used to check if a wakeup has already been scheduled for
* this task.
*/
std::atomic<bool> wakeUpScheduled{false};

/// Maximum duration this task should execute for before yielding back to
/// the ExecutorPool (to allow other tasks to run).
static const std::chrono::steady_clock::duration maxChunkDuration;
};
1 change: 1 addition & 0 deletions engines/ep/src/ep_bucket.cc
@@ -1128,6 +1128,7 @@ VBucketPtr EPBucket::makeVBucket(
std::move(table),
flusherCb,
std::move(newSeqnoCb),
makeSyncWriteResolvedCB(),
makeSyncWriteCompleteCB(),
makeSeqnoAckCB(),
engine.getConfiguration(),
2 changes: 2 additions & 0 deletions engines/ep/src/ep_vb.cc
@@ -47,6 +47,7 @@ EPVBucket::EPVBucket(Vbid i,
std::unique_ptr<FailoverTable> table,
std::shared_ptr<Callback<Vbid>> flusherCb,
NewSeqnoCallback newSeqnoCb,
SyncWriteResolvedCallback syncWriteResolvedCb,
SyncWriteCompleteCallback syncWriteCb,
SeqnoAckCallback seqnoAckCb,
Configuration& config,
@@ -69,6 +70,7 @@
flusherCb,
std::make_unique<StoredValueFactory>(st),
std::move(newSeqnoCb),
syncWriteResolvedCb,
syncWriteCb,
seqnoAckCb,
config,
1 change: 1 addition & 0 deletions engines/ep/src/ep_vb.h
@@ -39,6 +39,7 @@ class EPVBucket : public VBucket {
std::unique_ptr<FailoverTable> table,
std::shared_ptr<Callback<Vbid>> flusherCb,
NewSeqnoCallback newSeqnoCb,
SyncWriteResolvedCallback syncWriteResolvedCb,
SyncWriteCompleteCallback syncWriteCb,
SeqnoAckCallback seqnoAckCb,
Configuration& config,
1 change: 1 addition & 0 deletions engines/ep/src/ephemeral_bucket.cc
@@ -207,6 +207,7 @@ VBucketPtr EphemeralBucket::makeVBucket(
lastSnapEnd,
std::move(table),
std::move(newSeqnoCb),
makeSyncWriteResolvedCB(),
makeSyncWriteCompleteCB(),
makeSeqnoAckCB(),
engine.getConfiguration(),
2 changes: 2 additions & 0 deletions engines/ep/src/ephemeral_vb.cc
@@ -44,6 +44,7 @@ EphemeralVBucket::EphemeralVBucket(
uint64_t lastSnapEnd,
std::unique_ptr<FailoverTable> table,
NewSeqnoCallback newSeqnoCb,
SyncWriteResolvedCallback syncWriteResolvedCb,
SyncWriteCompleteCallback syncWriteCb,
SeqnoAckCallback seqnoAckCb,
Configuration& config,
@@ -65,6 +66,7 @@
/*flusherCb*/ nullptr,
std::make_unique<OrderedStoredValueFactory>(st),
std::move(newSeqnoCb),
syncWriteResolvedCb,
syncWriteCb,
seqnoAckCb,
config,
1 change: 1 addition & 0 deletions engines/ep/src/ephemeral_vb.h
@@ -39,6 +39,7 @@ class EphemeralVBucket : public VBucket {
uint64_t lastSnapEnd,
std::unique_ptr<FailoverTable> table,
NewSeqnoCallback newSeqnoCb,
SyncWriteResolvedCallback syncWriteResolvedCb,
SyncWriteCompleteCallback syncWriteCb,
SeqnoAckCallback seqnoAckCb,
Configuration& config,
13 changes: 13 additions & 0 deletions engines/ep/src/kv_bucket.cc
@@ -39,6 +39,7 @@
#include "connmap.h"
#include "dcp/dcpconnmap.h"
#include "defragmenter.h"
#include "durability/durability_completion_task.h"
#include "durability_timeout_task.h"
#include "ep_engine.h"
#include "ep_time.h"
@@ -449,6 +450,10 @@ bool KVBucket::initialize() {
config.getDurabilityTimeoutTaskInterval()));
ExecutorPool::get()->schedule(durabilityTimeoutTask);

durabilityCompletionTask =
std::make_shared<DurabilityCompletionTask>(engine);
ExecutorPool::get()->schedule(durabilityCompletionTask);

ExTask workloadMonitorTask =
std::make_shared<WorkLoadMonitor>(&engine, false);
ExecutorPool::get()->schedule(workloadMonitorTask);
@@ -2621,6 +2626,14 @@ uint16_t KVBucket::getNumOfVBucketsInState(vbucket_state_t state) const {
return vbMap.getVBStateCount(state);
}

SyncWriteResolvedCallback KVBucket::makeSyncWriteResolvedCB() {
return [this](Vbid vbid) {
if (this->durabilityCompletionTask) {
this->durabilityCompletionTask->notifySyncWritesToComplete(vbid);
}
};
}

SyncWriteCompleteCallback KVBucket::makeSyncWriteCompleteCB() {
auto& engine = this->engine;
return [&engine](const void* cookie, ENGINE_ERROR_CODE status) {