@@ -1349,7 +1349,7 @@ class CacheAllocator : public CacheBase {
1349
1349
1350
1350
private:
1351
1351
// wrapper around Item's refcount and active handle tracking
1352
- FOLLY_ALWAYS_INLINE bool incRef (Item& it);
1352
+ FOLLY_ALWAYS_INLINE RefcountWithFlags::incResult incRef (Item& it);
1353
1353
FOLLY_ALWAYS_INLINE RefcountWithFlags::Value decRef (Item& it);
1354
1354
1355
1355
// drops the refcount and if needed, frees the allocation back to the memory
@@ -1473,26 +1473,13 @@ class CacheAllocator : public CacheBase {
1473
1473
// The parent handle parameter here is mainly used to find the
1474
1474
// correct pool to allocate memory for this chained item
1475
1475
//
1476
- // @param parent handle to the cache item
1476
+ // @param parent the parent item
1477
1477
// @param size the size for the chained allocation
1478
1478
//
1479
1479
// @return handle to the chained allocation
1480
1480
// @throw std::invalid_argument if the size requested is invalid or
1481
1481
// if the item is invalid
1482
- WriteHandle allocateChainedItemInternal (const ReadHandle& parent,
1483
- uint32_t size);
1484
-
1485
- // Given an item and its parentKey, validate that the parentKey
1486
- // corresponds to an item that's the parent of the supplied item.
1487
- //
1488
- // @param item item that we want to get the parent handle for
1489
- // @param parentKey key of the item's parent
1490
- //
1491
- // @return handle to the parent item if the validations pass
1492
- // otherwise, an empty Handle is returned.
1493
- //
1494
- ReadHandle validateAndGetParentHandleForChainedMoveLocked (
1495
- const ChainedItem& item, const Key& parentKey);
1482
+ WriteHandle allocateChainedItemInternal (const Item& parent, uint32_t size);
1496
1483
1497
1484
// Given an existing item, allocate a new one for the
1498
1485
// existing one to later be moved into.
@@ -1609,7 +1596,7 @@ class CacheAllocator : public CacheBase {
1609
1596
// @param newParent the new parent for the chain
1610
1597
//
1611
1598
// @throw if any of the conditions for parent or newParent are not met.
1612
- void transferChainLocked (WriteHandle & parent, WriteHandle& newParent);
1599
+ void transferChainLocked (Item & parent, WriteHandle& newParent);
1613
1600
1614
1601
// replace a chained item in the existing chain. This needs to be called
1615
1602
// with the chained item lock held exclusive
@@ -1623,6 +1610,24 @@ class CacheAllocator : public CacheBase {
1623
1610
WriteHandle newItemHdl,
1624
1611
const Item& parent);
1625
1612
1613
+ //
1614
+ // Performs the actual inplace replace - it is called from
1615
+ // moveChainedItem and replaceChainedItemLocked
1616
+ // must hold chainedItemLock
1617
+ //
1618
+ // @param oldItem the item we are replacing in the chain
1619
+ // @param newItem the item we are replacing it with
1620
+ // @param parent the parent for the chain
1621
+ // @param fromMove used to determine if the replaced was called from
1622
+ // moveChainedItem - we avoid the handle destructor
1623
+ // in this case.
1624
+ //
1625
+ // @return handle to the oldItem
1626
+ void replaceInChainLocked (Item& oldItem,
1627
+ WriteHandle& newItemHdl,
1628
+ const Item& parent,
1629
+ bool fromMove);
1630
+
1626
1631
// Insert an item into MM container. The caller must hold a valid handle for
1627
1632
// the item.
1628
1633
//
@@ -1731,6 +1736,19 @@ class CacheAllocator : public CacheBase {
1731
1736
1732
1737
using EvictionIterator = typename MMContainer::LockedIterator;
1733
1738
1739
+ // Wakes up waiters if there are any
1740
+ //
1741
+ // @param item wakes waiters that are waiting on that item
1742
+ // @param handle handle to pass to the waiters
1743
+ void wakeUpWaiters (Item& item, WriteHandle handle);
1744
+
1745
+ // Unmarks item as moving and wakes up any waiters waiting on that item
1746
+ //
1747
+ // @param item wakes waiters that are waiting on that item
1748
+ // @param handle handle to pass to the waiters
1749
+ typename RefcountWithFlags::Value unmarkMovingAndWakeUpWaiters (
1750
+ Item& item, WriteHandle handle);
1751
+
1734
1752
// Deserializer CacheAllocatorMetadata and verify the version
1735
1753
//
1736
1754
// @param deserializer Deserializer object
@@ -1844,6 +1862,11 @@ class CacheAllocator : public CacheBase {
1844
1862
Item& item,
1845
1863
util::Throttler& throttler);
1846
1864
1865
+ // Helper function to create PutToken
1866
+ //
1867
+ // @return valid token if the item should be written to NVM cache.
1868
+ typename NvmCacheT::PutToken createPutToken (Item& item);
1869
+
1847
1870
// Helper function to evict a normal item for slab release
1848
1871
//
1849
1872
// @return last handle for corresponding to item on success. empty handle on
@@ -2082,6 +2105,88 @@ class CacheAllocator : public CacheBase {
2082
2105
2083
2106
// BEGIN private members
2084
2107
2108
+ bool tryGetHandleWithWaitContextForMovingItem (Item& item,
2109
+ WriteHandle& handle);
2110
+
2111
+ size_t wakeUpWaitersLocked (folly::StringPiece key, WriteHandle&& handle);
2112
+
2113
+ class MoveCtx {
2114
+ public:
2115
+ MoveCtx () {}
2116
+
2117
+ ~MoveCtx () {
2118
+ // prevent any further enqueue to waiters
2119
+ // Note: we don't need to hold locks since no one can enqueue
2120
+ // after this point.
2121
+ wakeUpWaiters ();
2122
+ }
2123
+
2124
+ // record the item handle. Upon destruction we will wake up the waiters
2125
+ // and pass a clone of the handle to the callBack. By default we pass
2126
+ // a null handle
2127
+ void setItemHandle (WriteHandle _it) { it = std::move (_it); }
2128
+
2129
+ // enqueue a waiter into the waiter list
2130
+ // @param waiter WaitContext
2131
+ void addWaiter (std::shared_ptr<WaitContext<ReadHandle>> waiter) {
2132
+ XDCHECK (waiter);
2133
+ waiters.push_back (std::move (waiter));
2134
+ }
2135
+
2136
+ size_t numWaiters () const { return waiters.size (); }
2137
+
2138
+ private:
2139
+ // notify all pending waiters that are waiting for the fetch.
2140
+ void wakeUpWaiters () {
2141
+ bool refcountOverflowed = false ;
2142
+ for (auto & w : waiters) {
2143
+ // If refcount overflowed earlier, then we will return miss to
2144
+ // all subsequent waiters.
2145
+ if (refcountOverflowed) {
2146
+ w->set (WriteHandle{});
2147
+ continue ;
2148
+ }
2149
+
2150
+ try {
2151
+ w->set (it.clone ());
2152
+ } catch (const exception ::RefcountOverflow&) {
2153
+ // We'll return a miss to the user's pending read,
2154
+ // so we should enqueue a delete via NvmCache.
2155
+ // TODO: cache.remove(it);
2156
+ refcountOverflowed = true ;
2157
+ }
2158
+ }
2159
+ }
2160
+
2161
+ WriteHandle it; // will be set when Context is being filled
2162
+ std::vector<std::shared_ptr<WaitContext<ReadHandle>>> waiters; // list of
2163
+ // waiters
2164
+ };
2165
+ using MoveMap =
2166
+ folly::F14ValueMap<folly::StringPiece,
2167
+ std::unique_ptr<MoveCtx>,
2168
+ folly::HeterogeneousAccessHash<folly::StringPiece>>;
2169
+
2170
+ static size_t getShardForKey (folly::StringPiece key) {
2171
+ return folly::Hash ()(key) % kShards ;
2172
+ }
2173
+
2174
+ MoveMap& getMoveMapForShard (size_t shard) {
2175
+ return movesMap_[shard].movesMap_ ;
2176
+ }
2177
+
2178
+ MoveMap& getMoveMap (folly::StringPiece key) {
2179
+ return getMoveMapForShard (getShardForKey (key));
2180
+ }
2181
+
2182
+ std::unique_lock<std::mutex> getMoveLockForShard (size_t shard) {
2183
+ return std::unique_lock<std::mutex>(moveLock_[shard].moveLock_ );
2184
+ }
2185
+
2186
+ std::unique_lock<std::mutex> getMoveLock (folly::StringPiece key) {
2187
+ return getMoveLockForShard (getShardForKey (key));
2188
+ }
2189
+
2085
2190
// Whether the memory allocator for this cache allocator was created on shared
2086
2191
// memory. The hash table, chained item hash table etc is also created on
2087
2192
// shared memory except for temporary shared memory mode when they're created
@@ -2175,6 +2280,22 @@ class CacheAllocator : public CacheBase {
2175
2280
// poolResizer_, poolOptimizer_, memMonitor_, reaper_
2176
2281
mutable std::mutex workersMutex_;
2177
2282
2283
+ static constexpr size_t kShards = 8192 ; // TODO: need to define right value
2284
+
2285
+ struct MovesMapShard {
2286
+ alignas (folly::hardware_destructive_interference_size) MoveMap movesMap_;
2287
+ };
2288
+
2289
+ struct MoveLock {
2290
+ alignas (folly::hardware_destructive_interference_size) std::mutex moveLock_;
2291
+ };
2292
+
2293
+ // a map of all pending moves
2294
+ std::vector<MovesMapShard> movesMap_;
2295
+
2296
+ // a map of move locks for each shard
2297
+ std::vector<MoveLock> moveLock_;
2298
+
2178
2299
// time when the ram cache was first created
2179
2300
const uint32_t cacheCreationTime_{0 };
2180
2301
0 commit comments