From 3d73cfe308108239073f58fa955b50bc3d8ccd7e Mon Sep 17 00:00:00 2001
From: "Vinogradov, Sergei" 
Date: Fri, 28 Jul 2023 13:30:09 -0400
Subject: [PATCH 1/3] Transparent Item movements

The new algorithm relies on the moving bit and does not require
external synchronization. Data movement is transparent to the client:
if a client thread attempts to get a handle for an item that is being
moved, it receives a handle with a wait context and waits until the
movement completes.
---
 cachelib/allocator/CacheAllocator-inl.h       | 714 ++++++++++--------
 cachelib/allocator/CacheAllocator.h           | 165 +++-
 cachelib/allocator/CacheItem.h                |   2 +-
 cachelib/allocator/MM2Q-inl.h                 |   6 +
 cachelib/allocator/MM2Q.h                     |   4 +
 cachelib/allocator/MMLru-inl.h                |   6 +
 cachelib/allocator/MMLru.h                    |   4 +
 cachelib/allocator/MMTinyLFU-inl.h            |   7 +
 cachelib/allocator/MMTinyLFU.h                |   4 +
 cachelib/allocator/Refcount.h                 |  36 +-
 .../allocator/tests/AllocatorTypeTest.cpp     |   8 +-
 cachelib/allocator/tests/BaseAllocatorTest.h  | 128 +---
 cachelib/allocator/tests/RefCountTest.cpp     |  10 +-
 cachelib/common/Mutex.h                       |   6 +
 14 files changed, 620 insertions(+), 480 deletions(-)

diff --git a/cachelib/allocator/CacheAllocator-inl.h b/cachelib/allocator/CacheAllocator-inl.h
index e61e755d41..f9dfd24a30 100644
--- a/cachelib/allocator/CacheAllocator-inl.h
+++ b/cachelib/allocator/CacheAllocator-inl.h
@@ -81,6 +81,8 @@ CacheAllocator::CacheAllocator(
                               config.chainedItemAccessConfig)),
       chainedItemLocks_(config_.chainedItemsLockPower,
                         std::make_shared()),
+      movesMap_(kShards),
+      moveLock_(kShards),
       cacheCreationTime_{
           type != InitMemType::kMemAttach
               ? util::getCurrentTimeSec()
@@ -415,7 +417,7 @@ CacheAllocator::allocateChainedItem(const ReadHandle& parent,
         "Cannot call allocate chained item with a empty parent handle!");
   }
 
-  auto it = allocateChainedItemInternal(parent, size);
+  auto it = allocateChainedItemInternal(*parent, size);
   if (auto eventTracker = getEventTracker()) {
     const auto result =
         it ? AllocatorApiResult::ALLOCATED : AllocatorApiResult::FAILED;
@@ -427,8 +429,8 @@ CacheAllocator::allocateChainedItem(const ReadHandle& parent,
 
 template 
 typename CacheAllocator::WriteHandle
-CacheAllocator::allocateChainedItemInternal(
-    const ReadHandle& parent, uint32_t size) {
+CacheAllocator::allocateChainedItemInternal(const Item& parent,
+                                            uint32_t size) {
   util::LatencyTracker tracker{stats().allocateLatency_};
 
   SCOPE_FAIL { stats_.invalidAllocs.inc(); };
 
@@ -436,7 +438,7 @@ CacheAllocator::allocateChainedItemInternal(
   // number of bytes required for this item
   const auto requiredSize = ChainedItem::getRequiredSize(size);
 
-  const auto pid = allocator_->getAllocInfo(parent->getMemory()).poolId;
+  const auto pid = allocator_->getAllocInfo(parent.getMemory()).poolId;
   const auto cid = allocator_->getAllocationClassId(pid, requiredSize);
 
   (*stats_.allocAttempts)[pid][cid].inc();
@@ -452,9 +454,8 @@ CacheAllocator::allocateChainedItemInternal(
 
   SCOPE_FAIL { allocator_->free(memory); };
 
-  auto child = acquire(
-      new (memory) ChainedItem(compressor_.compress(parent.getInternal()), size,
-                               util::getCurrentTimeSec()));
+  auto child = acquire(new (memory) ChainedItem(
+      compressor_.compress(&parent), size, util::getCurrentTimeSec()));
 
   if (child) {
     child.markNascent();
@@ -494,14 +495,15 @@ void CacheAllocator::addChainedItem(WriteHandle& parent,
   // Count a new child
   stats_.numChainedChildItems.inc();
 
-  insertInMMContainer(*child);
-
   // Increment refcount since this chained item is now owned by the parent
   // Parent will decrement the refcount upon release. 
Since this is an // internal refcount, we dont include it in active handle tracking. - child->incRef(); + auto ret = child->incRef(); + XDCHECK(ret == RefcountWithFlags::incResult::incOk); XDCHECK_EQ(2u, child->getRefCount()); + insertInMMContainer(*child); + invalidateNvm(*parent); if (auto eventTracker = getEventTracker()) { eventTracker->record(AllocatorApiEvent::ADD_CHAINED, parent->getKey(), @@ -565,22 +567,20 @@ CacheAllocator::getParentKey(const Item& chainedItem) { } template -void CacheAllocator::transferChainLocked(WriteHandle& parent, +void CacheAllocator::transferChainLocked(Item& parent, WriteHandle& newParent) { // parent must be in a state to not have concurrent readers. Eviction code - // paths rely on holding the last item handle. Since we hold on to an item - // handle here, the chain will not be touched by any eviction code path. - XDCHECK(parent); + // paths rely on holding the last item handle. XDCHECK(newParent); - XDCHECK_EQ(parent->getKey(), newParent->getKey()); - XDCHECK(parent->hasChainedItem()); + XDCHECK_EQ(parent.getKey(), newParent->getKey()); + XDCHECK(parent.hasChainedItem()); if (newParent->hasChainedItem()) { throw std::invalid_argument(folly::sformat( "New Parent {} has invalid state", newParent->toString())); } - auto headHandle = findChainedItem(*parent); + auto headHandle = findChainedItem(parent); XDCHECK(headHandle); // remove from the access container since we are changing the key @@ -592,6 +592,7 @@ void CacheAllocator::transferChainLocked(WriteHandle& parent, while (curr) { XDCHECK_EQ(curr == headHandle.get() ? 2u : 1u, curr->getRefCount()); XDCHECK(curr->isInMMContainer()); + XDCHECK(!newParent->isMoving()); curr->changeKey(newParentPtr); curr = curr->getNext(compressor_); } @@ -603,7 +604,7 @@ void CacheAllocator::transferChainLocked(WriteHandle& parent, folly::sformat("Did not expect to find an existing chain for {}", newParent->toString(), oldHead->toString())); } - parent->unmarkHasChainedItem(); + parent.unmarkHasChainedItem(); } template @@ -614,7 +615,7 @@ void CacheAllocator::transferChainAndReplace( } { // scope for chained item lock auto l = chainedItemLocks_.lockExclusive(parent->getKey()); - transferChainLocked(parent, newParent); + transferChainLocked(*parent, newParent); } if (replaceIfAccessible(*parent, *newParent)) { @@ -681,33 +682,10 @@ CacheAllocator::replaceChainedItem(Item& oldItem, } template -typename CacheAllocator::WriteHandle -CacheAllocator::replaceChainedItemLocked(Item& oldItem, - WriteHandle newItemHdl, - const Item& parent) { - XDCHECK(newItemHdl != nullptr); - XDCHECK_GE(1u, oldItem.getRefCount()); - - // grab the handle to the old item so that we can return this. Also, we need - // to drop the refcount the parent holds on oldItem by manually calling - // decRef. To do that safely we need to have a proper outstanding handle. - auto oldItemHdl = acquire(&oldItem); - - // Replace the old chained item with new item in the MMContainer before we - // actually replace the old item in the chain - - if (!replaceChainedItemInMMContainer(oldItem, *newItemHdl)) { - // This should never happen since we currently hold an valid - // parent handle. 
None of its chained items can be removed
-      throw std::runtime_error(folly::sformat(
-          "chained item cannot be replaced in MM container, oldItem={}, "
-          "newItem={}, parent={}",
-          oldItem.toString(), newItemHdl->toString(), parent.toString()));
-    }
-
-    XDCHECK(!oldItem.isInMMContainer());
-    XDCHECK(newItemHdl->isInMMContainer());
-
+void CacheAllocator::replaceInChainLocked(Item& oldItem,
+                                          WriteHandle& newItemHdl,
+                                          const Item& parent,
+                                          bool fromMove) {
   auto head = findChainedItem(parent);
   XDCHECK(head != nullptr);
   XDCHECK_EQ(reinterpret_cast(
@@ -736,16 +714,62 @@ CacheAllocator::replaceChainedItemLocked(Item& oldItem,
                  oldItem.asChainedItem().getNext(compressor_), compressor_);
   oldItem.asChainedItem().setNext(nullptr, compressor_);
 
-  // this should not result in 0 refcount. We are bumping down the internal
-  // refcount. If it did, we would leak an item.
-  oldItem.decRef();
-  XDCHECK_LT(0u, oldItem.getRefCount()) << oldItem.toString();
+  // If called from moveChainedItem, the ref count will be zero; otherwise
+  // it is greater than zero.
+  if (fromMove) {
+    // if this is the head chained item, release the handle now
+    // while refCount > 1 so that the destructor does not
+    // call releaseBackToAllocator since we want to recycle oldItem
+    if (head) {
+      head.reset();
+      XDCHECK_EQ(1u, oldItem.getRefCount());
+    }
+    oldItem.decRef();
+    XDCHECK_EQ(0u, oldItem.getRefCount()) << oldItem.toString();
+  } else {
+    oldItem.decRef();
+    XDCHECK_LT(0u, oldItem.getRefCount()) << oldItem.toString();
+  }
 
   // increment refcount to indicate parent owns this similar to addChainedItem
   // Since this is an internal refcount, we dont include it in active handle
   // tracking.
-  newItemHdl->incRef();
+  auto ret = newItemHdl->incRef();
+  XDCHECK(ret == RefcountWithFlags::incResult::incOk);
+}
+
+template 
+typename CacheAllocator::WriteHandle
+CacheAllocator::replaceChainedItemLocked(Item& oldItem,
+                                         WriteHandle newItemHdl,
+                                         const Item& parent) {
+  XDCHECK(newItemHdl != nullptr);
+  XDCHECK_GE(1u, oldItem.getRefCount());
+
+  // grab the handle to the old item so that we can return this. Also, we need
+  // to drop the refcount the parent holds on oldItem by manually calling
+  // decRef. To do that safely we need to have a proper outstanding handle.
+  auto oldItemHdl = acquire(&oldItem);
+  XDCHECK_GE(2u, oldItem.getRefCount());
+
+  // Replace the old chained item with new item in the MMContainer before we
+  // actually replace the old item in the chain
+
+  if (!replaceChainedItemInMMContainer(oldItem, *newItemHdl)) {
+    // This should never happen since we currently hold a valid
+    // parent handle. None of its chained items can be removed
+    throw std::runtime_error(folly::sformat(
+        "chained item cannot be replaced in MM container, oldItem={}, "
+        "newItem={}, parent={}",
+        oldItem.toString(), newItemHdl->toString(), parent.toString()));
+  }
+
+  XDCHECK(!oldItem.isInMMContainer());
+  XDCHECK(newItemHdl->isInMMContainer());
+
+  replaceInChainLocked(oldItem, newItemHdl, parent, false /* fromMove */);
+
   return oldItemHdl;
 }
 
@@ -910,12 +934,12 @@ CacheAllocator::releaseBackToAllocator(Item& it,
 }
 
 template 
-bool CacheAllocator::incRef(Item& it) {
-  if (it.incRef()) {
+RefcountWithFlags::incResult CacheAllocator::incRef(Item& it) {
+  auto ret = it.incRef();
+  if (ret == RefcountWithFlags::incResult::incOk) {
     ++handleCount_.tlStats();
-    return true;
   }
-  return false;
+  return ret;
 }
 
 template 
@@ -935,11 +959,20 @@ CacheAllocator::acquire(Item* it) {
 
   SCOPE_FAIL { stats_.numRefcountOverflow.inc(); };
 
-  if (LIKELY(incRef(*it))) {
-    return WriteHandle{it, *this};
-  } else {
-    // item is being evicted
-    return WriteHandle{};
+  while (true) {
+    auto incRes = incRef(*it);
+    if (LIKELY(incRes == RefcountWithFlags::incResult::incOk)) {
+      return WriteHandle{it, *this};
+    } else if (incRes == RefcountWithFlags::incResult::incFailedEviction) {
+      // item is being evicted
+      return WriteHandle{};
+    } else {
+      // item is being moved - wait for completion
+      WriteHandle handle;
+      if (tryGetHandleWithWaitContextForMovingItem(*it, handle)) {
+        return handle;
+      }
+    }
   }
 }
 
@@ -1127,19 +1160,72 @@ CacheAllocator::insertOrReplace(const WriteHandle& handle) {
   return replaced;
 }
 
+/* The next two methods are used to asynchronously move an Item between
+ * slabs.
+ *
+ * The moving thread allocates a new Item in the slab we are moving to and
+ * calls moveRegularItem(). This method does the following:
+ * 1. Copy data from the old Item to the new one (via the move callback or
+ *    memcpy). The old Item keeps its moving flag set throughout.
+ * 2. Swap the new Item into the access container in place of the old one.
+ *
+ * Concurrent threads that are getting a handle to the same key:
+ * 1. When a handle is created, check whether the moving flag is set.
+ * 2. If so, the handle implementation creates a waitContext and adds it to
+ *    the MoveCtx by calling tryGetHandleWithWaitContextForMovingItem().
+ * 3. Wait until the moving thread completes its job.
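+ *
+ * Reader-side sketch (illustrative only; the names come from this patch and
+ * the control flow mirrors acquire()):
+ *
+ *   auto incRes = incRef(*it);
+ *   if (incRes == RefcountWithFlags::incResult::incFailedMoving) {
+ *     WriteHandle handle;
+ *     if (tryGetHandleWithWaitContextForMovingItem(*it, handle)) {
+ *       return handle; // woken by wakeUpWaiters() when the move is done
+ *     }
+ *     // the item stopped moving in the meantime; retry incRef()
+ *   }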
+ */ +template +bool CacheAllocator::tryGetHandleWithWaitContextForMovingItem( + Item& item, WriteHandle& handle) { + auto shard = getShardForKey(item.getKey()); + auto& movesMap = getMoveMapForShard(shard); + { + auto lock = getMoveLockForShard(shard); + + // item might have been evicted or moved before the lock was acquired + if (!item.isMoving()) { + return false; + } + + WriteHandle hdl{*this}; + auto waitContext = hdl.getItemWaitContext(); + + auto ret = movesMap.try_emplace(item.getKey(), std::make_unique()); + ret.first->second->addWaiter(std::move(waitContext)); + + handle = std::move(hdl); + return true; + } +} + +template +size_t CacheAllocator::wakeUpWaitersLocked(folly::StringPiece key, + WriteHandle&& handle) { + std::unique_ptr ctx; + auto shard = getShardForKey(key); + auto& movesMap = getMoveMapForShard(shard); + { + auto lock = getMoveLockForShard(shard); + movesMap.eraseInto( + key, [&](auto&& key, auto&& value) { ctx = std::move(value); }); + } + + if (ctx) { + ctx->setItemHandle(std::move(handle)); + return ctx->numWaiters(); + } + + return 0; +} + template bool CacheAllocator::moveRegularItem(Item& oldItem, WriteHandle& newItemHdl) { - XDCHECK(config_.moveCb); + XDCHECK(oldItem.isMoving()); + XDCHECK(!oldItem.isExpired()); util::LatencyTracker tracker{stats_.moveRegularLatency_}; - if (!oldItem.isAccessible() || oldItem.isExpired()) { - return false; - } - XDCHECK_EQ(newItemHdl->getSize(), oldItem.getSize()); - XDCHECK_EQ(reinterpret_cast(&getMMContainer(oldItem)), - reinterpret_cast(&getMMContainer(*newItemHdl))); // take care of the flags before we expose the item to be accessed. this // will ensure that when another thread removes the item from RAM, we issue @@ -1148,52 +1234,31 @@ bool CacheAllocator::moveRegularItem(Item& oldItem, newItemHdl->markNvmClean(); } - // Execute the move callback. We cannot make any guarantees about the - // consistency of the old item beyond this point, because the callback can - // do more than a simple memcpy() e.g. update external references. If there - // are any remaining handles to the old item, it is the caller's - // responsibility to invalidate them. The move can only fail after this - // statement if the old item has been removed or replaced, in which case it - // should be fine for it to be left in an inconsistent state. - config_.moveCb(oldItem, *newItemHdl, nullptr); - - // Inside the access container's lock, this checks if the old item is - // accessible and its refcount is zero. If the item is not accessible, - // there is no point to replace it since it had already been removed - // or in the process of being removed. If the item is in cache but the - // refcount is non-zero, it means user could be attempting to remove - // this item through an API such as remove(itemHandle). In this case, - // it is unsafe to replace the old item with a new one, so we should - // also abort. - if (!accessContainer_->replaceIf(oldItem, *newItemHdl, - itemExclusivePredicate)) { - return false; - } - - // Inside the MM container's lock, this checks if the old item exists to - // make sure that no other thread removed it, and only then replaces it. - if (!replaceInMMContainer(oldItem, *newItemHdl)) { - accessContainer_->remove(*newItemHdl); - return false; + if (config_.moveCb) { + // Execute the move callback. We cannot make any guarantees about the + // consistency of the old item beyond this point, because the callback can + // do more than a simple memcpy() e.g. update external references. 
If there + // are any remaining handles to the old item, it is the caller's + // responsibility to invalidate them. The move can only fail after this + // statement if the old item has been removed or replaced, in which case it + // should be fine for it to be left in an inconsistent state. + config_.moveCb(oldItem, *newItemHdl, nullptr); + } else { + std::memcpy(newItemHdl->getMemory(), oldItem.getMemory(), + oldItem.getSize()); } - // Replacing into the MM container was successful, but someone could have - // called insertOrReplace() or remove() before or after the - // replaceInMMContainer() operation, which would invalidate newItemHdl. - if (!newItemHdl->isAccessible()) { - removeFromMMContainer(*newItemHdl); - return false; - } + // Adding the item to mmContainer has to succeed since no one can remove the + // item + auto& newContainer = getMMContainer(*newItemHdl); + auto mmContainerAdded = newContainer.add(*newItemHdl); + XDCHECK(mmContainerAdded); - // no one can add or remove chained items at this point if (oldItem.hasChainedItem()) { - // safe to acquire handle for a moving Item - auto oldHandle = acquire(&oldItem); - XDCHECK_EQ(1u, oldHandle->getRefCount()) << oldHandle->toString(); XDCHECK(!newItemHdl->hasChainedItem()) << newItemHdl->toString(); try { auto l = chainedItemLocks_.lockExclusive(oldItem.getKey()); - transferChainLocked(oldHandle, newItemHdl); + transferChainLocked(oldItem, newItemHdl); } catch (const std::exception& e) { // this should never happen because we drained all the handles. XLOGF(DFATAL, "{}", e.what()); @@ -1203,6 +1268,18 @@ bool CacheAllocator::moveRegularItem(Item& oldItem, XDCHECK(!oldItem.hasChainedItem()); XDCHECK(newItemHdl->hasChainedItem()); } + + auto predicate = [&](const Item& item) { + // we rely on moving flag being set (it should block all readers) + XDCHECK_EQ(item.getRefCount(), 0); + XDCHECK(item.isMoving()); + return true; + }; + if (!accessContainer_->replaceIf(oldItem, *newItemHdl, predicate)) { + newContainer.remove(*newItemHdl); + return false; + } + newItemHdl.unmarkNascent(); return true; } @@ -1210,64 +1287,53 @@ bool CacheAllocator::moveRegularItem(Item& oldItem, template bool CacheAllocator::moveChainedItem(ChainedItem& oldItem, WriteHandle& newItemHdl) { - XDCHECK(config_.moveCb); + Item& parentItem = oldItem.getParentItem(compressor_); + XDCHECK(parentItem.isMoving()); util::LatencyTracker tracker{stats_.moveChainedLatency_}; - // This item has been unlinked from its parent and we're the only - // owner of it, so we're done here - if (!oldItem.isInMMContainer() || oldItem.isOnlyMoving()) { - return false; - } - - const auto parentKey = oldItem.getParentItem(compressor_).getKey(); - - // Grab lock to prevent anyone else from modifying the chain + const auto parentKey = parentItem.getKey(); auto l = chainedItemLocks_.lockExclusive(parentKey); - auto parentHandle = - validateAndGetParentHandleForChainedMoveLocked(oldItem, parentKey); - if (!parentHandle) { - return false; - } - - // once we have the moving sync and valid parent for the old item, check if - // the original allocation was made correctly. If not, we destroy the - // allocation to indicate a retry to moving logic above. 
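+  // The parent is already marked moving (see markMovingForSlabRelease), so
+  // it cannot be removed or replaced concurrently; revalidating the parent
+  // through a handle is therefore no longer required here.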
- if (reinterpret_cast( - &newItemHdl->asChainedItem().getParentItem(compressor_)) != - reinterpret_cast(&parentHandle->asChainedItem())) { - newItemHdl.reset(); - return false; - } - XDCHECK_EQ(reinterpret_cast( &newItemHdl->asChainedItem().getParentItem(compressor_)), - reinterpret_cast(&parentHandle->asChainedItem())); - - // In case someone else had removed this chained item from its parent by now - // So we check again to see if the it has been unlinked from its parent - if (!oldItem.isInMMContainer() || oldItem.isOnlyMoving()) { - return false; + reinterpret_cast(&parentItem.asChainedItem())); + + auto parentPtr = &parentItem; + + if (config_.moveCb) { + // Execute the move callback. We cannot make any guarantees about the + // consistency of the old item beyond this point, because the callback can + // do more than a simple memcpy() e.g. update external references. If there + // are any remaining handles to the old item, it is the caller's + // responsibility to invalidate them. The move can only fail after this + // statement if the old item has been removed or replaced, in which case it + // should be fine for it to be left in an inconsistent state. + config_.moveCb(oldItem, *newItemHdl, parentPtr); + } else { + std::memcpy(newItemHdl->getMemory(), oldItem.getMemory(), + oldItem.getSize()); } - auto parentPtr = parentHandle.getInternal(); - - XDCHECK_EQ(reinterpret_cast(parentPtr), - reinterpret_cast(&oldItem.getParentItem(compressor_))); - - // Invoke the move callback to fix up any user data related to the chain - config_.moveCb(oldItem, *newItemHdl, parentPtr); - // Replace the new item in the position of the old one before both in the // parent's chain and the MMContainer. - auto oldItemHandle = - replaceChainedItemLocked(oldItem, std::move(newItemHdl), *parentHandle); - XDCHECK(oldItemHandle->isMoving()); - XDCHECK(!oldItemHandle->isInMMContainer()); + XDCHECK_EQ(parentItem.getRefCount(), 0); + auto& newContainer = getMMContainer(*newItemHdl); + auto mmContainerAdded = newContainer.add(*newItemHdl); + XDCHECK(mmContainerAdded); + + replaceInChainLocked(oldItem, newItemHdl, parentItem, true); return true; } +template +typename CacheAllocator::NvmCacheT::PutToken +CacheAllocator::createPutToken(Item& item) { + const bool evictToNvmCache = shouldWriteToNvmCache(item); + return evictToNvmCache ? nvmCache_->createPutToken(item.getKey()) + : typename NvmCacheT::PutToken{}; +} + template void CacheAllocator::unlinkItemForEviction(Item& it) { XDCHECK(it.isMarkedForEviction()); @@ -2478,6 +2544,8 @@ void CacheAllocator::releaseSlabImpl( // 3. If 2 is successful, Move or Evict // 4. 
Move on to the next item if current item is freed for (auto alloc : releaseContext.getActiveAllocations()) { + Item& item = *static_cast(alloc); + // Need to mark an item for release before proceeding // If we can't mark as moving, it means the item is already freed const bool isAlreadyFreed = @@ -2486,8 +2554,6 @@ void CacheAllocator::releaseSlabImpl( continue; } - Item& item = *static_cast(alloc); - // Try to move this item and make sure we can free the memory const bool isMoved = moveForSlabRelease(releaseContext, item, throttler); @@ -2509,6 +2575,15 @@ void CacheAllocator::throttleWith(util::Throttler& t, } } +template +typename RefcountWithFlags::Value +CacheAllocator::unmarkMovingAndWakeUpWaiters(Item& item, + WriteHandle handle) { + auto ret = item.unmarkMoving(); + wakeUpWaiters(item, std::move(handle)); + return ret; +} + template bool CacheAllocator::moveForSlabRelease( const SlabReleaseContext& ctx, Item& oldItem, util::Throttler& throttler) { @@ -2516,66 +2591,72 @@ bool CacheAllocator::moveForSlabRelease( return false; } - bool isMoved = false; auto startTime = util::getCurrentTimeSec(); - WriteHandle newItemHdl = allocateNewItemForOldItem(oldItem); - - for (unsigned int itemMovingAttempts = 0; - itemMovingAttempts < config_.movingTries; - ++itemMovingAttempts) { - stats_.numMoveAttempts.inc(); + Item* parentItem; + bool chainedItem = oldItem.isChainedItem(); - // Nothing to move and the key is likely also bogus for chained items. - if (oldItem.isOnlyMoving()) { - oldItem.unmarkMoving(); - const auto res = - releaseBackToAllocator(oldItem, RemoveContext::kNormal, false); - XDCHECK(res == ReleaseRes::kReleased); - return true; - } + stats_.numMoveAttempts.inc(); - if (!newItemHdl) { - // try to allocate again if it previously wasn't successful - newItemHdl = allocateNewItemForOldItem(oldItem); - } - - // if we have a valid handle, try to move, if not, we retry. - if (newItemHdl) { - isMoved = tryMovingForSlabRelease(oldItem, newItemHdl); - if (isMoved) { - break; - } - } + // Nothing to move - in the case that tryMoving failed + // for chained items we would have already evicted the entire chain. + if (oldItem.isOnlyMoving()) { + XDCHECK(!chainedItem); + auto ret = unmarkMovingAndWakeUpWaiters(oldItem, {}); + XDCHECK(ret == 0); + const auto res = + releaseBackToAllocator(oldItem, RemoveContext::kNormal, false); + XDCHECK(res == ReleaseRes::kReleased); + return true; + } - throttleWith(throttler, [&] { - XLOGF(WARN, - "Spent {} seconds, slab release still trying to move Item: {}. " - "Pool: {}, Class: {}.", - util::getCurrentTimeSec() - startTime, oldItem.toString(), - ctx.getPoolId(), ctx.getClassId()); - }); + if (chainedItem) { + parentItem = &oldItem.asChainedItem().getParentItem(compressor_); + XDCHECK(parentItem->isMoving()); + XDCHECK_EQ(1, oldItem.getRefCount()); + XDCHECK_EQ(0, parentItem->getRefCount()); + } else { + XDCHECK(oldItem.isMoving()); } + WriteHandle newItemHdl = allocateNewItemForOldItem(oldItem); - // Return false if we've exhausted moving tries. - if (!isMoved) { + // if we have a valid handle, try to move, if not, we retry. + if (newItemHdl) { + // move can fail if another thread calls insertOrReplace + // in this case oldItem is no longer valid (not accessible, + // it gets removed from MMContainer and evictForSlabRelease + // will send it back to the allocator + bool isMoved = oldItem.isChainedItem() + ? 
moveChainedItem(oldItem.asChainedItem(), newItemHdl) + : moveRegularItem(oldItem, newItemHdl); + if (!isMoved) { + return false; + } + removeFromMMContainer(oldItem); + } else { return false; } - // Since item has been moved, we can directly free it. We don't need to - // worry about any stats related changes, because there is another item - // that's identical to this one to replace it. Here we just need to wait - // until all users have dropped the item handles before we can proceed. - startTime = util::getCurrentTimeSec(); - while (!oldItem.isOnlyMoving()) { - throttleWith(throttler, [&] { - XLOGF(WARN, - "Spent {} seconds, slab release still waiting for refcount to " - "drain Item: {}. Pool: {}, Class: {}.", - util::getCurrentTimeSec() - startTime, oldItem.toString(), - ctx.getPoolId(), ctx.getClassId()); - }); - } const auto allocInfo = allocator_->getAllocInfo(oldItem.getMemory()); + if (chainedItem) { + newItemHdl.reset(); + auto ref = parentItem->unmarkMoving(); + if (UNLIKELY(ref == 0)) { + wakeUpWaiters(*parentItem, {}); + const auto res = + releaseBackToAllocator(*parentItem, RemoveContext::kNormal, false); + XDCHECK(res == ReleaseRes::kReleased); + return true; + } else { + XDCHECK_NE(ref, 0); + auto parentHdl = acquire(parentItem); + if (parentHdl) { + wakeUpWaiters(*parentItem, std::move(parentHdl)); + } + } + } else { + auto ref = unmarkMovingAndWakeUpWaiters(oldItem, std::move(newItemHdl)); + XDCHECK(ref == 0); + } allocator_->free(&oldItem); (*stats_.fragmentationSize)[allocInfo.poolId][allocInfo.classId].sub( @@ -2584,53 +2665,21 @@ bool CacheAllocator::moveForSlabRelease( return true; } -template -typename CacheAllocator::ReadHandle -CacheAllocator::validateAndGetParentHandleForChainedMoveLocked( - const ChainedItem& item, const Key& parentKey) { - ReadHandle parentHandle{}; - try { - parentHandle = findInternal(parentKey); - // If the parent is not the same as the parent of the chained item, - // it means someone has replaced our old parent already. So we abort. - if (!parentHandle || - parentHandle.get() != &item.getParentItem(compressor_)) { - return {}; - } - } catch (const exception::RefcountOverflow&) { - return {}; - } - - return parentHandle; -} - template typename CacheAllocator::WriteHandle CacheAllocator::allocateNewItemForOldItem(const Item& oldItem) { if (oldItem.isChainedItem()) { - const auto& oldChainedItem = oldItem.asChainedItem(); - const auto parentKey = oldChainedItem.getParentItem(compressor_).getKey(); + const Item& parentItem = oldItem.asChainedItem().getParentItem(compressor_); - // Grab lock to prevent anyone else from modifying the chain - auto l = chainedItemLocks_.lockExclusive(parentKey); - - auto parentHandle = validateAndGetParentHandleForChainedMoveLocked( - oldChainedItem, parentKey); - if (!parentHandle) { - return {}; - } - - // Set up the destination for the move. Since oldChainedItem would be - // marked as moving, it won't be picked for eviction. 
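+    // The parent of a chained item is marked moving at this point (see
+    // markMovingForSlabRelease), so the chain cannot change while we set up
+    // the destination allocation.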
auto newItemHdl = - allocateChainedItemInternal(parentHandle, oldChainedItem.getSize()); + allocateChainedItemInternal(parentItem, oldItem.getSize()); if (!newItemHdl) { return {}; } + const auto& oldChainedItem = oldItem.asChainedItem(); XDCHECK_EQ(newItemHdl->getSize(), oldChainedItem.getSize()); - auto parentPtr = parentHandle.getInternal(); - XDCHECK_EQ(reinterpret_cast(parentPtr), + XDCHECK_EQ(reinterpret_cast(&parentItem), reinterpret_cast( &oldChainedItem.getParentItem(compressor_))); @@ -2660,40 +2709,8 @@ CacheAllocator::allocateNewItemForOldItem(const Item& oldItem) { } template -bool CacheAllocator::tryMovingForSlabRelease( - Item& oldItem, WriteHandle& newItemHdl) { - // By holding onto a user-level synchronization object, we ensure moving - // a regular item or chained item is synchronized with any potential - // user-side mutation. - std::unique_ptr syncObj; - if (config_.movingSync) { - if (!oldItem.isChainedItem()) { - syncObj = config_.movingSync(oldItem.getKey()); - } else { - // Copy the key so we have a valid key to work with if the chained - // item is still valid. - const std::string parentKey = - oldItem.asChainedItem().getParentItem(compressor_).getKey().str(); - if (oldItem.isOnlyMoving()) { - // If chained item no longer has a refcount, its parent is already - // being released, so we abort this try to moving. - return false; - } - syncObj = config_.movingSync(parentKey); - } - - // We need to differentiate between the following three scenarios: - // 1. nullptr indicates no move sync required for this particular item - // 2. moveSync.isValid() == true meaning we've obtained the sync - // 3. moveSync.isValid() == false meaning we need to abort and retry - if (syncObj && !syncObj->isValid()) { - return false; - } - } - - return oldItem.isChainedItem() - ? moveChainedItem(oldItem.asChainedItem(), newItemHdl) - : moveRegularItem(oldItem, newItemHdl); +void CacheAllocator::wakeUpWaiters(Item& item, WriteHandle handle) { + wakeUpWaitersLocked(item.getKey(), std::move(handle)); } template @@ -2703,60 +2720,17 @@ void CacheAllocator::evictForSlabRelease( while (true) { stats_.numEvictionAttempts.inc(); - // if the item is already in a state where only the exclusive bit is set, - // nothing needs to be done. We simply need to call unmarkMoving and free - // the item. - if (item.isOnlyMoving()) { - item.unmarkMoving(); - const auto res = - releaseBackToAllocator(item, RemoveContext::kNormal, false); - XDCHECK(ReleaseRes::kReleased == res); - return; - } - - // Since we couldn't move, we now evict this item. Owning handle will be - // the item's handle for regular/normal items and will be the parent - // handle for chained items. - auto owningHandle = - item.isChainedItem() - ? evictChainedItemForSlabRelease(item.asChainedItem()) - : evictNormalItemForSlabRelease(item); - - // we managed to evict the corresponding owner of the item and have the - // last handle for the owner. - if (owningHandle) { - const auto allocInfo = - allocator_->getAllocInfo(static_cast(&item)); - if (owningHandle->hasChainedItem()) { - (*stats_.chainedItemEvictions)[allocInfo.poolId][allocInfo.classId] - .inc(); - } else { - (*stats_.regularItemEvictions)[allocInfo.poolId][allocInfo.classId] - .inc(); - } - - stats_.numEvictionSuccesses.inc(); - - // we have the last handle. 
no longer need to hold on to the exclusive bit - item.unmarkMoving(); - - // manually decrement the refcount to call releaseBackToAllocator - const auto ref = decRef(*owningHandle); - XDCHECK(ref == 0); - const auto res = releaseBackToAllocator(*owningHandle.release(), - RemoveContext::kEviction, false); - XDCHECK(res == ReleaseRes::kReleased); - return; - } - if (shutDownInProgress_) { - item.unmarkMoving(); + if (item.isMoving()) { + auto ref = unmarkMovingAndWakeUpWaiters(item, {}); + } allocator_->abortSlabRelease(ctx); throw exception::SlabReleaseAborted( folly::sformat("Slab Release aborted while trying to evict" " Item: {} Pool: {}, Class: {}.", item.toString(), ctx.getPoolId(), ctx.getClassId())); } + throttleWith(throttler, [&] { XLOGF(WARN, "Spent {} seconds, slab release still trying to evict Item: {}. " @@ -2770,6 +2744,64 @@ void CacheAllocator::evictForSlabRelease( .toString()) : ""); }); + + // if the item is already in a state where only the exclusive bit is set, + // nothing needs to be done. We simply need to call unmarkMoving and free + // the item. + if (item.isOnlyMoving()) { + auto ref = unmarkMovingAndWakeUpWaiters(item, {}); + XDCHECK(ref == 0); + const auto res = + releaseBackToAllocator(item, RemoveContext::kNormal, false); + XDCHECK(ReleaseRes::kReleased == res); + return; + } + + typename NvmCacheT::PutToken token; + bool isChainedItem = item.isChainedItem(); + Item* evicted = isChainedItem + ? &item.asChainedItem().getParentItem(compressor_) + : &item; + + XDCHECK(evicted->isMoving()); + token = createPutToken(*evicted); + auto ret = evicted->markForEvictionWhenMoving(); + XDCHECK(ret); + // unmark the child so it will be freed + // TODO entire chain just gets evicted since moveForSlabRelease + // returns false + XDCHECK(!item.isMoving()); + unlinkItemForEviction(*evicted); + // wake up any readers that wait for the move to complete + // it's safe to do now, as we have the item marked exclusive and + // no other reader can be added to the waiters list + wakeUpWaiters(*evicted, {}); + + if (token.isValid() && shouldWriteToNvmCacheExclusive(*evicted)) { + nvmCache_->put(*evicted, std::move(token)); + } + + const auto allocInfo = + allocator_->getAllocInfo(static_cast(&item)); + if (evicted->hasChainedItem()) { + (*stats_.chainedItemEvictions)[allocInfo.poolId][allocInfo.classId].inc(); + } else { + (*stats_.regularItemEvictions)[allocInfo.poolId][allocInfo.classId].inc(); + } + + stats_.numEvictionSuccesses.inc(); + + XDCHECK(evicted->getRefCount() == 0); + const auto res = + releaseBackToAllocator(*evicted, RemoveContext::kEviction, false); + XDCHECK(res == ReleaseRes::kReleased); + + const bool isAlreadyFreed = + !markMovingForSlabRelease(ctx, &item, throttler); + if (!isAlreadyFreed) { + continue; + } + return; } } @@ -2945,14 +2977,47 @@ bool CacheAllocator::markMovingForSlabRelease( // At first, we assume this item was already freed bool itemFreed = true; + Item* syncItem = nullptr; bool markedMoving = false; - const auto fn = [&markedMoving, &itemFreed](void* memory) { + const auto fn = [this, &syncItem, &markedMoving, &itemFreed](void* memory) { // Since this callback is executed, the item is not yet freed itemFreed = false; Item* item = static_cast(memory); - if (item->markMoving()) { - markedMoving = true; - } + auto& mmContainer = getMMContainer(*item); + mmContainer.withContainerLock([this, &mmContainer, &syncItem, &item, + &markedMoving]() { + // we rely on the mmContainer lock to safely check that the item is + // currently in the mmContainer (no 
other threads are currently allocating + // this item). This is needed to sync on the case where a chained item + // is being released back to allocator and it's parent ref could be + // invalid. We need a valid parent ref in order to mark a chained item + // as moving since we sync on the parent by marking it as moving. + if (!item->isInMMContainer()) { + return; + } + bool chainedItem_ = item->isChainedItem(); + XDCHECK_EQ(&getMMContainer(*item), &mmContainer); + Item* syncItem_ = chainedItem_ + ? &item->asChainedItem().getParentItem(compressor_) + : item; + // in order to safely check if the expected parent (syncItem_) matches + // the current parent on the chained item, we need to take the chained + // item lock so we are sure that nobody else will be editing the chain + auto l_ = chainedItem_ + ? chainedItemLocks_.tryLockExclusive(syncItem_->getKey()) + : decltype(chainedItemLocks_.tryLockExclusive( + syncItem_->getKey()))(); + + if (chainedItem_ && (!l_ || &item->asChainedItem().getParentItem( + compressor_) != syncItem_)) { + markedMoving = false; + return; + } + if (syncItem_->markMoving()) { + markedMoving = true; + syncItem = syncItem_; + } + }); }; auto startTime = util::getCurrentTimeSec(); @@ -2964,6 +3029,8 @@ bool CacheAllocator::markMovingForSlabRelease( if (itemFreed) { return false; } else if (markedMoving) { + Item* item = static_cast(alloc); + XDCHECK(syncItem->isMoving()); return true; } @@ -2979,6 +3046,7 @@ bool CacheAllocator::markMovingForSlabRelease( static_cast(alloc)->toString(), ctx.getPoolId(), ctx.getClassId())); } + stats_.numMoveAttempts.inc(); throttleWith(throttler, [&] { XLOGF(WARN, "Spent {} seconds, slab release still trying to mark as moving for " diff --git a/cachelib/allocator/CacheAllocator.h b/cachelib/allocator/CacheAllocator.h index 6888e2fd41..95a175f9ba 100644 --- a/cachelib/allocator/CacheAllocator.h +++ b/cachelib/allocator/CacheAllocator.h @@ -1349,7 +1349,7 @@ class CacheAllocator : public CacheBase { private: // wrapper around Item's refcount and active handle tracking - FOLLY_ALWAYS_INLINE bool incRef(Item& it); + FOLLY_ALWAYS_INLINE RefcountWithFlags::incResult incRef(Item& it); FOLLY_ALWAYS_INLINE RefcountWithFlags::Value decRef(Item& it); // drops the refcount and if needed, frees the allocation back to the memory @@ -1473,26 +1473,13 @@ class CacheAllocator : public CacheBase { // The parent handle parameter here is mainly used to find the // correct pool to allocate memory for this chained item // - // @param parent handle to the cache item + // @param parent the parent item // @param size the size for the chained allocation // // @return handle to the chained allocation // @throw std::invalid_argument if the size requested is invalid or // if the item is invalid - WriteHandle allocateChainedItemInternal(const ReadHandle& parent, - uint32_t size); - - // Given an item and its parentKey, validate that the parentKey - // corresponds to an item that's the parent of the supplied item. - // - // @param item item that we want to get the parent handle for - // @param parentKey key of the item's parent - // - // @return handle to the parent item if the validations pass - // otherwise, an empty Handle is returned. - // - ReadHandle validateAndGetParentHandleForChainedMoveLocked( - const ChainedItem& item, const Key& parentKey); + WriteHandle allocateChainedItemInternal(const Item& parent, uint32_t size); // Given an existing item, allocate a new one for the // existing one to later be moved into. 
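
For context, the waiter bookkeeping that tryGetHandleWithWaitContextForMovingItem()
and wakeUpWaitersLocked() implement boils down to a sharded map of per-key
waiter lists. Below is a minimal standalone sketch using std:: primitives;
ItemHandle, the shard count, and the function names are illustrative
stand-ins, not the CacheLib API:

    #include <array>
    #include <functional>
    #include <future>
    #include <mutex>
    #include <string>
    #include <unordered_map>
    #include <vector>

    using ItemHandle = int; // hypothetical stand-in for WriteHandle

    // One shard of the pending-move table. The patch keeps kShards of these
    // and picks one by hashing the key, which spreads lock contention.
    struct MoveShard {
      std::mutex lock;
      std::unordered_map<std::string, std::vector<std::promise<ItemHandle>>>
          waiters;
    };

    std::array<MoveShard, 8> gShards; // the patch uses kShards = 8192

    MoveShard& shardFor(const std::string& key) {
      return gShards[std::hash<std::string>{}(key) % gShards.size()];
    }

    // Reader side: incRef() failed with incFailedMoving, so park a promise
    // until the mover finishes. The real code re-checks isMoving() under the
    // shard lock and retries incRef() if the move already completed.
    std::future<ItemHandle> waitForMove(const std::string& key) {
      MoveShard& s = shardFor(key);
      std::lock_guard<std::mutex> g(s.lock);
      s.waiters[key].emplace_back();
      return s.waiters[key].back().get_future();
    }

    // Mover side: analogous to wakeUpWaitersLocked(); hand every parked
    // reader its own copy of the handle and drop the entry.
    size_t wakeWaiters(const std::string& key, ItemHandle handle) {
      std::vector<std::promise<ItemHandle>> pending;
      {
        MoveShard& s = shardFor(key);
        std::lock_guard<std::mutex> g(s.lock);
        auto it = s.waiters.find(key);
        if (it == s.waiters.end()) {
          return 0;
        }
        pending = std::move(it->second);
        s.waiters.erase(it);
      }
      for (auto& p : pending) {
        p.set_value(handle); // each waiter receives its own handle copy
      }
      return pending.size();
    }

Sharding by key hash keeps readers of unrelated keys from contending on a
single mutex, which is why movesMap_ and moveLock_ below are vectors of
cache-line-aligned shards rather than one map and one lock.
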
@@ -1609,7 +1596,7 @@ class CacheAllocator : public CacheBase { // @param newParent the new parent for the chain // // @throw if any of the conditions for parent or newParent are not met. - void transferChainLocked(WriteHandle& parent, WriteHandle& newParent); + void transferChainLocked(Item& parent, WriteHandle& newParent); // replace a chained item in the existing chain. This needs to be called // with the chained item lock held exclusive @@ -1623,6 +1610,24 @@ class CacheAllocator : public CacheBase { WriteHandle newItemHdl, const Item& parent); + // + // Performs the actual inplace replace - it is called from + // moveChainedItem and replaceChainedItemLocked + // must hold chainedItemLock + // + // @param oldItem the item we are replacing in the chain + // @param newItem the item we are replacing it with + // @param parent the parent for the chain + // @param fromMove used to determine if the replaced was called from + // moveChainedItem - we avoid the handle destructor + // in this case. + // + // @return handle to the oldItem + void replaceInChainLocked(Item& oldItem, + WriteHandle& newItemHdl, + const Item& parent, + bool fromMove); + // Insert an item into MM container. The caller must hold a valid handle for // the item. // @@ -1731,6 +1736,19 @@ class CacheAllocator : public CacheBase { using EvictionIterator = typename MMContainer::LockedIterator; + // Wakes up waiters if there are any + // + // @param item wakes waiters that are waiting on that item + // @param handle handle to pass to the waiters + void wakeUpWaiters(Item& item, WriteHandle handle); + + // Unmarks item as moving and wakes up any waiters waiting on that item + // + // @param item wakes waiters that are waiting on that item + // @param handle handle to pass to the waiters + typename RefcountWithFlags::Value unmarkMovingAndWakeUpWaiters( + Item& item, WriteHandle handle); + // Deserializer CacheAllocatorMetadata and verify the version // // @param deserializer Deserializer object @@ -1824,16 +1842,6 @@ class CacheAllocator : public CacheBase { Item& item, util::Throttler& throttler); - // "Move" (by copying) the content in this item to another memory - // location by invoking the move callback. - // - // @param item old item to be moved elsewhere - // @param newItemHdl handle of new item to be moved into - // - // @return true if the item has been moved - // false if we have exhausted moving attempts - bool tryMovingForSlabRelease(Item& item, WriteHandle& newItemHdl); - // Evict an item from access and mm containers and // ensure it is safe for freeing. // @@ -1844,6 +1852,11 @@ class CacheAllocator : public CacheBase { Item& item, util::Throttler& throttler); + // Helper function to create PutToken + // + // @return valid token if the item should be written to NVM cache. + typename NvmCacheT::PutToken createPutToken(Item& item); + // Helper function to evict a normal item for slab release // // @return last handle for corresponding to item on success. empty handle on @@ -2082,6 +2095,88 @@ class CacheAllocator : public CacheBase { // BEGIN private members + bool tryGetHandleWithWaitContextForMovingItem(Item& item, + WriteHandle& handle); + + size_t wakeUpWaitersLocked(folly::StringPiece key, WriteHandle&& handle); + + class MoveCtx { + public: + MoveCtx() {} + + ~MoveCtx() { + // prevent any further enqueue to waiters + // Note: we don't need to hold locks since no one can enqueue + // after this point. + wakeUpWaiters(); + } + + // record the item handle. 
Upon destruction we will wake up the waiters + // and pass a clone of the handle to the callBack. By default we pass + // a null handle + void setItemHandle(WriteHandle _it) { it = std::move(_it); } + + // enqueue a waiter into the waiter list + // @param waiter WaitContext + void addWaiter(std::shared_ptr> waiter) { + XDCHECK(waiter); + waiters.push_back(std::move(waiter)); + } + + size_t numWaiters() const { return waiters.size(); } + + private: + // notify all pending waiters that are waiting for the fetch. + void wakeUpWaiters() { + bool refcountOverflowed = false; + for (auto& w : waiters) { + // If refcount overflowed earlier, then we will return miss to + // all subsequent waiters. + if (refcountOverflowed) { + w->set(WriteHandle{}); + continue; + } + + try { + w->set(it.clone()); + } catch (const exception::RefcountOverflow&) { + // We'll return a miss to the user's pending read, + // so we should enqueue a delete via NvmCache. + // TODO: cache.remove(it); + refcountOverflowed = true; + } + } + } + + WriteHandle it; // will be set when Context is being filled + std::vector>> waiters; // list of + // waiters + }; + using MoveMap = + folly::F14ValueMap, + folly::HeterogeneousAccessHash>; + + static size_t getShardForKey(folly::StringPiece key) { + return folly::Hash()(key) % kShards; + } + + MoveMap& getMoveMapForShard(size_t shard) { + return movesMap_[shard].movesMap_; + } + + MoveMap& getMoveMap(folly::StringPiece key) { + return getMoveMapForShard(getShardForKey(key)); + } + + std::unique_lock getMoveLockForShard(size_t shard) { + return std::unique_lock(moveLock_[shard].moveLock_); + } + + std::unique_lock getMoveLock(folly::StringPiece key) { + return getMoveLockForShard(getShardForKey(key)); + } + // Whether the memory allocator for this cache allocator was created on shared // memory. 
The hash table, chained item hash table etc is also created on // shared memory except for temporary shared memory mode when they're created @@ -2175,6 +2270,22 @@ class CacheAllocator : public CacheBase { // poolResizer_, poolOptimizer_, memMonitor_, reaper_ mutable std::mutex workersMutex_; + static constexpr size_t kShards = 8192; // TODO: need to define right value + + struct MovesMapShard { + alignas(folly::hardware_destructive_interference_size) MoveMap movesMap_; + }; + + struct MoveLock { + alignas(folly::hardware_destructive_interference_size) std::mutex moveLock_; + }; + + // a map of all pending moves + std::vector movesMap_; + + // a map of move locks for each shard + std::vector moveLock_; + // time when the ram cache was first created const uint32_t cacheCreationTime_{0}; diff --git a/cachelib/allocator/CacheItem.h b/cachelib/allocator/CacheItem.h index afee315cbb..4c32ece794 100644 --- a/cachelib/allocator/CacheItem.h +++ b/cachelib/allocator/CacheItem.h @@ -309,7 +309,7 @@ class CACHELIB_PACKED_ATTR CacheItem { // // @return true on success, failure if item is marked as exclusive // @throw exception::RefcountOverflow on ref count overflow - FOLLY_ALWAYS_INLINE bool incRef() { + FOLLY_ALWAYS_INLINE RefcountWithFlags::incResult incRef() { try { return ref_.incRef(); } catch (exception::RefcountOverflow& e) { diff --git a/cachelib/allocator/MM2Q-inl.h b/cachelib/allocator/MM2Q-inl.h index ba388d40a4..27f6ee3578 100644 --- a/cachelib/allocator/MM2Q-inl.h +++ b/cachelib/allocator/MM2Q-inl.h @@ -258,6 +258,12 @@ void MM2Q::Container::withEvictionIterator(F&& fun) { } } +template T::*HookPtr> +template +void MM2Q::Container::withContainerLock(F&& fun) { + lruMutex_->lock_combine([this, &fun]() { fun(); }); +} + template T::*HookPtr> void MM2Q::Container::removeLocked(T& node, bool doRebalance) noexcept { diff --git a/cachelib/allocator/MM2Q.h b/cachelib/allocator/MM2Q.h index 982eca21f9..69a53963a2 100644 --- a/cachelib/allocator/MM2Q.h +++ b/cachelib/allocator/MM2Q.h @@ -502,6 +502,10 @@ class MM2Q { template void withEvictionIterator(F&& f); + // Execute provided function under container lock. + template + void withContainerLock(F&& f); + // get the current config as a copy Config getConfig() const; diff --git a/cachelib/allocator/MMLru-inl.h b/cachelib/allocator/MMLru-inl.h index d35759f212..e0ebeafc46 100644 --- a/cachelib/allocator/MMLru-inl.h +++ b/cachelib/allocator/MMLru-inl.h @@ -229,6 +229,12 @@ void MMLru::Container::withEvictionIterator(F&& fun) { } } +template T::*HookPtr> +template +void MMLru::Container::withContainerLock(F&& fun) { + lruMutex_->lock_combine([this, &fun]() { fun(); }); +} + template T::*HookPtr> void MMLru::Container::ensureNotInsertionPoint(T& node) noexcept { // If we are removing the insertion point node, grow tail before we remove diff --git a/cachelib/allocator/MMLru.h b/cachelib/allocator/MMLru.h index 29c6d02689..42c8a38506 100644 --- a/cachelib/allocator/MMLru.h +++ b/cachelib/allocator/MMLru.h @@ -376,6 +376,10 @@ class MMLru { template void withEvictionIterator(F&& f); + // Execute provided function under container lock. 
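+  // Illustrative usage (hypothetical caller; mirrors the slab-release path):
+  //   container.withContainerLock([&] {
+  //     if (!item->isInMMContainer()) {
+  //       return; // freed concurrently; nothing to mark
+  //     }
+  //     // linkage checks and markMoving() are safe here
+  //   });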
+ template + void withContainerLock(F&& f); + // get copy of current config Config getConfig() const; diff --git a/cachelib/allocator/MMTinyLFU-inl.h b/cachelib/allocator/MMTinyLFU-inl.h index 46640b24ca..597ce8de70 100644 --- a/cachelib/allocator/MMTinyLFU-inl.h +++ b/cachelib/allocator/MMTinyLFU-inl.h @@ -227,6 +227,13 @@ void MMTinyLFU::Container::withEvictionIterator(F&& fun) { fun(getEvictionIterator()); } +template T::*HookPtr> +template +void MMTinyLFU::Container::withContainerLock(F&& fun) { + LockHolder l(lruMutex_); + fun(); +} + template T::*HookPtr> void MMTinyLFU::Container::removeLocked(T& node) noexcept { if (isTiny(node)) { diff --git a/cachelib/allocator/MMTinyLFU.h b/cachelib/allocator/MMTinyLFU.h index c8f2699264..81099352d7 100644 --- a/cachelib/allocator/MMTinyLFU.h +++ b/cachelib/allocator/MMTinyLFU.h @@ -497,6 +497,10 @@ class MMTinyLFU { template void withEvictionIterator(F&& f); + // Execute provided function under container lock. + template + void withContainerLock(F&& f); + // for saving the state of the lru // // precondition: serialization must happen without any reader or writer diff --git a/cachelib/allocator/Refcount.h b/cachelib/allocator/Refcount.h index 107e10735e..1288dd1ed3 100644 --- a/cachelib/allocator/Refcount.h +++ b/cachelib/allocator/Refcount.h @@ -130,30 +130,40 @@ class FOLLY_PACK_ATTR RefcountWithFlags { RefcountWithFlags& operator=(const RefcountWithFlags&) = delete; RefcountWithFlags(RefcountWithFlags&&) = delete; RefcountWithFlags& operator=(RefcountWithFlags&&) = delete; - + enum incResult { incOk, incFailedMoving, incFailedEviction }; // Bumps up the reference count only if the new count will be strictly less // than or equal to the maxCount and the item is not exclusive - // @return true if refcount is bumped. false otherwise (if item is exclusive) + // @return incResult::incOk if refcount is bumped. The refcount is not bumped + // if Exclusive bit is set and appropriate error code is returned. + // incResult::incFailedMoving if item is moving (exclusive bit is set + // and refcount > 0). + // incResult::incFailedEviction if Item is evicted + // (only exclusive bit is set). // @throw exception::RefcountOverflow if new count would be greater than // maxCount - FOLLY_ALWAYS_INLINE bool incRef() { - auto predicate = [](const Value curValue) { + FOLLY_ALWAYS_INLINE incResult incRef() { + incResult res = incOk; + auto predicate = [&res](const Value curValue) { Value bitMask = getAdminRef(); const bool exlusiveBitIsSet = curValue & bitMask; if (UNLIKELY((curValue & kAccessRefMask) == (kAccessRefMask))) { throw exception::RefcountOverflow("Refcount maxed out."); + } else if (exlusiveBitIsSet) { + res = (curValue & kAccessRefMask) == 0 ? 
incFailedEviction
+                                                 : incFailedMoving;
+        return false;
       }
-
-      // Check if the item is not marked for eviction
-      return !exlusiveBitIsSet || ((curValue & kAccessRefMask) != 0);
+      res = incOk;
+      return true;
     };
 
     auto newValue = [](const Value curValue) {
       return (curValue + static_cast(1));
     };
 
-    return atomicUpdateValue(predicate, newValue);
+    atomicUpdateValue(predicate, newValue);
+    return res;
   }
 
   // Bumps down the reference count
@@ -322,11 +332,19 @@ class FOLLY_PACK_ATTR RefcountWithFlags {
   bool markMoving() {
     Value linkedBitMask = getAdminRef();
     Value exclusiveBitMask = getAdminRef();
+    Value isChainedItemFlag = getFlag();
 
-    auto predicate = [linkedBitMask, exclusiveBitMask](const Value curValue) {
+    auto predicate = [linkedBitMask, exclusiveBitMask,
+                      isChainedItemFlag](const Value curValue) {
       const bool unlinked = !(curValue & linkedBitMask);
       const bool alreadyExclusive = curValue & exclusiveBitMask;
+      const bool isChained = curValue & isChainedItemFlag;
 
+      // A chained item can have a ref count of 1; that just means it is
+      // linked in the chain.
+      if ((curValue & kAccessRefMask) > (isChained ? 1 : 0)) {
+        return false;
+      }
       if (unlinked || alreadyExclusive) {
         return false;
       }
diff --git a/cachelib/allocator/tests/AllocatorTypeTest.cpp b/cachelib/allocator/tests/AllocatorTypeTest.cpp
index 1e98af29f2..2786946bc6 100644
--- a/cachelib/allocator/tests/AllocatorTypeTest.cpp
+++ b/cachelib/allocator/tests/AllocatorTypeTest.cpp
@@ -288,8 +288,8 @@ TYPED_TEST(BaseAllocatorTest, AddChainedItemMultiThreadWithMovingAndSync) {
   this->testAddChainedItemMultithreadWithMovingAndSync();
 }
 
-TYPED_TEST(BaseAllocatorTest, TransferChainWhileMoving) {
-  this->testTransferChainWhileMoving();
+TYPED_TEST(BaseAllocatorTest, TransferChainAfterMoving) {
+  this->testTransferChainAfterMoving();
 }
 
 TYPED_TEST(BaseAllocatorTest, AddAndPopChainedItemMultithread) {
@@ -325,10 +325,6 @@ TYPED_TEST(BaseAllocatorTest, ReplaceChainedItem) {
   this->testReplaceChainedItem();
 }
 
-TYPED_TEST(BaseAllocatorTest, MovingSyncCorrectness) {
-  this->testMovingSyncCorrectness();
-}
-
 TYPED_TEST(BaseAllocatorTest, StatsChainCount) {
   this->testAllocChainedCount();
 }
diff --git a/cachelib/allocator/tests/BaseAllocatorTest.h b/cachelib/allocator/tests/BaseAllocatorTest.h
index f503d59f61..4b25d0c5d3 100644
--- a/cachelib/allocator/tests/BaseAllocatorTest.h
+++ b/cachelib/allocator/tests/BaseAllocatorTest.h
@@ -3655,6 +3655,16 @@ class BaseAllocatorTest : public AllocatorTest {
                      sourceAlloc);
     otherThread.join();
 
+    // With moving-bit marking, a move attempt fails only when there is a
+    // concurrent set to the item. If a handle to the item is held, the slab
+    // release keeps retrying to mark the item as moving. We previously had
+    // no counter for those retries, while this test assumes that a held
+    // handle makes moveForSlabRelease retry, since that is where the move
+    // attempts counter is incremented.
+    //
+    // As a fix, we now also increment the move attempts counter in
+    // markMovingForSlabRelease, so the retry loop is still counted and
+    // this assertion keeps holding.
    XLOG(INFO, "Number of move retry attempts: ",
         allocator.getSlabReleaseStats().numMoveAttempts);
    ASSERT_GT(allocator.getSlabReleaseStats().numMoveAttempts, 1);
@@ -4940,65 +4950,6 @@ class BaseAllocatorTest : public AllocatorTest {
     lookupFn("yolo");
   }
 
-  // Allocate 3 items, which require
-  // 1. no sync
-  // 2. sync
-  // 3. sync but our sync function will fail
-  // What this test should see is that:
-  // 1. is moved
-  // 2. is moved
-  // 3. 
is evicted - void testMovingSyncCorrectness() { - // create an allocator worth 10 slabs. - typename AllocatorT::Config config; - - // allocate enough size to make sure evictions never occur - config.setCacheSize(200 * Slab::kSize); - - using Item = typename AllocatorT::Item; - struct TestSyncObj : public AllocatorT::SyncObj { - bool isValid_; - bool isValid() const override { return isValid_; } - - static std::unique_ptr genSync( - folly::StringPiece key) { - std::unique_ptr sync(new TestSyncObj()); - if (key == "one") { - return nullptr; - } else if (key == "two") { - sync->isValid_ = true; - } else if (key == "three") { - sync->isValid_ = false; - } else { - XDCHECK(false); - } - return sync; - } - }; - config.enableMovingOnSlabRelease( - [](Item&, Item&, Item*) {}, - [](typename Item::Key key) { return TestSyncObj::genSync(key); }); - - AllocatorT alloc(config); - const size_t numBytes = alloc.getCacheMemoryStats().ramCacheSize; - const auto poolSize = numBytes; - const auto pid = alloc.addPool("one", poolSize); - - // Asking for value size of 0 so we can the smallest allocation class - ASSERT_NE(nullptr, util::allocateAccessible(alloc, pid, "one", 0)); - ASSERT_NE(nullptr, util::allocateAccessible(alloc, pid, "two", 0)); - ASSERT_NE(nullptr, util::allocateAccessible(alloc, pid, "three", 0)); - - // Fisrt allocation class is the smallest allocation class - alloc.releaseSlab(pid, 0, SlabReleaseMode::kRebalance); - - // Now we should still see one and two, but three should be evicted - // already - ASSERT_NE(nullptr, alloc.find("one")); - ASSERT_NE(nullptr, alloc.find("two")); - ASSERT_EQ(nullptr, alloc.find("three")); - } - // This test first writes 50 bytes into each chained item // Then it saves a pointer to each chained item's memory into a vector // @@ -5052,6 +5003,7 @@ class BaseAllocatorTest : public AllocatorTest { auto allocFn = [&](std::string keyPrefix, std::vector sizes) { for (unsigned int loop = 0; loop < 10; ++loop) { std::vector bufList; + std::vector parentHandles; std::unique_lock l(m); for (unsigned int i = 0; i < 1000; ++i) { const auto key = keyPrefix + folly::to(loop) + "_" + @@ -5073,6 +5025,7 @@ class BaseAllocatorTest : public AllocatorTest { alloc.addChainedItem(itemHandle, std::move(childItem)); } + parentHandles.push_back(std::move(itemHandle)); } // Without sync object, we could be writing to already freed @@ -5156,9 +5109,10 @@ class BaseAllocatorTest : public AllocatorTest { lookupFn("yolo"); } - // while a chained item could be moved, try to transfer its parent and - // validate that move succeeds correctly. - void testTransferChainWhileMoving() { + // while a chained item could be moved - it is sync on parent moving bit. + // try to transfer its parent after we moved and + // validate that transfer succeeds correctly. + void testTransferChainAfterMoving() { // create an allocator worth 10 slabs. typename AllocatorT::Config config; config.configureChainedItems(); @@ -5173,36 +5127,6 @@ class BaseAllocatorTest : public AllocatorTest { [&](const typename AllocatorT::RemoveCbData&) { ++numRemovedKeys; }); std::string movingKey = "helloworldmoving"; - // we will use the acquisition of mutex as an indicator of whether item is - // close to being moved and use it to swap the parent. 
- std::mutex m; - struct TestSyncObj : public AllocatorT::SyncObj { - TestSyncObj(std::mutex& m, - std::atomic& firstTime, - folly::Baton<>& startedMoving, - folly::Baton<>& changedParent) - : l(m) { - if (!firstTime) { - return; - } - firstTime = false; - startedMoving.post(); - changedParent.wait(); - } - - std::lock_guard l; - }; - - // used to track if the moving sync is executed upon the first time after - // allocation so that the baton logic is executed only once. - std::atomic firstTimeMovingSync{true}; - - // baton to indicate that the move process has started so that we can - // switch the parent - folly::Baton<> startedMoving; - // baton to indicate that the parent has been switched so that the move - // process can proceed - folly::Baton<> changedParent; const size_t numMovingAttempts = 100; std::atomic numMoves{0}; @@ -5214,12 +5138,7 @@ class BaseAllocatorTest : public AllocatorTest { oldItem.getSize()); ++numMoves; }, - [&m, &startedMoving, &changedParent, - &firstTimeMovingSync](typename Item::Key key) { - XLOG(ERR) << "Moving" << key; - return std::make_unique(m, firstTimeMovingSync, - startedMoving, changedParent); - }, + {}, numMovingAttempts); AllocatorT alloc(config); @@ -5247,25 +5166,20 @@ class BaseAllocatorTest : public AllocatorTest { auto slabRelease = std::async(releaseFn); - startedMoving.wait(); + // wait for slab release to complete. + slabRelease.wait(); // we know moving sync is held now. { auto newParent = alloc.allocate(pid, movingKey, 600); + // parent is marked moving during moved, once finished we will get handle auto parent = alloc.findToWrite(movingKey); alloc.transferChainAndReplace(parent, newParent); } - // indicate that we changed the parent. This should abort the current - // moving attempt, re-allocate the item and eventually succeed in moving. - changedParent.post(); - - // wait for slab release to complete. 
-    slabRelease.wait();
-
     EXPECT_EQ(numMoves, 1);
     auto slabReleaseStats = alloc.getSlabReleaseStats();
-    EXPECT_EQ(slabReleaseStats.numMoveAttempts, 2);
+    EXPECT_EQ(slabReleaseStats.numMoveAttempts, 1);
     EXPECT_EQ(slabReleaseStats.numMoveSuccesses, 1);
 
     auto handle = alloc.find(movingKey);
diff --git a/cachelib/allocator/tests/RefCountTest.cpp b/cachelib/allocator/tests/RefCountTest.cpp
index 1f31894ddc..9b5d06e958 100644
--- a/cachelib/allocator/tests/RefCountTest.cpp
+++ b/cachelib/allocator/tests/RefCountTest.cpp
@@ -101,7 +101,7 @@ void RefCountTest::testBasic() {
   ASSERT_FALSE(ref.template isFlagSet());
 
   for (uint32_t i = 0; i < RefcountWithFlags::kAccessRefMask; i++) {
-    ASSERT_TRUE(ref.incRef());
+    ASSERT_EQ(ref.incRef(), RefcountWithFlags::incOk);
   }
 
   // Incrementing past the max will fail
@@ -215,17 +215,13 @@ void RefCountTest::testMarkForEvictionAndMoving() {
   }
 
   {
-    // can mark moving when ref count > 0
+    // cannot mark moving when ref count > 0
     RefcountWithFlags ref;
     ref.markInMMContainer();
 
     ref.incRef();
 
-    ASSERT_TRUE(ref.markMoving());
-
-    ref.unmarkInMMContainer();
-    auto ret = ref.unmarkMoving();
-    ASSERT_EQ(ret, 1);
+    ASSERT_FALSE(ref.markMoving());
   }
 
   {
diff --git a/cachelib/common/Mutex.h b/cachelib/common/Mutex.h
index 1d6e5898f1..15b440d406 100644
--- a/cachelib/common/Mutex.h
+++ b/cachelib/common/Mutex.h
@@ -341,6 +341,7 @@ class RWBucketLocks : public BaseBucketLocks {
   using Lock = LockType;
   using ReadLockHolder = ReadLockHolderType;
   using WriteLockHolder = WriteLockHolderType;
+  using LockHolder = std::unique_lock<Lock>;
 
   RWBucketLocks(uint32_t locksPower, std::shared_ptr hasher)
       : Base::BaseBucketLocks(locksPower, std::move(hasher)) {}
@@ -357,6 +358,11 @@ class RWBucketLocks : public BaseBucketLocks {
     return WriteLockHolder{Base::getLock(args...)};
   }
 
+  template <typename... Args>
+  LockHolder tryLockExclusive(Args... args) noexcept {
+    return LockHolder(Base::getLock(args...), std::try_to_lock);
+  }
+
   // try to grab the reader lock for a limit _timeout_ duration
   template <typename... Args>
   ReadLockHolder lockShared(const std::chrono::microseconds& timeout,

From c572aa21d2cf3890706c55222696bd9e10d944d2 Mon Sep 17 00:00:00 2001
From: Daniel Byrne
Date: Wed, 8 Nov 2023 07:10:49 -0800
Subject: [PATCH 2/3] test case for acquire parent

---
 cachelib/allocator/CacheAllocator-inl.h      |  4 +-
 cachelib/allocator/Handle.h                  |  5 +-
 .../allocator/tests/AllocatorTypeTest.cpp    |  5 ++
 cachelib/allocator/tests/BaseAllocatorTest.h | 75 +++++++++++++++++++
 cachelib/common/Exceptions.h                 |  5 ++
 5 files changed, 91 insertions(+), 3 deletions(-)

diff --git a/cachelib/allocator/CacheAllocator-inl.h b/cachelib/allocator/CacheAllocator-inl.h
index f9dfd24a30..84110a2048 100644
--- a/cachelib/allocator/CacheAllocator-inl.h
+++ b/cachelib/allocator/CacheAllocator-inl.h
@@ -862,13 +862,13 @@ CacheAllocator<CacheTrait>::releaseBackToAllocator(Item& it,
   headHandle.reset();
 
   if (head == nullptr || &head->getParentItem(compressor_) != &it) {
-    throw std::runtime_error(folly::sformat(
+    throw exception::ChainedItemInvalid(folly::sformat(
        "Mismatch parent pointer. This should not happen. Key: {}",
        it.getKey()));
   }
 
   if (!chainedItemAccessContainer_->remove(*head)) {
-    throw std::runtime_error(folly::sformat(
+    throw exception::ChainedItemInvalid(folly::sformat(
         "Chained item associated with {} cannot be removed from hash table "
         "This should not happen here.",
         it.getKey()));
   }
diff --git a/cachelib/allocator/Handle.h b/cachelib/allocator/Handle.h
index 11d2bed2be..9520b70c99 100644
--- a/cachelib/allocator/Handle.h
+++ b/cachelib/allocator/Handle.h
@@ -26,6 +26,7 @@
 #include
 
 #include "cachelib/allocator/nvmcache/WaitContext.h"
+#include "cachelib/common/Exceptions.h"
 
 namespace facebook {
 namespace cachelib {
@@ -70,8 +71,10 @@ struct ReadHandleImpl {
       assert(alloc_ != nullptr);
       try {
         alloc_->release(it_, isNascent());
+      } catch (const exception::ChainedItemInvalid& e) {
+        XDCHECK(false) << e.what();
       } catch (const std::exception& e) {
-        XLOGF(CRITICAL, "Failed to release {:#10x} : {}", static_cast<void*>(it_),
+        XLOGF(CRITICAL, "Failed to release {} : {}", static_cast<void*>(it_),
               e.what());
       }
       it_ = nullptr;
diff --git a/cachelib/allocator/tests/AllocatorTypeTest.cpp b/cachelib/allocator/tests/AllocatorTypeTest.cpp
index 2786946bc6..97ff04efea 100644
--- a/cachelib/allocator/tests/AllocatorTypeTest.cpp
+++ b/cachelib/allocator/tests/AllocatorTypeTest.cpp
@@ -292,6 +292,11 @@ TYPED_TEST(BaseAllocatorTest, TransferChainAfterMoving) {
   this->testTransferChainAfterMoving();
 }
 
+TYPED_TEST(BaseAllocatorTest, ChainedItemParentAcquireAfterMove) {
+  ASSERT_EXIT(this->testChainedItemParentAcquireAfterMoveLoop(),
+              testing::ExitedWithCode(0), ".*");
+}
+
 TYPED_TEST(BaseAllocatorTest, AddAndPopChainedItemMultithread) {
   this->testAddAndPopChainedItemMultithread();
 }
diff --git a/cachelib/allocator/tests/BaseAllocatorTest.h b/cachelib/allocator/tests/BaseAllocatorTest.h
index 4b25d0c5d3..f9dc647cc9 100644
--- a/cachelib/allocator/tests/BaseAllocatorTest.h
+++ b/cachelib/allocator/tests/BaseAllocatorTest.h
@@ -19,6 +19,7 @@
 #include
 #include
 #include
+#include <folly/synchronization/Latch.h>
 #include
 #include
 
@@ -2469,6 +2470,80 @@ class BaseAllocatorTest : public AllocatorTest<AllocatorT> {
     }
   }
 
+  // tests that the correct parent item is acquired after a move
+  void testChainedItemParentAcquireAfterMoveLoop() {
+    // create an allocator worth 250 slabs:
+    // the first slab is for overhead, the second is for the parent class,
+    // the third is for chained item 1, and the rest are for the new
+    // chained item allocations to move to.
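The RefCountTest changes above pin down the tightened contract behind this test: incRef() now reports an enumerated result rather than a bool, and markMoving() refuses any item with outstanding references. A minimal sketch of the semantics those assertions encode (illustrative, not part of the patch):

    RefcountWithFlags ref;
    ref.markInMMContainer();      // linked into a container, no handles out
    assert(ref.markMoving());     // succeeds: access ref count is zero

    RefcountWithFlags ref2;
    ref2.markInMMContainer();
    ref2.incRef();                // a client now holds a handle
    assert(!ref2.markMoving());   // refused: referenced items cannot move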
+    std::unique_ptr<AllocatorT> alloc;
+    typename AllocatorT::Config config;
+    config.configureChainedItems();
+    config.setCacheSize(250 * Slab::kSize);
+
+    const std::set<uint32_t> allocSizes = {1024, 2048};
+    auto sizes = std::vector<uint32_t>{500, 1500};
+    std::atomic<size_t> numMoves{0};
+    std::atomic<size_t> numReplaces{0};
+    PoolId pid;
+
+    using Item = typename AllocatorT::Item;
+    config.enableMovingOnSlabRelease([&](Item& oldItem, Item& newItem,
+                                         Item* parentPtr) {
+      assert(oldItem.getSize() == newItem.getSize());
+      assert(oldItem.isChainedItem());
+      std::memcpy(newItem.getMemory(), oldItem.getMemory(), oldItem.getSize());
+      folly::Latch latch(1);
+      auto insertThread = std::make_unique<std::thread>([&]() {
+        ASSERT_NO_THROW({
+          auto parentReplacement =
+              alloc->allocate(pid, parentPtr->getKey(), sizes[0]);
+          Item* parentCopy = parentPtr;
+          latch.count_down();
+          while (parentCopy->isMoving())
+            ;
+          alloc->insertOrReplace(parentReplacement);
+          ++numReplaces;
+        });
+      });
+      insertThread->detach();
+      latch.wait();
+      ++numMoves;
+    });
+
+    alloc = std::make_unique<AllocatorT>(config);
+
+    const size_t numBytes = alloc->getCacheMemoryStats().ramCacheSize;
+    const auto poolSize = numBytes;
+    pid = alloc->addPool("one", poolSize, allocSizes);
+
+    auto allocFn = [&](std::string keyPrefix, std::vector<uint32_t> sizes) {
+      for (unsigned int loop = 0; loop < 20; ++loop) {
+        for (unsigned int i = 0; i < 2048; ++i) {
+          const auto key = keyPrefix + folly::to<std::string>(loop) + "_" +
+                           folly::to<std::string>(i);
+          auto itemHandle =
+              util::allocateAccessible(*alloc, pid, key, sizes[0]);
+          auto childItem = alloc->allocateChainedItem(itemHandle, sizes[1]);
+          ASSERT_NE(nullptr, childItem);
+
+          alloc->addChainedItem(itemHandle, std::move(childItem));
+        }
+      }
+    };
+    allocFn(std::string{"yolo"}, sizes);
+
+    ClassId cid = static_cast<ClassId>(1);
+    for (int i = 0; i < 20; i++) {
+      alloc->releaseSlab(pid, cid, SlabReleaseMode::kRebalance);
+    }
+    while (alloc->getSlabReleaseStats().numSlabReleaseForRebalance < 20) {
+      sleep(1);
+    }
+    // for ASSERT_EXIT
+    exit(0);
+  }
+
   // create a chain of allocations, replace the allocation and ensure that the
   // order is preserved.
   void testChainedAllocsReplaceInChain() {
diff --git a/cachelib/common/Exceptions.h b/cachelib/common/Exceptions.h
index 02e62a9af3..9239938805 100644
--- a/cachelib/common/Exceptions.h
+++ b/cachelib/common/Exceptions.h
@@ -67,6 +67,11 @@ class SlabReleaseAborted : public std::runtime_error {
   using std::runtime_error::runtime_error;
 };
 
+class ChainedItemInvalid : public std::runtime_error {
+ public:
+  using std::runtime_error::runtime_error;
+};
+
 // An allocation error. This could be a genuine std::bad_alloc from
 // the global allocator, or it can be an internal allocation error
 // from the backing cachelib item.
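The dedicated exception type added above lets the handle release path distinguish chain-linkage corruption, which is a bug worth failing fast on, from incidental release errors that are merely logged. Put together from the Handle.h and Exceptions.h hunks, the resulting flow looks like this (a recap sketch, not new patch content):

    try {
      alloc_->release(it_, isNascent());
    } catch (const exception::ChainedItemInvalid& e) {
      // corrupted parent/child linkage: crash debug builds immediately
      XDCHECK(false) << e.what();
    } catch (const std::exception& e) {
      // anything else is logged; the handle is still cleared afterwards
      XLOGF(CRITICAL, "Failed to release {} : {}", static_cast<void*>(it_),
            e.what());
    }
    it_ = nullptr;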
From fb8e23c996fb1b61818da4022723535f6dae6486 Mon Sep 17 00:00:00 2001
From: Daniel Byrne
Date: Wed, 8 Nov 2023 07:56:19 -0800
Subject: [PATCH 3/3] use findInternal to sync on ac lock

---
 cachelib/allocator/CacheAllocator-inl.h | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/cachelib/allocator/CacheAllocator-inl.h b/cachelib/allocator/CacheAllocator-inl.h
index 84110a2048..cebc70f4a9 100644
--- a/cachelib/allocator/CacheAllocator-inl.h
+++ b/cachelib/allocator/CacheAllocator-inl.h
@@ -2639,6 +2639,7 @@ bool CacheAllocator<CacheTrait>::moveForSlabRelease(
   const auto allocInfo = allocator_->getAllocInfo(oldItem.getMemory());
 
   if (chainedItem) {
     newItemHdl.reset();
+    auto parentKey = parentItem->getKey();
     auto ref = parentItem->unmarkMoving();
     if (UNLIKELY(ref == 0)) {
       wakeUpWaiters(*parentItem, {});
@@ -2648,7 +2649,7 @@ bool CacheAllocator<CacheTrait>::moveForSlabRelease(
       return true;
     } else {
       XDCHECK_NE(ref, 0);
-      auto parentHdl = acquire(parentItem);
+      auto parentHdl = findInternal(parentKey);
       if (parentHdl) {
         wakeUpWaiters(*parentItem, std::move(parentHdl));
       }
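For context on that last hunk: as soon as unmarkMoving() clears the moving bit, a concurrent insertOrReplace (exactly what the test from the previous patch drives) may unlink parentItem from the access container. acquire(parentItem) would bump the refcount on that possibly stale pointer, whereas findInternal(parentKey) re-runs the lookup under the access container's bucket lock and produces a handle only if the parent is still mapped to its key. A condensed sketch of the pattern (illustrative, not part of the patch):

    auto parentKey = parentItem->getKey(); // must be read before unmarking
    auto ref = parentItem->unmarkMoving(); // removers may race from here on
    if (ref != 0) {
      // serialized against removers by the access container bucket lock
      auto parentHdl = findInternal(parentKey);
      if (parentHdl) {
        wakeUpWaiters(*parentItem, std::move(parentHdl));
      }
    }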