Skip to content

Commit

Permalink
MB-48816: Avoid unsafe use of cookie from background tasks
Browse files Browse the repository at this point in the history
Previously, StatCheckpointTask and StatDCPTask immediately wrote
responses when collecting stats while on a background thread.

TSAN reported this as unsafe; no locks prevent potential racing with a
frontend thread manipulating the cookie.

Change both tasks to accumulate task values, but leave the frontend
thread to actually write the responses when it resumes the
ewouldblock'ed operation.

TSAN Report:
WARNING: ThreadSanitizer: data race (pid=24371)
  Read of size 8 at 0x7b54000a2df0 by thread T62:
    #0 Cookie::getHeader() const kv_engine/daemon/cookie.cc:201 (memcached+0x6508ac)
    #1 append_stats kv_engine/daemon/protocol/mcbp/stats_context.cc:95 (memcached+0x71fd6c)
    ....
    #19 void StatCollector::addStat<cb::stats::Key, unsigned long const&>(cb::stats::Key&&, unsigned long const&) const ../kv_engine/include/statistics/collector.h:336 (memcached+0x7e50e5)
    #20 EventuallyPersistentEngine::addAggregatedProducerStats(BucketStatCollector const&, ConnCounter const&) kv_engine/engines/ep/src/ep_engine.cc:4038 (memcached+0x7e50e5)
    #21 EventuallyPersistentEngine::doDcpStatsInner(CookieIface const*, std::function<void (std::basic_string_view<char, std::char_traits<char> >, std::basic_string_view<char, std::char_traits<char> >, void const*)> const&, std::basic_string_view<char, std::char_traits<char> >) kv_engine/engines/ep/src/ep_engine.cc:4030 (memcached+0x81bd05)

  Previous write of size 8 at 0x7b54000a2df0 by thread T21 (mutexes: write M3843):
    #0 Cookie::setPacket(cb::mcbp::Header const&, bool) kv_engine/daemon/cookie.cc:186 (memcached+0x65080e)
    #1 Cookie::preserveRequest() kv_engine/daemon/cookie.h:225 (memcached+0x696aa7)
    #2 Connection::executeCommandPipeline() kv_engine/daemon/connection.cc:581 (memcached+0x696aa7)
    #3 Connection::executeCommandsCallback() kv_engine/daemon/connection.cc:793 (memcached+0x696be8)
    #4 Connection::rw_callback(bufferevent*, void*) kv_engine/daemon/connection.cc:942 (memcached+0x697851)
    #5 bufferevent_run_deferred_callbacks_unlocked /home/couchbase/jenkins/workspace/cbdeps-platform-build-old/deps/packages/build/libevent/libevent-prefix/src/libevent/bufferevent.c:208 (libevent_core-2.1.so.7+0xf71d)
    #6 folly::EventBase::loopBody(int, bool) folly/io/async/EventBase.cpp:397 (memcached+0xfc9b52)
    #7 folly::EventBase::loop() folly/io/async/EventBase.cpp:315 (memcached+0xfcb06b)
    #8 folly::EventBase::loopForever() folly/io/async/EventBase.cpp:538 (memcached+0xfcb06b)
    #9 worker_libevent kv_engine/daemon/thread.cc:115 (memcached+0x6c16af)
    #10 CouchbaseThread::run() platform/src/cb_pthreads.cc:51 (memcached+0xf217d5)
    #11 platform_thread_wrap platform/src/cb_pthreads.cc:64 (memcached+0xf217d5)

Change-Id: I3fbd8d51e174a7d19c5cb608a969795e445b8e86
Reviewed-on: http://review.couchbase.org/c/kv_engine/+/163709
Tested-by: Build Bot <[email protected]>
Reviewed-by: Dave Rigby <[email protected]>
  • Loading branch information
jameseh96 authored and daverigby committed Nov 2, 2021
1 parent 8ae3b4f commit a871442
Show file tree
Hide file tree
Showing 5 changed files with 220 additions and 45 deletions.
1 change: 1 addition & 0 deletions engines/ep/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ set_source_files_properties(src/crc32.c

ADD_LIBRARY(ep_objs OBJECT
src/access_scanner.cc
src/background_stat_task.cc
src/bgfetcher.cc
src/blob.cc
src/bloomfilter.cc
Expand Down
58 changes: 58 additions & 0 deletions engines/ep/src/background_stat_task.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
* Copyright 2021-Present Couchbase, Inc.
*
* Use of this software is governed by the Business Source License included
* in the file licenses/BSL-Couchbase.txt. As of the Change Date specified
* in that file, in accordance with the Business Source License, use of this
* software will be governed by the Apache License, Version 2.0, included in
* the file licenses/APL2.txt.
*/

#include "background_stat_task.h"

#include "bucket_logger.h"
#include "ep_engine.h"
#include <phosphor/phosphor.h>
#include <string_view>

BackgroundStatTask::BackgroundStatTask(EventuallyPersistentEngine* e,
const CookieIface* cookie,
TaskId taskId)
: GlobalTask(e, taskId, 0, false), e(e), cookie(cookie) {
}

bool BackgroundStatTask::run() {
TRACE_EVENT0("ep-engine/task", "BackgroundStatTask");
try {
status = collectStats();
} catch (const std::exception& e) {
EP_LOG_WARN(
"BackgroundStatTask: callback threw exception: \"{}\" task "
"desc:\"{}\"",
e.what(),
getDescription());
}
// _must_ notify success to ensure the frontend calls back into
// getStats - the second call is required to avoid leaking data
// allocated to store in the engine specific.
// The real status is stored and shall be retrieved later.
e->notifyIOComplete(cookie, cb::engine_errc::success);
return false;
}

cb::engine_errc BackgroundStatTask::maybeWriteResponse(
const AddStatFn& addStat) const {
if (status == cb::engine_errc::success) {
for (const auto& [key, value] : stats) {
addStat(key, value, cookie);
}
}
return status;
}

AddStatFn BackgroundStatTask::getDeferredAddStat() {
return [this](std::string_view key, std::string_view value, const void*) {
this->stats.emplace_back(key, value);
};
}
79 changes: 79 additions & 0 deletions engines/ep/src/background_stat_task.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
* Copyright 2021-Present Couchbase, Inc.
*
* Use of this software is governed by the Business Source License included
* in the file licenses/BSL-Couchbase.txt. As of the Change Date specified
* in that file, in accordance with the Business Source License, use of this
* software will be governed by the Apache License, Version 2.0, included in
* the file licenses/APL2.txt.
*/
#pragma once

#include <executor/globaltask.h>
#include <memcached/cookie_iface.h>
#include <memcached/engine_common.h>
#include <memcached/engine_error.h>

#include <functional>
#include <string>
#include <utility>
#include <vector>

// forward decl
enum class TaskId : int;

/**
* Base type for tasks which gather stats on a background task.
*
* For use when generating stats is likely to be expensive, to avoid
* taking up frontend thread time.
* Users should construct and schedule the task, then store it in the
* cookie engine specific with
* EventuallyPersistentEngine::storeStatTask(...)
* Once the task has notified the frontend, the task should be retrieved with
* EventuallyPersistentEngine::retrieveStatTask(...)
*/
class BackgroundStatTask : public GlobalTask {
public:
using Callback = std::function<cb::engine_errc(EventuallyPersistentEngine*,
const AddStatFn&)>;
BackgroundStatTask(EventuallyPersistentEngine* e,
const CookieIface* cookie,
TaskId taskId);

bool run() override;

/**
* If the task was successful, write all gathered stats as responses.
*
* Should only be called from a frontend thread.
* @param addStat frontend provided callback
* @return errc reflecting status of the operation
*/
cb::engine_errc maybeWriteResponse(const AddStatFn& addStat) const;

protected:
/**
* Do potentially expensive work to collect stats in a background task.
* @return status of operation
*/
virtual cb::engine_errc collectStats() = 0;

/**
* Get a callback used to store stats which have been computed by the
* background task.
*
* Stats cannot be immediately written as responses from the background
* task as doing so could be racy. Instead, store them for when the
* frontend thread revisits this operation.
*/
AddStatFn getDeferredAddStat();

EventuallyPersistentEngine* e;
const CookieIface* cookie;

// stats which have been collected while running in a background task
std::vector<std::pair<std::string, std::string>> stats;
cb::engine_errc status = cb::engine_errc::success;
};
97 changes: 52 additions & 45 deletions engines/ep/src/ep_engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1848,6 +1848,31 @@ void* EventuallyPersistentEngine::getEngineSpecific(const CookieIface* cookie) {
return cookie->getEngineStorage();
}

void EventuallyPersistentEngine::storeStatTask(
const CookieIface* cookie, std::shared_ptr<BackgroundStatTask> task) {
// store a ptr to a shared_ptr to the task (must ensure the task is not
// destroyed before the connection retrieves the result).
// Once the frontend thread is notified, it must call retrieveStatTask,
// which will free the heap-allocated shared_ptr.
auto wrapper = std::make_unique<std::shared_ptr<BackgroundStatTask>>(task);
storeEngineSpecific(cookie, wrapper.release());
}

std::shared_ptr<BackgroundStatTask>
EventuallyPersistentEngine::retrieveStatTask(const CookieIface* cookie) {
void* data = getEngineSpecific(cookie);
if (data) {
// take back ownership of the task ptr
auto ptr = std::unique_ptr<std::shared_ptr<BackgroundStatTask>>(
reinterpret_cast<std::shared_ptr<BackgroundStatTask>*>(data));
// clear the engine specific - it's not valid to retrieve the
// task twice (would lead to a double free)
storeEngineSpecific(cookie, nullptr);
return *ptr;
}
return nullptr;
}

bool EventuallyPersistentEngine::isDatatypeSupported(
const CookieIface* cookie, protocol_binary_datatype_t datatype) {
return cookie->isDatatypeSupported(datatype);
Expand Down Expand Up @@ -3756,23 +3781,20 @@ class StatCheckpointVisitor : public VBucketVisitor {
AddStatFn add_stat;
};


class StatCheckpointTask : public GlobalTask {
class StatCheckpointTask : public BackgroundStatTask {
public:
StatCheckpointTask(EventuallyPersistentEngine* e,
const CookieIface* c,
AddStatFn a)
: GlobalTask(e, TaskId::StatCheckpointTask, 0, false),
ep(e),
cookie(c),
add_stat(std::move(a)) {
: BackgroundStatTask(e, c, TaskId::StatCheckpointTask) {
}
bool run() override {

cb::engine_errc collectStats() override {
TRACE_EVENT0("ep-engine/task", "StatsCheckpointTask");
StatCheckpointVisitor scv(ep->getKVBucket(), cookie, add_stat);
ep->getKVBucket()->visit(scv);
ep->notifyIOComplete(cookie, cb::engine_errc::success);
return false;
StatCheckpointVisitor scv(
e->getKVBucket(), cookie, getDeferredAddStat());
e->getKVBucket()->visit(scv);
return cb::engine_errc::success;
}

std::string getDescription() const override {
Expand All @@ -3785,11 +3807,6 @@ class StatCheckpointTask : public GlobalTask {
// take /too/ long, so set limit of 100ms.
return std::chrono::milliseconds(100);
}

private:
EventuallyPersistentEngine *ep;
const CookieIface* cookie;
AddStatFn add_stat;
};
/// @endcond

Expand All @@ -3799,15 +3816,14 @@ cb::engine_errc EventuallyPersistentEngine::doCheckpointStats(
const char* stat_key,
int nkey) {
if (nkey == 10) {
void* es = getEngineSpecific(cookie);
if (es == nullptr) {
ExTask task = std::make_shared<StatCheckpointTask>(
this, cookie, add_stat);
auto task = retrieveStatTask(cookie);
if (!task) {
task = std::make_shared<StatCheckpointTask>(this, cookie, add_stat);
ExecutorPool::get()->schedule(task);
storeEngineSpecific(cookie, this);
storeStatTask(cookie, task);
return cb::engine_errc::would_block;
} else {
storeEngineSpecific(cookie, nullptr);
return task->maybeWriteResponse(add_stat);
}
} else if (nkey > 11) {
std::string vbid(&stat_key[11], nkey - 11);
Expand Down Expand Up @@ -4034,21 +4050,19 @@ static void showConnAggStat(const std::string& connType,
}
}

class StatDCPTask : public GlobalTask {
class StatDCPTask : public BackgroundStatTask {
public:
using Callback = std::function<cb::engine_errc(
EventuallyPersistentEngine* e, const CookieIface* cookie)>;
StatDCPTask(EventuallyPersistentEngine* e,
const CookieIface* cookie,
std::string_view description,
Callback callback)
: GlobalTask(e, TaskId::StatDCPTask, 0, false),
e(e),
cookie(cookie),
: BackgroundStatTask(e, cookie, TaskId::StatDCPTask),
description(description),
callback(std::move(callback)) {
}
bool run() override {
cb::engine_errc collectStats() override {
TRACE_EVENT0("ep-engine/task", "StatDCPTask");
cb::engine_errc result = cb::engine_errc::failed;
try {
Expand All @@ -4060,8 +4074,7 @@ class StatDCPTask : public GlobalTask {
e.what(),
getDescription());
}
e->notifyIOComplete(cookie, result);
return false;
return result;
}

std::string getDescription() const override {
Expand All @@ -4075,8 +4088,6 @@ class StatDCPTask : public GlobalTask {
}

private:
EventuallyPersistentEngine* e;
const CookieIface* cookie;
const std::string description;
Callback callback;
};
Expand All @@ -4085,9 +4096,9 @@ cb::engine_errc EventuallyPersistentEngine::doConnAggStats(
const CookieIface* cookie,
const AddStatFn& add_stat,
std::string_view sep) {
void* engineSpecific = getEngineSpecific(cookie);
if (engineSpecific == nullptr) {
ExTask task = std::make_shared<StatDCPTask>(
auto task = retrieveStatTask(cookie);
if (!task) {
task = std::make_shared<StatDCPTask>(
this,
cookie,
"Aggregated DCP stats",
Expand All @@ -4100,13 +4111,11 @@ cb::engine_errc EventuallyPersistentEngine::doConnAggStats(
return cb::engine_errc::success;
});
ExecutorPool::get()->schedule(task);
storeEngineSpecific(cookie, this);
storeStatTask(cookie, task);
return cb::engine_errc::would_block;
} else {
storeEngineSpecific(cookie, nullptr);
return task->maybeWriteResponse(add_stat);
}

return cb::engine_errc::success;
}

cb::engine_errc EventuallyPersistentEngine::doConnAggStatsInner(
Expand Down Expand Up @@ -4137,9 +4146,9 @@ cb::engine_errc EventuallyPersistentEngine::doDcpStats(
const CookieIface* cookie,
const AddStatFn& add_stat,
std::string_view value) {
void* engineSpecific = getEngineSpecific(cookie);
if (engineSpecific == nullptr) {
ExTask task = std::make_shared<StatDCPTask>(
auto task = retrieveStatTask(cookie);
if (!task) {
task = std::make_shared<StatDCPTask>(
this,
cookie,
"Summarised bucket-wide DCP stats",
Expand All @@ -4150,13 +4159,11 @@ cb::engine_errc EventuallyPersistentEngine::doDcpStats(
return cb::engine_errc::success;
});
ExecutorPool::get()->schedule(task);
storeEngineSpecific(cookie, this);
storeStatTask(cookie, task);
return cb::engine_errc::would_block;
} else {
storeEngineSpecific(cookie, nullptr);
return task->maybeWriteResponse(add_stat);
}

return cb::engine_errc::success;
}

void EventuallyPersistentEngine::doDcpStatsInner(const CookieIface* cookie,
Expand Down
30 changes: 30 additions & 0 deletions engines/ep/src/ep_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#pragma once

#include "background_stat_task.h"
#include "configuration.h"
#include "ep_engine_public.h"
#include "error_handler.h"
Expand Down Expand Up @@ -538,6 +539,35 @@ class EventuallyPersistentEngine : public EngineIface, public DcpIface {

void* getEngineSpecific(const CookieIface* cookie);

/**
* Store a task pointer as the engine specific data.
*
* Allocates a wrapper holding a shared_ptr; storing the raw task ptr
* would be insufficient - the task needs to be owned to ensure it has not
* been destroyed when the frontend thread needs to retrieve it's result.
*
* Caller is responsible for scheduling the task, and later calling
* retrieveStatTask to destroy the wrapper and retrieve the task.
*
* @param cookie cookie from frontend
* @param task shared_ptr to background task
*/
void storeStatTask(const CookieIface* cookie,
std::shared_ptr<BackgroundStatTask> task);

/**
* Get and clear the engine specific data, then attempt to
* extract a BackgroundStatTask.
*
* Frees the wrapper allocated by storeStatTask, if one was found in the
* engine specific data.
*
* @param cookie cookie from frontend
* @return (possibly null) shared_ptr to background task
*/
std::shared_ptr<BackgroundStatTask> retrieveStatTask(
const CookieIface* cookie);

bool isDatatypeSupported(const CookieIface* cookie,
protocol_binary_datatype_t datatype);

Expand Down

0 comments on commit a871442

Please sign in to comment.