@@ -1349,7 +1349,7 @@ class CacheAllocator : public CacheBase {
1349
1349
1350
1350
private:
1351
1351
// wrapper around Item's refcount and active handle tracking
1352
- FOLLY_ALWAYS_INLINE bool incRef (Item& it);
1352
+ FOLLY_ALWAYS_INLINE RefcountWithFlags::incResult incRef (Item& it);
1353
1353
FOLLY_ALWAYS_INLINE RefcountWithFlags::Value decRef (Item& it);
1354
1354
1355
1355
// drops the refcount and if needed, frees the allocation back to the memory
@@ -1473,13 +1473,13 @@ class CacheAllocator : public CacheBase {
1473
1473
// The parent handle parameter here is mainly used to find the
1474
1474
// correct pool to allocate memory for this chained item
1475
1475
//
1476
- // @param parent handle to the cache item
1476
+ // @param parent the parent item
1477
1477
// @param size the size for the chained allocation
1478
1478
//
1479
1479
// @return handle to the chained allocation
1480
1480
// @throw std::invalid_argument if the size requested is invalid or
1481
1481
// if the item is invalid
1482
- WriteHandle allocateChainedItemInternal (const ReadHandle & parent,
1482
+ WriteHandle allocateChainedItemInternal (const Item & parent,
1483
1483
uint32_t size);
1484
1484
1485
1485
// Given an item and its parentKey, validate that the parentKey
@@ -1609,7 +1609,7 @@ class CacheAllocator : public CacheBase {
1609
1609
// @param newParent the new parent for the chain
1610
1610
//
1611
1611
// @throw if any of the conditions for parent or newParent are not met.
1612
- void transferChainLocked (WriteHandle & parent, WriteHandle& newParent);
1612
+ void transferChainLocked (Item & parent, WriteHandle& newParent);
1613
1613
1614
1614
// replace a chained item in the existing chain. This needs to be called
1615
1615
// with the chained item lock held exclusive
@@ -1623,6 +1623,24 @@ class CacheAllocator : public CacheBase {
1623
1623
WriteHandle newItemHdl,
1624
1624
const Item& parent);
1625
1625
1626
+ //
1627
+ // Performs the actual inplace replace - it is called from
1628
+ // moveChainedItem and replaceChainedItemLocked
1629
+ // must hold chainedItemLock
1630
+ //
1631
+ // @param oldItem the item we are replacing in the chain
1632
+ // @param newItem the item we are replacing it with
1633
+ // @param parent the parent for the chain
1634
+ // @param fromMove used to determine if the replaced was called from
1635
+ // moveChainedItem - we avoid the handle destructor
1636
+ // in this case.
1637
+ //
1638
+ // @return handle to the oldItem
1639
+ void replaceInChainLocked (Item& oldItem,
1640
+ WriteHandle& newItemHdl,
1641
+ const Item& parent,
1642
+ bool fromMove);
1643
+
1626
1644
// Insert an item into MM container. The caller must hold a valid handle for
1627
1645
// the item.
1628
1646
//
@@ -1731,6 +1749,18 @@ class CacheAllocator : public CacheBase {
1731
1749
1732
1750
using EvictionIterator = typename MMContainer::LockedIterator;
1733
1751
1752
+ // Wakes up waiters if there are any
1753
+ //
1754
+ // @param item wakes waiters that are waiting on that item
1755
+ // @param handle handle to pass to the waiters
1756
+ void wakeUpWaiters (Item& item, WriteHandle handle);
1757
+
1758
+ // Unmarks item as moving and wakes up any waiters waiting on that item
1759
+ //
1760
+ // @param item wakes waiters that are waiting on that item
1761
+ // @param handle handle to pass to the waiters
1762
+ typename RefcountWithFlags::Value unmarkMovingAndWakeUpWaiters (Item &item, WriteHandle handle);
1763
+
1734
1764
// Deserializer CacheAllocatorMetadata and verify the version
1735
1765
//
1736
1766
// @param deserializer Deserializer object
@@ -1844,6 +1874,11 @@ class CacheAllocator : public CacheBase {
1844
1874
Item& item,
1845
1875
util::Throttler& throttler);
1846
1876
1877
+ // Helper function to create PutToken
1878
+ //
1879
+ // @return valid token if the item should be written to NVM cache.
1880
+ typename NvmCacheT::PutToken createPutToken (Item& item);
1881
+
1847
1882
// Helper function to evict a normal item for slab release
1848
1883
//
1849
1884
// @return last handle for corresponding to item on success. empty handle on
@@ -2082,6 +2117,87 @@ class CacheAllocator : public CacheBase {
2082
2117
2083
2118
// BEGIN private members
2084
2119
2120
+ bool tryGetHandleWithWaitContextForMovingItem (Item& item, WriteHandle& handle);
2121
+
2122
+ size_t wakeUpWaitersLocked (folly::StringPiece key, WriteHandle&& handle);
2123
+
2124
+ class MoveCtx {
2125
+ public:
2126
+ MoveCtx () {}
2127
+
2128
+ ~MoveCtx () {
2129
+ // prevent any further enqueue to waiters
2130
+ // Note: we don't need to hold locks since no one can enqueue
2131
+ // after this point.
2132
+ wakeUpWaiters ();
2133
+ }
2134
+
2135
+ // record the item handle. Upon destruction we will wake up the waiters
2136
+ // and pass a clone of the handle to the callBack. By default we pass
2137
+ // a null handle
2138
+ void setItemHandle (WriteHandle _it) { it = std::move (_it); }
2139
+
2140
+ // enqueue a waiter into the waiter list
2141
+ // @param waiter WaitContext
2142
+ void addWaiter (std::shared_ptr<WaitContext<ReadHandle>> waiter) {
2143
+ XDCHECK (waiter);
2144
+ waiters.push_back (std::move (waiter));
2145
+ }
2146
+
2147
+ size_t numWaiters () const { return waiters.size (); }
2148
+
2149
+ private:
2150
+ // notify all pending waiters that are waiting for the fetch.
2151
+ void wakeUpWaiters () {
2152
+ bool refcountOverflowed = false ;
2153
+ for (auto & w : waiters) {
2154
+ // If refcount overflowed earlier, then we will return miss to
2155
+ // all subsequent waiters.
2156
+ if (refcountOverflowed) {
2157
+ w->set (WriteHandle{});
2158
+ continue ;
2159
+ }
2160
+
2161
+ try {
2162
+ w->set (it.clone ());
2163
+ } catch (const exception ::RefcountOverflow&) {
2164
+ // We'll return a miss to the user's pending read,
2165
+ // so we should enqueue a delete via NvmCache.
2166
+ // TODO: cache.remove(it);
2167
+ refcountOverflowed = true ;
2168
+ }
2169
+ }
2170
+ }
2171
+
2172
+ WriteHandle it; // will be set when Context is being filled
2173
+ std::vector<std::shared_ptr<WaitContext<ReadHandle>>> waiters; // list of
2174
+ // waiters
2175
+ };
2176
+ using MoveMap =
2177
+ folly::F14ValueMap<folly::StringPiece,
2178
+ std::unique_ptr<MoveCtx>,
2179
+ folly::HeterogeneousAccessHash<folly::StringPiece>>;
2180
+
2181
+ static size_t getShardForKey (folly::StringPiece key) {
2182
+ return folly::Hash ()(key) % kShards ;
2183
+ }
2184
+
2185
+ MoveMap& getMoveMapForShard (size_t shard) {
2186
+ return movesMap_[shard].movesMap_ ;
2187
+ }
2188
+
2189
+ MoveMap& getMoveMap (folly::StringPiece key) {
2190
+ return getMoveMapForShard (getShardForKey (key));
2191
+ }
2192
+
2193
+ std::unique_lock<std::mutex> getMoveLockForShard (size_t shard) {
2194
+ return std::unique_lock<std::mutex>(moveLock_[shard].moveLock_ );
2195
+ }
2196
+
2197
+ std::unique_lock<std::mutex> getMoveLock (folly::StringPiece key) {
2198
+ return getMoveLockForShard (getShardForKey (key));
2199
+ }
2200
+
2085
2201
// Whether the memory allocator for this cache allocator was created on shared
2086
2202
// memory. The hash table, chained item hash table etc is also created on
2087
2203
// shared memory except for temporary shared memory mode when they're created
@@ -2175,6 +2291,22 @@ class CacheAllocator : public CacheBase {
2175
2291
// poolResizer_, poolOptimizer_, memMonitor_, reaper_
2176
2292
mutable std::mutex workersMutex_;
2177
2293
2294
+ static constexpr size_t kShards = 8192 ; // TODO: need to define right value
2295
+
2296
+ struct MovesMapShard {
2297
+ alignas (folly::hardware_destructive_interference_size) MoveMap movesMap_;
2298
+ };
2299
+
2300
+ struct MoveLock {
2301
+ alignas (folly::hardware_destructive_interference_size) std::mutex moveLock_;
2302
+ };
2303
+
2304
+ // a map of all pending moves
2305
+ std::vector<MovesMapShard> movesMap_;
2306
+
2307
+ // a map of move locks for each shard
2308
+ std::vector<MoveLock> moveLock_;
2309
+
2178
2310
// time when the ram cache was first created
2179
2311
const uint32_t cacheCreationTime_{0 };
2180
2312
0 commit comments