Skip to content

Commit

Permalink
[BP] MB-59601: Fix data race in CheckpointManager::takeAndResetCursors
Browse files Browse the repository at this point in the history
The method did not take a queueLock and could mutate the
CheckpointManager while it is being accessed, e.g. in
CheckpointManager::getListOfCursorsToDrop.

CheckpointMemRecoveryTask calls getListOfCursorsToDrop which iterates
CM::cursors. A concurrent RollbackTask can result in resetting the
vbucket and calling CM::takeAndResetCursors, which among others
mutates CM::cursors.

WARNING: ThreadSanitizer: data race (pid=60355)
  Write of size 8 at 0x00010d1a5e68 by main thread (mutexes: write M0, write M1, write M2):
    #0 CheckpointManager::takeAndResetCursors(CheckpointManager&) checkpoint_manager.cc:1856 (ep-engine_ep_unit_tests:arm64+0x1003795b4)
    #1 KVBucket::resetVBucket_UNLOCKED(LockedVBucketPtr&, std::__1::unique_lock<std::__1::mutex>&) kv_bucket.cc:1271 (ep-engine_ep_unit_tests:arm64+0x1001da918)
    #2 KVBucket::rollback(Vbid, unsigned long long) kv_bucket.cc:2671 (ep-engine_ep_unit_tests:arm64+0x1001e8404)
    #3 CheckpointRemoverTest_MB59601_Test::TestBody() checkpoint_remover_test.cc:513 (ep-engine_ep_unit_tests:arm64+0x10054117c)
    #4 virtual thunk to CheckpointRemoverTest_MB59601_Test::TestBody() checkpoint_remover_test.cc (ep-engine_ep_unit_tests:arm64+0x100541448)
    #5 void testing::internal::HandleExceptionsInMethodIfSupported<testing::Test, void>(testing::Test*, void (testing::Test::*)(), char const*) gtest.cc:2643 (ep-engine_ep_unit_tests:arm64+0x10195a8e0)
    #6 <null> <null> (0x000186e390e0)

  Previous read of size 8 at 0x00010d1a5e68 by thread T1 (mutexes: write M3):
    #0 CheckpointManager::getListOfCursorsToDrop() checkpoint_manager.cc:802 (ep-engine_ep_unit_tests:arm64+0x100372bdc)
    #1 CheckpointMemRecoveryTask::attemptCursorDropping() checkpoint_remover.cc:174 (ep-engine_ep_unit_tests:arm64+0x10037c710)
    #2 CheckpointMemRecoveryTask::runInner() checkpoint_remover.cc:291 (ep-engine_ep_unit_tests:arm64+0x10037d068)
    #3 NotifiableTask::run() notifiable_task.cc:18 (ep-engine_ep_unit_tests:arm64+0x101934ed8)
    #4 void* std::__1::__thread_proxy[abi:v160006]<std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct>>, CheckpointRemoverTest_MB59601_Test::TestBody()::$_3::operator()() const::'lambda0'()>>(void*) thread:299 (ep-engine_ep_unit_tests:arm64+0x1005661f0)

Change-Id: I7fe1ed1f6ebca811a5dfca6c2e69d04bfa91b2b8
Reviewed-on: https://review.couchbase.org/c/kv_engine/+/203991
Tested-by: Pavlos Georgiou <[email protected]>
Reviewed-by: Vesko Karaganev <[email protected]>
Reviewed-by: Paolo Cocchi <[email protected]>
Well-Formed: Restriction Checker
  • Loading branch information
pavlosg committed Jan 24, 2024
1 parent c8d9921 commit 3ec11d6
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 5 deletions.
22 changes: 17 additions & 5 deletions engines/ep/src/checkpoint_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,8 @@ std::vector<Cursor> CheckpointManager::getListOfCursorsToDrop() {
? *cursors.at(backupPCursorName)
: *persistenceCursor;

getListOfCursorsToDropHook();

for (const auto& pair : cursors) {
const auto cursor = pair.second;
// Note: Strict condition here.
Expand Down Expand Up @@ -1836,13 +1838,23 @@ void CheckpointManager::addStats(const AddStatFn& add_stat,
}

void CheckpointManager::takeAndResetCursors(CheckpointManager& other) {
pCursor = other.pCursor;
persistenceCursor = pCursor.lock().get();
for (auto& cursor : other.cursors) {
cursors[cursor.second->getName()] = cursor.second;
other.takeAndResetCursorsHook();

Cursor otherPCursor;
cursor_index otherCursors;
{
std::lock_guard<std::mutex> otherLH(other.queueLock);
otherPCursor = other.pCursor;
otherCursors = std::move(other.cursors);
other.cursors.clear();
}
other.cursors.clear();

std::lock_guard<std::mutex> lh(queueLock);
pCursor = std::move(otherPCursor);
persistenceCursor = pCursor.lock().get();
for (auto& cursor : otherCursors) {
cursors[cursor.second->getName()] = std::move(cursor.second);
}
resetCursors();
}

Expand Down
8 changes: 8 additions & 0 deletions engines/ep/src/checkpoint_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,14 @@ class CheckpointManager {
// (and not yet re-acquired) the CM::lock. Introduced in MB-56644.
TestingHook<> expelHook;

/// Testing hook called at the start of CM::takeAndResetCursors.
/// Introduced in MB-59601.
TestingHook<> takeAndResetCursorsHook;

/// Testing hook called just before iterating CM::cursors in
/// CM::getListOfCursorsToDrop. Introduced in MB-59601.
TestingHook<> getListOfCursorsToDropHook;

protected:
/**
* Checks if eager checkpoint removal is enabled, then checks if the
Expand Down
65 changes: 65 additions & 0 deletions engines/ep/tests/module_tests/checkpoint_remover_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "collections/vbucket_manifest_handles.h"
#include "dcp/response.h"
#include "test_helpers.h"
#include "thread_gate.h"
#include "vbucket.h"

void CheckpointRemoverTest::SetUp() {
Expand Down Expand Up @@ -449,6 +450,70 @@ TEST_P(CheckpointRemoverTest, MemRecoveryByCheckpointCreation) {
EXPECT_EQ(0, store->getRequiredCheckpointMemoryReduction());
}

// Without the fix, there is a data race in
// CheckpointManager::takeAndResetCursors which did not take a queueLock,
// and could mutate the CheckpointManager while it is being accessed,
// e.g. in CheckpointManager::getListOfCursorsToDrop.
TEST_P(CheckpointRemoverTest, MB59601) {
if (!isPersistent()) {
GTEST_SKIP();
}

setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
auto& config = engine->getConfiguration();
config.setChkExpelEnabled(false);
config.setMaxSize(100UL * 1024 * 1024);
// Disable the mem-based checkpoint creation in this test, we would end up
// doing straight CheckpointRemoval rather than ItemExpel/CursorDrop
config.setCheckpointMaxSize(std::numeric_limits<size_t>::max());
const auto chkptMemRecoveryLimit =
config.getMaxSize() * store->getCheckpointMemoryRatio() *
store->getCheckpointMemoryRecoveryUpperMark();
auto& stats = engine->getEpStats();
stats.mem_low_wat.store(1);

int numItems = 0;
const std::string value(1024 * 1024, 'x');
while (stats.getCheckpointManagerEstimatedMemUsage() <
chkptMemRecoveryLimit) {
auto docKey = "key_" + std::to_string(++numItems);
store_item(vbid, makeStoredDocKey(docKey), value);
}
flushVBucketToDiskIfPersistent(vbid, numItems);

// VB needs to be replica to rollback
store->setVBucketState(vbid, vbucket_state_replica);

EXPECT_GT(stats.getNumCheckpoints(), 0);
EXPECT_GT(store->getRequiredCheckpointMemoryReduction(), 0);

/// Synchronises just before accessing and mutating CM::cursors
ThreadGate tg(2);
std::thread bgThread;

auto& oldManager = *store->getVBucket(vbid)->checkpointManager;
oldManager.takeAndResetCursorsHook = [this, &tg, &bgThread]() {
// Note: takeAndResetCursorsHook is executed *after* the new VBucket
// has already been created

auto& newManager = *store->getVBucket(vbid)->checkpointManager;
newManager.getListOfCursorsToDropHook = [&tg]() { tg.threadUp(); };
bgThread = std::thread([this]() {
auto remover = std::make_shared<CheckpointMemRecoveryTask>(
engine.get(),
engine->getEpStats(),
engine->getConfiguration().getChkRemoverStime(),
0);
remover->run();
});

tg.threadUp();
};

store->rollback(vbid, 0);
bgThread.join();
}

// Test written for MB-36366. With the fix removed this test failed because
// post expel, we continued onto cursor dropping.
// MB-36447 - unreliable test, disabling for now
Expand Down

0 comments on commit 3ec11d6

Please sign in to comment.