@@ -1349,7 +1349,7 @@ class CacheAllocator : public CacheBase {
1349
1349
1350
1350
private:
1351
1351
// wrapper around Item's refcount and active handle tracking
1352
- FOLLY_ALWAYS_INLINE bool incRef (Item& it);
1352
+ FOLLY_ALWAYS_INLINE RefcountWithFlags::incResult incRef (Item& it);
1353
1353
FOLLY_ALWAYS_INLINE RefcountWithFlags::Value decRef (Item& it);
1354
1354
1355
1355
// drops the refcount and if needed, frees the allocation back to the memory
@@ -1473,13 +1473,13 @@ class CacheAllocator : public CacheBase {
1473
1473
// The parent handle parameter here is mainly used to find the
1474
1474
// correct pool to allocate memory for this chained item
1475
1475
//
1476
- // @param parent handle to the cache item
1476
+ // @param parent the parent item
1477
1477
// @param size the size for the chained allocation
1478
1478
//
1479
1479
// @return handle to the chained allocation
1480
1480
// @throw std::invalid_argument if the size requested is invalid or
1481
1481
// if the item is invalid
1482
- WriteHandle allocateChainedItemInternal (const ReadHandle & parent,
1482
+ WriteHandle allocateChainedItemInternal (const Item & parent,
1483
1483
uint32_t size);
1484
1484
1485
1485
// Given an item and its parentKey, validate that the parentKey
@@ -1609,7 +1609,7 @@ class CacheAllocator : public CacheBase {
1609
1609
// @param newParent the new parent for the chain
1610
1610
//
1611
1611
// @throw if any of the conditions for parent or newParent are not met.
1612
- void transferChainLocked (WriteHandle & parent, WriteHandle& newParent);
1612
+ void transferChainLocked (Item & parent, WriteHandle& newParent);
1613
1613
1614
1614
// replace a chained item in the existing chain. This needs to be called
1615
1615
// with the chained item lock held exclusive
@@ -1623,6 +1623,24 @@ class CacheAllocator : public CacheBase {
1623
1623
WriteHandle newItemHdl,
1624
1624
const Item& parent);
1625
1625
1626
+ //
1627
+ // Performs the actual inplace replace - it is called from
1628
+ // moveChainedItem and replaceChainedItemLocked
1629
+ // must hold chainedItemLock
1630
+ //
1631
+ // @param oldItem the item we are replacing in the chain
1632
+ // @param newItem the item we are replacing it with
1633
+ // @param parent the parent for the chain
1634
+ // @param fromMove used to determine if the replaced was called from
1635
+ // moveChainedItem - we avoid the handle destructor
1636
+ // in this case.
1637
+ //
1638
+ // @return handle to the oldItem
1639
+ void replaceInChainLocked (Item& oldItem,
1640
+ WriteHandle& newItemHdl,
1641
+ const Item& parent,
1642
+ bool fromMove);
1643
+
1626
1644
// Insert an item into MM container. The caller must hold a valid handle for
1627
1645
// the item.
1628
1646
//
@@ -1731,6 +1749,18 @@ class CacheAllocator : public CacheBase {
1731
1749
1732
1750
using EvictionIterator = typename MMContainer::LockedIterator;
1733
1751
1752
+ // Wakes up waiters if there are any
1753
+ //
1754
+ // @param item wakes waiters that are waiting on that item
1755
+ // @param handle handle to pass to the waiters
1756
+ void wakeUpWaiters (Item& item, WriteHandle handle);
1757
+
1758
+ // Unmarks item as moving and wakes up any waiters waiting on that item
1759
+ //
1760
+ // @param item wakes waiters that are waiting on that item
1761
+ // @param handle handle to pass to the waiters
1762
+ typename RefcountWithFlags::Value unmarkMovingAndWakeUpWaiters (Item &item, WriteHandle handle);
1763
+
1734
1764
// Deserializer CacheAllocatorMetadata and verify the version
1735
1765
//
1736
1766
// @param deserializer Deserializer object
@@ -2082,6 +2112,87 @@ class CacheAllocator : public CacheBase {
2082
2112
2083
2113
// BEGIN private members
2084
2114
2115
+ bool tryGetHandleWithWaitContextForMovingItem (Item& item, WriteHandle& handle);
2116
+
2117
+ size_t wakeUpWaitersLocked (folly::StringPiece key, WriteHandle&& handle);
2118
+
2119
+ class MoveCtx {
2120
+ public:
2121
+ MoveCtx () {}
2122
+
2123
+ ~MoveCtx () {
2124
+ // prevent any further enqueue to waiters
2125
+ // Note: we don't need to hold locks since no one can enqueue
2126
+ // after this point.
2127
+ wakeUpWaiters ();
2128
+ }
2129
+
2130
+ // record the item handle. Upon destruction we will wake up the waiters
2131
+ // and pass a clone of the handle to the callBack. By default we pass
2132
+ // a null handle
2133
+ void setItemHandle (WriteHandle _it) { it = std::move (_it); }
2134
+
2135
+ // enqueue a waiter into the waiter list
2136
+ // @param waiter WaitContext
2137
+ void addWaiter (std::shared_ptr<WaitContext<ReadHandle>> waiter) {
2138
+ XDCHECK (waiter);
2139
+ waiters.push_back (std::move (waiter));
2140
+ }
2141
+
2142
+ size_t numWaiters () const { return waiters.size (); }
2143
+
2144
+ private:
2145
+ // notify all pending waiters that are waiting for the fetch.
2146
+ void wakeUpWaiters () {
2147
+ bool refcountOverflowed = false ;
2148
+ for (auto & w : waiters) {
2149
+ // If refcount overflowed earlier, then we will return miss to
2150
+ // all subsequent waitors.
2151
+ if (refcountOverflowed) {
2152
+ w->set (WriteHandle{});
2153
+ continue ;
2154
+ }
2155
+
2156
+ try {
2157
+ w->set (it.clone ());
2158
+ } catch (const exception::RefcountOverflow&) {
2159
+ // We'll return a miss to the user's pending read,
2160
+ // so we should enqueue a delete via NvmCache.
2161
+ // TODO: cache.remove(it);
2162
+ refcountOverflowed = true ;
2163
+ }
2164
+ }
2165
+ }
2166
+
2167
+ WriteHandle it; // will be set when Context is being filled
2168
+ std::vector<std::shared_ptr<WaitContext<ReadHandle>>> waiters; // list of
2169
+ // waiters
2170
+ };
2171
+ using MoveMap =
2172
+ folly::F14ValueMap<folly::StringPiece,
2173
+ std::unique_ptr<MoveCtx>,
2174
+ folly::HeterogeneousAccessHash<folly::StringPiece>>;
2175
+
2176
+ static size_t getShardForKey (folly::StringPiece key) {
2177
+ return folly::Hash ()(key) % kShards ;
2178
+ }
2179
+
2180
+ MoveMap& getMoveMapForShard (size_t shard) {
2181
+ return movesMap_[shard].movesMap_ ;
2182
+ }
2183
+
2184
+ MoveMap& getMoveMap (folly::StringPiece key) {
2185
+ return getMoveMapForShard (getShardForKey (key));
2186
+ }
2187
+
2188
+ std::unique_lock<std::mutex> getMoveLockForShard (size_t shard) {
2189
+ return std::unique_lock<std::mutex>(moveLock_[shard].moveLock_ );
2190
+ }
2191
+
2192
+ std::unique_lock<std::mutex> getMoveLock (folly::StringPiece key) {
2193
+ return getMoveLockForShard (getShardForKey (key));
2194
+ }
2195
+
2085
2196
// Whether the memory allocator for this cache allocator was created on shared
2086
2197
// memory. The hash table, chained item hash table etc is also created on
2087
2198
// shared memory except for temporary shared memory mode when they're created
@@ -2175,6 +2286,22 @@ class CacheAllocator : public CacheBase {
2175
2286
// poolResizer_, poolOptimizer_, memMonitor_, reaper_
2176
2287
mutable std::mutex workersMutex_;
2177
2288
2289
+ static constexpr size_t kShards = 8192 ; // TODO: need to define right value
2290
+
2291
+ struct MovesMapShard {
2292
+ alignas (folly::hardware_destructive_interference_size) MoveMap movesMap_;
2293
+ };
2294
+
2295
+ struct MoveLock {
2296
+ alignas (folly::hardware_destructive_interference_size) std::mutex moveLock_;
2297
+ };
2298
+
2299
+ // a map of all pending moves
2300
+ std::vector<MovesMapShard> movesMap_;
2301
+
2302
+ // a map of move locks for each shard
2303
+ std::vector<MoveLock> moveLock_;
2304
+
2178
2305
// time when the ram cache was first created
2179
2306
const uint32_t cacheCreationTime_{0 };
2180
2307
0 commit comments