From 96ed3ebe9260eed0e71b0811807e9a2c0efa509d Mon Sep 17 00:00:00 2001 From: Ben Huddleston Date: Tue, 13 Aug 2019 13:48:29 +0100 Subject: [PATCH] MB-34017: Optimize warmup - Only warmup prepares from HCS to HPS For Durability, we have introduced a new LoadPrepare phase at Warmup. That phase is necessary for loading pending Prepares from disk and inserting them into memory structures (i.e., HashTable, CheckpointManager, DurabilityMonitor) so that they can be driven to completion. Given that we need to re-process only Prepares that have not been completed (i.e., Committed or Aborted), we can safely start the LoadPrepare scan from the HCS (exclusive) onward. That is because, by definition, every Prepare at or before the HCS has been completed. After introducing the LoadPrepare phase (and before this change) we saw a 100% increase in the total Warmup runtime. That is because the first implementation of the LoadPrepare phase started the scan at seqno=0; thus, the full Warmup performed two full scans of the entire seqno-index. This patch addresses the issue. We also do not load any prepares when HCS == HPS, as in that case every prepare has been completed. 
Change-Id: Iaf310fe5d7f508303d05d1f5a9632b9dfcf368a7 Reviewed-on: http://review.couchbase.org/113267 Reviewed-by: James Harrison Reviewed-by: Dave Rigby Tested-by: Build Bot --- engines/ep/src/ep_bucket.cc | 80 ++++++++++++++----- engines/ep/src/ep_bucket.h | 4 +- engines/ep/src/ephemeral_bucket.h | 7 +- engines/ep/src/kv_bucket_iface.h | 12 ++- engines/ep/src/stats.cc | 2 + engines/ep/src/stats.h | 4 + engines/ep/src/warmup.cc | 8 +- .../module_tests/evp_store_warmup_test.cc | 34 ++++++++ 8 files changed, 125 insertions(+), 26 deletions(-) diff --git a/engines/ep/src/ep_bucket.cc b/engines/ep/src/ep_bucket.cc index 7da13ad042..eb81f6819b 100644 --- a/engines/ep/src/ep_bucket.cc +++ b/engines/ep/src/ep_bucket.cc @@ -1366,14 +1366,28 @@ void EPBucket::rollbackUnpersistedItems(VBucket& vb, int64_t rollbackSeqno) { // At the end of the scan, all outstanding Prepared items (which did not // have a Commit persisted to disk) will be registered with the Durability // Monitor. -void EPBucket::loadPreparedSyncWrites( +EPBucket::LoadPreparedSyncWritesResult EPBucket::loadPreparedSyncWrites( folly::SharedMutex::WriteHolder& vbStateLh, VBucket& vb) { /// Disk load callback for scan. struct LoadSyncWrites : public StatusCallback { - LoadSyncWrites(EPVBucket& vb) : vb(vb) { + LoadSyncWrites(EPVBucket& vb, uint64_t highPreparedSeqno) + : vb(vb), highPreparedSeqno(highPreparedSeqno) { } void callback(GetValue& val) override { + // Abort the scan early if we have passed the HPS as we don't need + // to load any more prepares. + if (val.item->getBySeqno() > + static_cast(highPreparedSeqno)) { + // ENOMEM may seem like an odd status code to abort the scan but + // disk backfill to a given seqno also returns ENGINE_ENOMEM + // when it has received all the seqnos that it cares about to + // abort the scan. + setStatus(ENGINE_ENOMEM); + return; + } + + itemsVisited++; if (val.item->isPending()) { // Pending item which was not aborted (deleted). Add to // outstanding Prepare map. 
@@ -1392,6 +1406,13 @@ void EPBucket::loadPreparedSyncWrites( EPVBucket& vb; + // HPS after which we can abort the scan + uint64_t highPreparedSeqno = std::numeric_limits::max(); + + // Number of items our callback "visits". Used to validate how many + // items we look at when loading SyncWrites. + uint64_t itemsVisited = 0; + /// Map of Document key -> outstanding (not yet Committed / Aborted) /// prepares. std::unordered_map> @@ -1401,18 +1422,39 @@ void EPBucket::loadPreparedSyncWrites( auto& epVb = dynamic_cast(vb); const auto start = std::chrono::steady_clock::now(); - // @TODO MB-34017: We can optimise this by starting the scan at the - // high_committed_seqno - all earlier prepares would have been committed - // (or were aborted) and only scanning up to the high prepared seqno. - uint64_t startSeqno = 0; - // Get the kvStore. Using the RW store as the rollback code that will call // this function will modify vbucket_state that will only be reflected in // RW store. For warmup case, we don't allow writes at this point in time // anyway. auto* kvStore = getRWUnderlyingByShard(epVb.getShard()->getId()); - auto storageCB = std::make_shared(epVb); + // Need the HPS/HCS so the DurabilityMonitor can be fully resumed + auto vbState = kvStore->getVBucketState(epVb.getId()); + if (!vbState) { + throw std::logic_error("EPBucket::loadPreparedSyncWrites: processing " + + epVb.getId().to_string() + + ", but found no vbucket_state"); + } + + // Insert all outstanding Prepares into the VBucket (HashTable & + // DurabilityMonitor). + std::vector prepares; + if (vbState->highPreparedSeqno == vbState->highCompletedSeqno) { + // We don't need to warm up anything for this vBucket as all of our + // prepares have been completed, but we do need to create the DM + // with our vbucket_state. 
+ epVb.loadOutstandingPrepares(vbStateLh, *vbState, std::move(prepares)); + // No prepares loaded + return {0, 0}; + } + + // We optimise this step by starting the scan at the seqno following the + // High Completed Seqno. By definition, all earlier prepares have been + // completed (Committed or Aborted). + const uint64_t startSeqno = vbState->highCompletedSeqno + 1; + + auto storageCB = + std::make_shared(epVb, vbState->highPreparedSeqno); // Don't expect to find anything already in the HashTable, so use // NoLookupCallback. @@ -1434,11 +1476,17 @@ void EPBucket::loadPreparedSyncWrites( EP_LOG_CRITICAL( "EPBucket::loadPreparedSyncWrites: scanCtx is null for {}", epVb.getId()); - return; + // No prepares loaded + return {0, 0}; } auto scanResult = kvStore->scan(scanCtx); - Expects(scanResult == scan_success); + + // If we abort our scan early due to reaching the HPS then the scan result + // will be failure but we will have scanned correctly. + if (storageCB->getStatus() != ENGINE_ENOMEM) { + Expects(scanResult == scan_success); + } kvStore->destroyScanContext(scanCtx); @@ -1451,7 +1499,7 @@ void EPBucket::loadPreparedSyncWrites( // Insert all outstanding Prepares into the VBucket (HashTable & // DurabilityMonitor). 
- std::vector prepares; + prepares.reserve(storageCB->outstandingPrepares.size()); for (auto& prepare : storageCB->outstandingPrepares) { prepares.emplace_back(std::move(prepare.second)); } @@ -1461,15 +1509,9 @@ void EPBucket::loadPreparedSyncWrites( return a->getBySeqno() < b->getBySeqno(); }); - // Need the HPS/HCS so the DurabilityMonitor can be fully resumed - auto vbState = kvStore->getVBucketState(epVb.getId()); - if (!vbState) { - throw std::logic_error("EPBucket::loadPreparedSyncWrites: processing " + - epVb.getId().to_string() + - ", but found no vbucket_state"); - } - + auto numPrepares = prepares.size(); epVb.loadOutstandingPrepares(vbStateLh, *vbState, std::move(prepares)); + return {storageCB->itemsVisited, numPrepares}; } ValueFilter EPBucket::getValueFilterForCompressionMode() { diff --git a/engines/ep/src/ep_bucket.h b/engines/ep/src/ep_bucket.h index ccca5db760..32e574a59d 100644 --- a/engines/ep/src/ep_bucket.h +++ b/engines/ep/src/ep_bucket.h @@ -137,8 +137,8 @@ class EPBucket : public KVBucket { void rollbackUnpersistedItems(VBucket& vb, int64_t rollbackSeqno) override; - void loadPreparedSyncWrites(folly::SharedMutex::WriteHolder& vbStateLh, - VBucket& vb) override; + LoadPreparedSyncWritesResult loadPreparedSyncWrites( + folly::SharedMutex::WriteHolder& vbStateLh, VBucket& vb) override; /** * Returns the ValueFilter to use for KVStore scans, given the bucket diff --git a/engines/ep/src/ephemeral_bucket.h b/engines/ep/src/ephemeral_bucket.h index be8819d8d2..ff82ac20d9 100644 --- a/engines/ep/src/ephemeral_bucket.h +++ b/engines/ep/src/ephemeral_bucket.h @@ -104,9 +104,10 @@ class EphemeralBucket : public KVBucket { // No op } - void loadPreparedSyncWrites(folly::SharedMutex::WriteHolder& vbStateLh, - VBucket& vb) override { - // No op + LoadPreparedSyncWritesResult loadPreparedSyncWrites( + folly::SharedMutex::WriteHolder& vbStateLh, VBucket& vb) override { + // No op, return 0 prepares loaded + return {0, 0}; } void notifyNewSeqno(const Vbid 
vbid, const VBNotifyCtx& notifyCtx) override; diff --git a/engines/ep/src/kv_bucket_iface.h b/engines/ep/src/kv_bucket_iface.h index 63e312f6f5..f2ea9afa0e 100644 --- a/engines/ep/src/kv_bucket_iface.h +++ b/engines/ep/src/kv_bucket_iface.h @@ -787,6 +787,14 @@ class KVBucketIface { */ virtual bool isGetAllKeysSupported() const = 0; + /** + * Result of the loadPreparedSyncWrites function + */ + struct LoadPreparedSyncWritesResult { + uint64_t itemsVisited = 0; + uint64_t preparesLoaded = 0; + }; + protected: /** @@ -834,8 +842,10 @@ class KVBucketIface { * * @param vbStateLh vBucket state lock * @param vb vBucket for which we will load SyncWrites + * + * @returns number of prepares loaded */ - virtual void loadPreparedSyncWrites( + virtual LoadPreparedSyncWritesResult loadPreparedSyncWrites( folly::SharedMutex::WriteHolder& vbStateLh, VBucket& vb) = 0; // During the warmup phase we might want to enable external traffic diff --git a/engines/ep/src/stats.cc b/engines/ep/src/stats.cc index 55e2a7f681..8d4dfaed1a 100644 --- a/engines/ep/src/stats.cc +++ b/engines/ep/src/stats.cc @@ -27,6 +27,8 @@ EPStats::EPStats() : warmedUpKeys(0), warmedUpValues(0), + warmedUpPrepares(0), + warmupItemsVisitedWhilstLoadingPrepares(0), warmDups(0), warmOOM(0), warmupMemUsedCap(0), diff --git a/engines/ep/src/stats.h b/engines/ep/src/stats.h index 4eda9ee040..fb1bc7290b 100644 --- a/engines/ep/src/stats.h +++ b/engines/ep/src/stats.h @@ -137,6 +137,10 @@ class EPStats { Counter warmedUpKeys; //! Number of key-values warmed up during data loading. Counter warmedUpValues; + //! Number of prepares warmed up. + Counter warmedUpPrepares; + //! Number of items visited whilst loading prepares + Counter warmupItemsVisitedWhilstLoadingPrepares; //! Number of warmup failures due to duplicates Counter warmDups; //! Number of OOM failures at warmup time. 
diff --git a/engines/ep/src/warmup.cc b/engines/ep/src/warmup.cc index 1ac0792859..adbe0c71d4 100644 --- a/engines/ep/src/warmup.cc +++ b/engines/ep/src/warmup.cc @@ -1223,7 +1223,13 @@ void Warmup::loadPreparedSyncWrites(uint16_t shardId) { // for rollback. auto& vb = *(itr->second); folly::SharedMutex::WriteHolder vbStateLh(vb.getStateLock()); - store.loadPreparedSyncWrites(vbStateLh, vb); + + auto result = store.loadPreparedSyncWrites(vbStateLh, vb); + store.getEPEngine() + .getEpStats() + .warmupItemsVisitedWhilstLoadingPrepares += result.itemsVisited; + store.getEPEngine().getEpStats().warmedUpPrepares += + result.preparesLoaded; } if (++threadtask_count == store.vbMap.getNumShards()) { diff --git a/engines/ep/tests/module_tests/evp_store_warmup_test.cc b/engines/ep/tests/module_tests/evp_store_warmup_test.cc index c471bbc2cf..186c7e1a22 100644 --- a/engines/ep/tests/module_tests/evp_store_warmup_test.cc +++ b/engines/ep/tests/module_tests/evp_store_warmup_test.cc @@ -655,6 +655,13 @@ void DurabilityWarmupTest::testPendingSyncWrite( // DurabilityMonitor be tracking the prepare. EXPECT_EQ(++numTracked, vb->getDurabilityMonitor().getNumTracked()); + + EXPECT_EQ(numTracked, + store->getEPEngine().getEpStats().warmedUpPrepares); + EXPECT_EQ(numTracked, + store->getEPEngine() + .getEpStats() + .warmupItemsVisitedWhilstLoadingPrepares); } } @@ -731,6 +738,13 @@ void DurabilityWarmupTest::testCommittedSyncWrite( // DurabilityMonitor should be empty as no outstanding prepares. 
EXPECT_EQ(--numTracked, vb->getDurabilityMonitor().getNumTracked()); + + EXPECT_EQ(numTracked, + store->getEPEngine().getEpStats().warmedUpPrepares); + EXPECT_EQ(numTracked, + store->getEPEngine() + .getEpStats() + .warmupItemsVisitedWhilstLoadingPrepares); } } @@ -776,6 +790,11 @@ void DurabilityWarmupTest::testCommittedAndPendingSyncWrite( setVBucketStateAndRunPersistTask(vbid, vbState); } resetEngineAndWarmup(); + EXPECT_EQ(1, store->getEPEngine().getEpStats().warmedUpPrepares); + EXPECT_EQ(2, + store->getEPEngine() + .getEpStats() + .warmupItemsVisitedWhilstLoadingPrepares); // Should load two items into memory - both committed and the pending value. // Check the original committed value is inaccessible due to the pending @@ -859,6 +878,11 @@ TEST_P(DurabilityWarmupTest, AbortedSyncWritePrepareIsNotLoaded) { EXPECT_EQ(1, vb->getNumItems()); } resetEngineAndWarmup(); + EXPECT_EQ(0, store->getEPEngine().getEpStats().warmedUpPrepares); + EXPECT_EQ(0, + store->getEPEngine() + .getEpStats() + .warmupItemsVisitedWhilstLoadingPrepares); // Should load one item into memory - committed value. auto vb = engine->getVBucket(vbid); @@ -898,6 +922,11 @@ TEST_P(DurabilityWarmupTest, ReplicationTopologyMissing) { vbid, vbstate, VBStatePersist::VBSTATE_PERSIST_WITH_COMMIT); resetEngineAndWarmup(); + EXPECT_EQ(0, store->getEPEngine().getEpStats().warmedUpPrepares); + EXPECT_EQ(0, + store->getEPEngine() + .getEpStats() + .warmupItemsVisitedWhilstLoadingPrepares); // Check topology is empty. auto vb = engine->getKVBucket()->getVBucket(vbid); @@ -943,6 +972,11 @@ TEST_P(DurabilityWarmupTest, WarmupCommit) { // Because we bypassed KVBucket::set the HPS/HCS will be incorrect and fail // the pre/post warmup checker, so disable the checker for this test. 
resetEngineAndWarmup().disable(); + EXPECT_EQ(1, store->getEPEngine().getEpStats().warmedUpPrepares); + EXPECT_EQ(1, + store->getEPEngine() + .getEpStats() + .warmupItemsVisitedWhilstLoadingPrepares); vb = store->getVBucket(vbid); ASSERT_TRUE(vb);