MB-42029: FollyExecPool: Wait for tasks cancelled in unregisterTaskable

+Issue+

When enabling FollyExecutorPool by default, TSan reports the following
race when running ./ep-engine_ep_unit_tests
"--gtest_filter=DurabilityRespondAmbiguousTest.*":

AuxIO thread:
    Previous atomic write of size 8 at 0x7b74000020a0 by thread T8:
    #0 __tsan_atomic64_fetch_sub <null> (libtsan.so.0+0x000000060890)
    ...
    #4 ~HashTable kv_engine/engines/ep/src/hash_table.cc:161 (ep-engine_ep_unit_tests+0x00000122cae1)
    #5 ~VBucket kv_engine/engines/ep/src/vbucket.cc:286 (ep-engine_ep_unit_tests+0x0000012b3af4)
    #6 ~EPVBucket kv_engine/engines/ep/src/ep_vb.cc:101 (ep-engine_ep_unit_tests+0x0000011af5e1)
    ...
    #10 ~VBucketMemoryDeletionTask kv_engine/engines/ep/src/vbucketdeletiontask.cc:45 (ep-engine_ep_unit_tests+0x0000012e4530)
    ...
    #17 std::__shared_ptr<GlobalTask>::reset() /usr/local/include/c++/7.3.0/bits/shared_ptr_base.h:1235 (ep-engine_ep_unit_tests+0x000001221e75)
    #18 FollyExecutorPool::TaskProxy::~TaskProxy()::{lambda()#1}::operator()() kv_engine/engines/ep/src/folly_executorpool.cc:80 (ep-engine_ep_unit_tests+0x000001221e75)

Main thread:
    Write of size 8 at 0x7b74000020a0 by main thread:
    #0 free <null> (libtsan.so.0+0x000000027806)
    ...
    #6 CoreStore<...>::~CoreStore() platform/include/platform/corestore.h:50 (ep-engine_ep_unit_tests+0x0000012988b1)
    #7 ~EPStats kv_engine/engines/ep/src/stats.cc:132 (ep-engine_ep_unit_tests+0x0000012988b1)
    #8 ~EventuallyPersistentEngine kv_engine/engines/ep/src/ep_engine.cc:6593 (ep-engine_ep_unit_tests+0x0000011e3bb5)
    ...
    #12 DurabilityRespondAmbiguousTest_RespondAmbiguousNotificationDeadLock_Test::TestBody() kv_engine/engines/ep/tests/module_tests/evp_store_durability_test.cc:2350 (ep-engine_ep_unit_tests+0x000000bd3642)

The crux of this issue seems to be that a background AuxIO task run
via the FollyExecutorPool is deleting a VBucket object concurrently
with the main thread deleting an EPStats object.
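
In other words (a minimal standalone sketch with hypothetical, simplified types - not the real kv_engine classes), the background destruction decrements a counter owned by the stats object while the main thread is freeing that same object:

    #include <atomic>
    #include <memory>
    #include <thread>

    struct StatsSketch {
        std::atomic<size_t> memUsed{1024}; // stand-in for the EPStats counters
    };

    struct VBucketSketch {
        StatsSketch& stats;
        ~VBucketSketch() { stats.memUsed -= 1024; } // ~HashTable-style accounting
    };

    int main() {
        auto stats = std::make_unique<StatsSketch>();
        std::unique_ptr<VBucketSketch> vb(new VBucketSketch{*stats});

        // Background (AuxIO / CPU-pool) thread: deferred VBucket deletion.
        std::thread bg([vb = std::move(vb)]() mutable { vb.reset(); });

        // Main thread: engine teardown frees the stats object without waiting
        // for the background deletion - the fetch_sub in ~VBucketSketch can
        // then touch freed memory, which is the kind of race TSan reports.
        stats.reset();

        bg.join();
        return 0;
    }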

+Background+

Details of how CB3ExecutorPool and FollyExecutorPool implement
{{unregisterTaskable()}}; the difference between them is, I believe,
what leads to this problem.

CB3ExecutorPool:

During CB3ExecutorPool::unregisterTaskable():

1. CB3ExecutorPool::_stopTaskGroup() is called and will wait for
   VBucketMemoryDeletionTask to run.

2. When VBucketMemoryDeletionTask::run() is called, it returns false.

3. CB3ExecutorThread will then synchronously call
   CB3ExecutorThread::cancelCurrentTask() ->
   CB3ExecutorPool::cancel(). That removes all CB3ExecutorPool-owned
   references to the task, and hence runs the
   VBucketMemoryDeletionTask dtor.

4. VBucketMemoryDeletionTask dtor frees the VBucket object.

As such, by the time CB3ExecutorPool::unregisterTaskable() returns the
VBucket is *guaranteed* to have been freed.
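
To illustrate, a minimal standalone sketch (hypothetical, simplified types - not the real CB3ExecutorPool code) of why this ordering guarantees the VBucket is freed before unregisterTaskable() returns:

    #include <memory>
    #include <thread>

    struct VBucketSketch {}; // stand-in for the real VBucket

    struct DeletionTaskSketch {
        // ~DeletionTaskSketch frees the VBucketSketch, analogous to
        // ~VBucketMemoryDeletionTask freeing the VBucket.
        std::unique_ptr<VBucketSketch> vb = std::make_unique<VBucketSketch>();
        bool run() { return false; } // "run once, then die"
    };

    void unregisterTaskableSketch(std::shared_ptr<DeletionTaskSketch> task) {
        std::thread worker([t = std::move(task)]() mutable {
            if (!t->run()) {
                // Synchronous cancel: the last reference is dropped on the
                // worker thread, so the dtor (and VBucket free) runs here.
                t.reset();
            }
        });
        // _stopTaskGroup() analogue: wait for the worker before returning.
        worker.join();
        // Assuming the worker held the last reference, the VBucket has been
        // freed by the time this function returns.
    }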

FollyExecutorPool:

During FollyExecutorPool::unregisterTaskable():

1. All tasks scheduled to run in future (owned by IO thread EventBase)
   are either cancelled (if allowed), or woken to run asap on CPU pool.

2. All tasks waiting to, or currently running on CPU pool are waited
   for by polling for taskOwners to no longer contain any tasks for
   the given taskable.

3. (On the CPU threads) Each queued task is run; on completion
   rescheduleTaskAfterRun is called to add work to the IO thread
   EventBase to decide when to reschedule, or (in this case) to
   actually cancel the task.

4. (On the IO thread) FollyExecutorPool::rescheduleTaskAfterRun is
   called, for cancelled tasks this calls State::cancelTask() which
   removes the task from taskOwners - at which point TaskProxy dtor
   runs, which schedules _another_ task on CPU pool to actually delete
   the GlobalTask.

The problem here is that the TaskProxy is removed from taskOwners and
deleted at (4) on the IO thread; however the GlobalTask destruction is
deferred to later execution on a CPU thread. As such,
FollyExecutorPool::unregisterTaskable() can see taskOwners having no
tasks left for the taskable (and hence return) _before_ the VBucket
object is deleted.

Note the deferred deletion scheduled at the end of (4) was added to
avoid potentially large amounts of work being done on the IO thread -
we aim to minimise work done here as it can impact the scheduling of
other tasks.
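
The window can be seen in a minimal standalone sketch (hypothetical, heavily simplified types - not the real FollyExecutorPool code): the taskOwners entry disappears on the IO thread before the deferred shared_ptr reset runs on the CPU pool, so the polling loop can complete while destruction is still pending:

    #include <atomic>
    #include <chrono>
    #include <memory>
    #include <thread>

    int main() {
        std::atomic<int> taskOwnersCount{1};         // stand-in for taskOwners
        auto globalTask = std::make_shared<int>(42); // stand-in for GlobalTask / VBucket

        // IO thread, step (4): remove from taskOwners, defer the actual delete.
        std::thread ioThread([&] {
            taskOwnersCount--; // unregisterTaskable() may now return...
            std::thread cpuThread([t = std::move(globalTask)]() mutable {
                t.reset();     // ...while the real destruction happens here
            });
            cpuThread.join();
        });

        // Main thread: unregisterTaskable() polls taskOwners, then the caller
        // tears down EPStats etc.
        while (taskOwnersCount != 0) {
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
        // <-- At this point ~VBucket may still be running on the CPU thread: the race.

        ioThread.join();
        return 0;
    }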

+Solution+

If the TaskProxy removal from taskOwners is deferred until _after_ the
GlobalTask shared ownership is released, then unregisterTaskable()
will no longer return until all GlobalTask references inside
FollyExecutorPool have been released.

To achieve this, changes to the ownership model in FollyExecutorPool
are needed:

1. Don't immediately remove TaskProxy from taskOwners in cancelTask().
   Instead:
   a) Mark it as cancelled, by setting the GlobalTask ptr to null,
   b) Schedule an asynchronous task to release its GlobalTask
      shared_ptr.

2. This new async task (set up in resetTaskPtrViaCpuPool) releases the
   TaskProxy's shared ownership on GlobalTask, then schedules an (IO
   thread) completion task to finally remove the TaskProxy from
   taskOwners.

unregisterTaskable() is unchanged - it still waits for the taskOwners
entry for the given Taskable to become empty; however, given the above
changes, that only happens once the GlobalTask reference has been
released.
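
A minimal sketch of the revised ordering (hypothetical, simplified types; the real code hops back to the IO thread for removeTaskAfterRun(), which is elided here) - the taskOwners entry only goes away after the shared_ptr has been reset on the CPU pool:

    #include <atomic>
    #include <chrono>
    #include <memory>
    #include <thread>

    int main() {
        std::atomic<int> taskOwnersCount{1};         // stand-in for taskOwners
        auto globalTask = std::make_shared<int>(42); // stand-in for GlobalTask / VBucket

        std::thread ioThread([&] {
            // cancelTask(): 'task' ptr is moved out, but the taskOwners entry remains.
            std::thread cpuThread([t = std::move(globalTask), &taskOwnersCount]() mutable {
                t.reset();         // resetTaskPtrViaCpuPool(): destruction on a CPU thread
                taskOwnersCount--; // removeTaskAfterRun(): only now removed from taskOwners
            });
            cpuThread.join();
        });

        // unregisterTaskable(): the poll cannot complete until after destruction.
        while (taskOwnersCount != 0) {
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
        ioThread.join();
        return 0;
    }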

Change-Id: Iecbff9f3b45fc9e3d385c67f6a6dd32242dc76fe
Reviewed-on: http://review.couchbase.org/c/kv_engine/+/138373
Tested-by: Build Bot <[email protected]>
Reviewed-by: Jim Walker <[email protected]>
daverigby committed Oct 23, 2020
1 parent d541aaa commit c370cd5
Showing 4 changed files with 250 additions and 46 deletions.
245 changes: 200 additions & 45 deletions engines/ep/src/folly_executorpool.cc
@@ -29,6 +29,8 @@
#include <platform/string_hex.h>
#include <statistics/collector.h>

using namespace std::string_literals;

/**
* Thread factory for CPU pool threads.
* - Gives each thread name based on the given prefix.
@@ -62,23 +64,21 @@ struct FollyExecutorPool::TaskProxy
public std::enable_shared_from_this<TaskProxy> {
TaskProxy(FollyExecutorPool& executor,
folly::CPUThreadPoolExecutor& pool,
ExTask task)
: task(std::move(task)), executor(executor), cpuPool(pool) {
ExTask task_)
: task(std::move(task_)),
taskId(task->getId()),
executor(executor),
cpuPool(pool) {
}

~TaskProxy() override {
// We are potentially the last (shared) owner of the GlobalTask,
// whose destruction may perform an arbitrary amount of work which we
// don't want to run on a non-CPU thread. As such, perform the actual
// destructor on the appropriate CPU pool.
ExTask taskToDelete;
task.swap(taskToDelete);
cpuPool.add([taskToDelete = std::move(taskToDelete)]() mutable {
// We must account the destruction of the GlobalTask to the bucket
// which owns it.
BucketAllocationGuard guard(taskToDelete->getEngine());
taskToDelete.reset();
});
// To ensure that we do not destruct GlobalTask objects on
// arbitrary threads (if the TaskProxy is the last owner), we
// should have already called task.reset() before destruction
// of the TaskProxy.
Expects(!task &&
"task shared_ptr should already be empty before destructing "
"TaskProxy");
}

void timeoutExpired() noexcept override {
@@ -136,11 +136,7 @@ struct FollyExecutorPool::TaskProxy
// Perform work on the appropriate CPU pool.
// Note this retains a reference to itself (TaskProxy).
cpuPool.add([proxy = shared_from_this()] {
if (!proxy->task) {
// ExTask has been set to null - Taskable likely unregistered
// - nothing to do.
return;
}
Expects(proxy->task.get());

bool runAgain = false;
// Check if Task is still alive. If not don't run.
@@ -199,6 +195,41 @@ struct FollyExecutorPool::TaskProxy
});
}

void resetTaskPtrViaCpuPool() {
using namespace std::chrono;

EP_LOG_TRACE(
"TaskProxy::resetTaskPtrViaCpuPool() id:{} name:{} descr:'{}' "
"enqueuing func to reset 'task' shared_ptr",
task->getId(),
GlobalTask::getTaskName(task->getTaskId()),
task->getDescription());

// Move `task` from this object (leaving it as null)
cpuPool.add([ptrToReset = std::move(task), proxy = this]() mutable {
EP_LOG_TRACE(
"FollyExecutorPool::resetTaskPtrViaCpuPool lambda() id:{} "
"name:{}",
ptrToReset->getId(),
GlobalTask::getTaskName(ptrToReset->getTaskId()));

// Reset the shared_ptr, decrementing its refcount and potentially
// deleting the owned object if no other objects (Engine etc) have
// retained a refcount.
// Must account this to the relevant bucket.
{
BucketAllocationGuard guard(ptrToReset->getEngine());
ptrToReset.reset();
}
// Finally, remove the taskProxy from taskOwners.
proxy->executor.futurePool->getEventBase()->runInEventBaseThread(
[proxy] {
auto& executor = proxy->executor;
executor.removeTaskAfterRun(*proxy);
});
});
}

/**
* Updates the timeout to the value of the GlobalTasks' wakeTime
*/
@@ -234,6 +265,15 @@ struct FollyExecutorPool::TaskProxy
Expects(executor.futurePool->getEventBase()
->inRunningEventBaseThread());

if (!task) {
// Task has been cancelled ('task' shared ptr reset to
// null via resetTaskPtrViaCpuPool), but TaskProxy not yet
// been cleaned up) - i.e. a wake and cancel have raced.
// Cannot wake (until GlobalTask is re-scheduled) -
// return.
return;
}

// Cancel any previously set future execution of the
// task.
cancelTimeout();
@@ -254,10 +294,20 @@
/// shared_ptr to the GlobalTask.
ExTask task;

// identifier of the task. This is a copy of data within
// `task`, because we reset `task` before removing the TaskProxy
// entry from taskOwners, and need the taskId for that.
const size_t taskId;

// Flag used to block re-scheduling of a task while it's in the process
// of running on CPU pool. See comments in timeoutExpired().
bool scheduledOnCpuPool{false};

// Did we re-use the same TaskProxy after a re-schedule? Used for
// sanity checking if removeTaskAfterRun() finds a non-null 'task'
// when executed.
bool proxyReused{false};

private:
// Associated FollyExecutorPool (needed for re-scheduling / cancelling
// dead tasks).
@@ -297,7 +347,7 @@ using TaskOwnerMap = std::unordered_map<const Taskable*, TaskLocator>;
*/
struct FollyExecutorPool::State {
void addTaskable(Taskable& taskable) {
taskOwners.insert({&taskable, {}});
taskOwners.try_emplace(&taskable);
}

void removeTaskable(Taskable& taskable) {
@@ -310,20 +360,35 @@ struct FollyExecutorPool::State {
* @param executor FollyExecutorPool owning the task.
* @param task The Task to schedule.
*/
void scheduleTask(FollyExecutorPool& executor,
bool scheduleTask(FollyExecutorPool& executor,
folly::CPUThreadPoolExecutor& pool,
ExTask task) {
auto& tasksForOwner = taskOwners.at(&task->getTaskable());
auto result = tasksForOwner.try_emplace(
task->getId(),
std::make_shared<TaskProxy>(executor, pool, task));
auto& it = result.first;
auto [it, inserted] = tasksForOwner.try_emplace(
task->getId(), std::shared_ptr<TaskProxy>{});
if (!inserted) {
// taskId already present - i.e. this taskId has already been
// scheduled.
// It is only valid to re-schedule a task if it was previously
// cancelled, but we hadn't cleaned up the cancellation - the
// 'task' shared_ptr is null.
if (it->second->task) {
return false;
}
// re-assign task to the one passed in.
it->second->task = task;
it->second->proxyReused = true;
} else {
// Inserted a new entry into map - create a TaskProxy object for it.
it->second = std::make_shared<TaskProxy>(executor, pool, task);
}

// If we are rescheduling a previously cancelled task, we should
// reset the task state to the initial value of running.
task->setState(TASK_RUNNING, TASK_DEAD);
it->second->task->setState(TASK_RUNNING, TASK_DEAD);

it->second->updateTimeoutFromWakeTime();
return true;
}

/**
@@ -389,8 +454,13 @@ struct FollyExecutorPool::State {
std::vector<ExTask> cancelTasksOwnedBy(const Taskable& taskable,
bool force) {
std::vector<ExTask> removedTasks;
for (auto it : taskOwners.at(&taskable)) {
for (auto& it : taskOwners.at(&taskable)) {
auto& tProxy = it.second;
if (!tProxy->task) {
// Task already cancelled (shared pointer reset to null) by
// cancelTask() - skip.
continue;
}
EP_LOG_DEBUG(
"FollyExecutorPool::unregisterTaskable(): Stopping "
"Task id:{} taskable:{} description:'{}'",
@@ -415,13 +485,12 @@ struct FollyExecutorPool::State {
}

/**
* Cancel the specified task, optionally removing it from taskOwners.
* Cancel the specified task.
*
* @param taskId Task to cancel
* @param eraseTask If true then erase the task from taskOwners.
* @return True if task found, else false.
*/
bool cancelTask(size_t taskId, bool eraseTask) {
bool cancelTask(size_t taskId) {
// Search for the given taskId across all buckets.
// PERF: CB3ExecutorPool uses a secondary map (taskLocator)
// to allow O(1) lookup by taskId, however cancel() isn't a
@@ -438,14 +507,70 @@ struct FollyExecutorPool::State {
taskId,
owner->getName());

if (!it->second->task) {
// Task already cancelled (shared pointer reset to null) by
// some previous call to cancelTask().
return false;
}

it->second->task->cancel();
if (eraseTask) {
tasks.erase(it);

// Now `task` has been cancelled, we need to remove our
// reference (shared ownership) to the owned GlobalTask and from
// taskOwners. Decrementing our refcount could delete the
// GlobalTask (if we are the last owner). This must occur on a
// CPU thread given GlobalTask destruction can be an arbitrary
// amount of work.
if (it->second->scheduledOnCpuPool) {
// Currently scheduled on CPU pool - TaskProxy is "owned" by
// CPU thread. Given we just called cancel() on the
// GlobalTask, we can rely on rescheduleTaskAfterRun to
// reset the TaskProxy (when it calls cancelTask()).
return true;
}
// Note: We could potentially always erase here, given
// that this is running in the eventBase and hence
// there's no issue of racing with ourselves like
// there is for CB3ExecutorPool::_cancel.

// Not currently scheduled on CPU pool - this thread (IO pool)
// owns it.
// First cancel any pending timeout - shouldn't run again.
it->second->cancelTimeout();

// Next, to perform refcount drop on CPU thread, we move the
// shared_ptr from taskOwners (taskOwners entry remains but is
// null), then pass the moved shared_ptr to CPU pool to perform
// refcount decrement (and potential GlobalTask deletion).
// Finally CPU pool will schedule a final IO thread function to
// actually erase element from taskOwners.
it->second->resetTaskPtrViaCpuPool();
return true;
}
}
return false;
}

/**
* Remove the cancelled task from taskOwners.
*
* @param taskId Task to remove
* @return True if task found, else false.
*/
bool removeTask(size_t taskId) {
for (auto& [owner, tasks] : taskOwners) {
auto it = tasks.find(taskId);
if (it != tasks.end()) {
EP_LOG_TRACE(
"FollyExecutorPool::State::removeTask() erasing task "
"id:{} for "
"owner:'{}'",
taskId,
owner->getName());
Expects(!it->second->task &&
"removeTask: 'proxy->task' should be null before "
"removing element from taskOwners");
tasks.erase(it);
return true;
}
}
@@ -731,8 +856,8 @@ bool FollyExecutorPool::cancel(size_t taskId, bool eraseTask) {
auto* eventBase = futurePool->getEventBase();
bool found = false;
eventBase->runInEventBaseThreadAndWait(
[&found, state = state.get(), taskId, eraseTask] {
state->cancelTask(taskId, eraseTask);
[&found, state = state.get(), taskId] {
state->cancelTask(taskId);
});
return found;
}
@@ -941,14 +1066,11 @@ void FollyExecutorPool::rescheduleTaskAfterRun(
// Deschedule the task, in case it was already scheduled
proxy->cancelTimeout();

// Decrement the ref-count on the task, and remove from taskLocator
// (same pattern as CB3ExecutorPool -
// ExecutorThread::cancelCurrentTask())
state->cancelTask(proxy->task->getId(), true);
// Begin process of cancelling the task - mark as cancelled in
// taskOwners and schedule another CPU thread pool function to decrement
// the refcount and potentially delete the GlobalTask.
state->cancelTask(proxy->task->getId());

// At this point the ref-count on the TaskProxy is still at least one,
// via the `proxy` object. On return that will go out of scope so
// object may be deleted (if no other object has a ref-count).
return;
}

Expand All @@ -963,3 +1085,36 @@ void FollyExecutorPool::rescheduleTaskAfterRun(
proxy->scheduleViaCPUPool();
}
}

void FollyExecutorPool::removeTaskAfterRun(TaskProxy& proxy) {
EP_LOG_TRACE("TaskProxy::removeTaskAfterRun() id:{} name:{}",
proxy.taskId,
proxy.task ? ("RESURRECTED:"s +
GlobalTask::getTaskName(proxy.task->getTaskId()))
: "<null>"s);

if (proxy.task) {
Expects(proxy.proxyReused);
return;
}

// Deschedule the task, in case it was already scheduled
proxy.cancelTimeout();

// Erase the task from taskOwners. If the TaskProxy is the last
// shared owner of the GlobalTask, that will be deleted here.

// PERF: CB3ExecutorPool uses a secondary map (taskLocator) to
// allow O(1) lookup of owner by taskId, however cancelling
// isn't a particularly hot function so I'm not sure if the
// extra complexity is warranted. If this shows up as hot then
// consider adding a similar structure to FollyExecutorPool.
bool taskFound = state->removeTask(proxy.taskId);
if (!taskFound) {
auto msg = fmt::format(
"FollyExecutorPool::rescheduleTaskAfterRun(): Failed to locate "
"an owner for task id:{}",
proxy.taskId);
throw std::logic_error(msg);
}
}
4 changes: 4 additions & 0 deletions engines/ep/src/folly_executorpool.h
@@ -186,6 +186,10 @@ class FollyExecutorPool : public ExecutorPool {
/// the task is dead (or should run again).
void rescheduleTaskAfterRun(std::shared_ptr<TaskProxy> proxy);

/// Remove the given taskProxy from the tracked tasks.
/// Should only be called at the end of scheduleViaCPUPool.
void removeTaskAfterRun(TaskProxy& proxy);

struct State;
/**
* FollyExecutorPool internal state. unique_ptr for pimpl.
2 changes: 1 addition & 1 deletion engines/ep/src/globaltask.h
@@ -143,7 +143,7 @@ class GlobalTask {
*
* @return The id of this task.
*/
TaskId getTaskId() {
TaskId getTaskId() const {
return taskId;
}
