From 9d49e46c3b43e0a9158f18c5eb8f7b2bfc37fb10 Mon Sep 17 00:00:00 2001
From: Daniel Byrne
Date: Mon, 5 Feb 2024 17:18:36 -0800
Subject: [PATCH] Binary KVReplayGenerator
------------------------

This offers much lower trace-replay overhead. It assumes the kvcache
trace format and kvcache behavior. This patch supports the following:

- binary request generation and replay
- fast-forwarding of a trace
- preloading requests into memory
- object size amplification
- queue-free operation for even lower per-request overhead
- parsing many more requests per second than cachelib can process, so
  cachebench can drive 100% CPU usage

The limitations are:

- no trace amplification at replay time (however, you can amplify the
  original .csv trace and save the result in binary format)
- ~4GB of memory overhead per 100 million requests
- you need some disk space to store large traces
---
 cachelib/cachebench/cache/Cache.h             |   4 +-
 .../cachebench/runner/AsyncCacheStressor.h    |  43 ++--
 cachelib/cachebench/runner/CacheStressor.h    |  40 ++--
 cachelib/cachebench/runner/Stressor.cpp       |   3 +
 cachelib/cachebench/util/Config.cpp           |   8 +-
 cachelib/cachebench/util/Config.h             |  13 ++
 cachelib/cachebench/util/Request.h            |  51 ++++-
 .../workload/BinaryKVReplayGenerator.h        | 200 ++++++++++++++++++
 .../cachebench/workload/KVReplayGenerator.h   | 174 +++++++++++++--
 .../cachebench/workload/OnlineGenerator.cpp   |   4 +-
 .../workload/PieceWiseReplayGenerator.h       |   5 +-
 .../cachebench/workload/ReplayGeneratorBase.h | 190 ++++++++++++++++-
 12 files changed, 665 insertions(+), 70 deletions(-)
 create mode 100644 cachelib/cachebench/workload/BinaryKVReplayGenerator.h

diff --git a/cachelib/cachebench/cache/Cache.h b/cachelib/cachebench/cache/Cache.h
index 299e384da7..218e122d59 100644
--- a/cachelib/cachebench/cache/Cache.h
+++ b/cachelib/cachebench/cache/Cache.h
@@ -314,8 +314,8 @@ class Cache {
   // return true if the key was previously detected to be inconsistent. This
   // is useful only when consistency checking is enabled by calling
   // enableConsistencyCheck()
-  bool isInvalidKey(const std::string& key) {
-    return invalidKeys_[key].load(std::memory_order_relaxed);
+  bool isInvalidKey(const std::string_view key) {
+    return invalidKeys_[std::string(key)].load(std::memory_order_relaxed);
   }

   // Get overall stats on the whole cache allocator
diff --git a/cachelib/cachebench/runner/AsyncCacheStressor.h b/cachelib/cachebench/runner/AsyncCacheStressor.h
index 529a124b4f..f4f3dbf472 100644
--- a/cachelib/cachebench/runner/AsyncCacheStressor.h
+++ b/cachelib/cachebench/runner/AsyncCacheStressor.h
@@ -222,9 +222,9 @@ class AsyncCacheStressor : public Stressor {
                 ThroughputStats& stats,
                 const Request* req,
                 folly::EventBase* evb,
-                const std::string* key) {
+                const std::string_view key) {
     ++stats.get;
-    auto lock = chainedItemAcquireSharedLock(*key);
+    auto lock = chainedItemAcquireSharedLock(key);

     if (ticker_) {
       ticker_->updateTimeStamp(req->timestamp);
@@ -233,8 +233,7 @@ class AsyncCacheStressor : public Stressor {
     // add a distribution over sequences of requests/access patterns
     // e.g.
get-no-set and set-no-get - auto onReadyFn = [&, req, key = *key, - l = std::move(lock)](auto hdl) mutable { + auto onReadyFn = [&, req, key, l = std::move(lock)](auto hdl) mutable { auto result = OpResultType::kGetMiss; if (hdl == nullptr) { @@ -247,7 +246,7 @@ class AsyncCacheStressor : public Stressor { // appropriate here) l.unlock(); auto xlock = chainedItemAcquireUniqueLock(key); - setKey(pid, stats, &key, *(req->sizeBegin), req->ttlSecs, + setKey(pid, stats, key, *(req->sizeBegin), req->ttlSecs, req->admFeatureMap); } } else { @@ -260,8 +259,8 @@ class AsyncCacheStressor : public Stressor { } }; - cache_->recordAccess(*key); - auto sf = cache_->asyncFind(*key); + cache_->recordAccess(key); + auto sf = cache_->asyncFind(key); if (sf.isReady()) { // If the handle is ready, call onReadyFn directly to process the handle onReadyFn(std::move(sf).value()); @@ -283,9 +282,9 @@ class AsyncCacheStressor : public Stressor { ThroughputStats& stats, const Request* req, folly::EventBase* evb, - const std::string* key) { + const std::string_view key) { ++stats.get; - auto lock = chainedItemAcquireUniqueLock(*key); + auto lock = chainedItemAcquireUniqueLock(key); // This was moved outside the lambda, as otherwise gcc-8.x crashes with an // internal compiler error here (suspected regression in folly). @@ -297,7 +296,7 @@ class AsyncCacheStressor : public Stressor { ++stats.getMiss; ++stats.set; - wHdl = cache_->allocate(pid, *key, *(req->sizeBegin), req->ttlSecs); + wHdl = cache_->allocate(pid, key, *(req->sizeBegin), req->ttlSecs); if (!wHdl) { ++stats.setFailure; return; @@ -327,7 +326,7 @@ class AsyncCacheStressor : public Stressor { }; // Always use asyncFind as findToWrite is sync when using HybridCache - auto sf = cache_->asyncFind(*key); + auto sf = cache_->asyncFind(key); if (sf.isReady()) { onReadyFn(std::move(sf).value()); return; @@ -345,10 +344,10 @@ class AsyncCacheStressor : public Stressor { void asyncUpdate(ThroughputStats& stats, const Request* req, folly::EventBase* evb, - const std::string* key) { + const std::string_view key) { ++stats.get; ++stats.update; - auto lock = chainedItemAcquireUniqueLock(*key); + auto lock = chainedItemAcquireUniqueLock(key); if (ticker_) { ticker_->updateTimeStamp(req->timestamp); } @@ -363,7 +362,7 @@ class AsyncCacheStressor : public Stressor { cache_->updateItemRecordVersion(wHdl); }; - auto sf = cache_->asyncFind(*key); + auto sf = cache_->asyncFind(key); if (sf.isReady()) { onReadyFn(std::move(sf).value()); return; @@ -457,18 +456,18 @@ class AsyncCacheStressor : public Stressor { const auto pid = static_cast(opPoolDist(gen)); const Request& req(getReq(pid, gen, lastRequestId)); OpType op = req.getOp(); - const std::string* key = &(req.key); - std::string oneHitKey; + std::string_view key = req.key; + std::string_view oneHitKey; if (op == OpType::kLoneGet || op == OpType::kLoneSet) { oneHitKey = Request::getUniqueKey(); - key = &oneHitKey; + key = oneHitKey; } OpResultType result(OpResultType::kNop); switch (op) { case OpType::kLoneSet: case OpType::kSet: { - auto lock = chainedItemAcquireUniqueLock(*key); + auto lock = chainedItemAcquireUniqueLock(key); result = setKey(pid, stats, key, *(req.sizeBegin), req.ttlSecs, req.admFeatureMap); @@ -481,8 +480,8 @@ class AsyncCacheStressor : public Stressor { } case OpType::kDel: { ++stats.del; - auto lock = chainedItemAcquireUniqueLock(*key); - auto res = cache_->remove(*key); + auto lock = chainedItemAcquireUniqueLock(key); + auto res = cache_->remove(key); if (res == CacheT::RemoveRes::kNotFoundInRam) 
{ ++stats.delNotFound; } @@ -532,7 +531,7 @@ class AsyncCacheStressor : public Stressor { OpResultType setKey( PoolId pid, ThroughputStats& stats, - const std::string* key, + const std::string_view key, size_t size, uint32_t ttlSecs, const std::unordered_map& featureMap) { @@ -543,7 +542,7 @@ class AsyncCacheStressor : public Stressor { } ++stats.set; - auto it = cache_->allocate(pid, *key, size, ttlSecs); + auto it = cache_->allocate(pid, key, size, ttlSecs); if (it == nullptr) { ++stats.setFailure; return OpResultType::kSetFailure; diff --git a/cachelib/cachebench/runner/CacheStressor.h b/cachelib/cachebench/runner/CacheStressor.h index 36e7537465..3889d2b48e 100644 --- a/cachelib/cachebench/runner/CacheStressor.h +++ b/cachelib/cachebench/runner/CacheStressor.h @@ -321,11 +321,11 @@ class CacheStressor : public Stressor { const auto pid = static_cast(opPoolDist(gen)); const Request& req(getReq(pid, gen, lastRequestId)); OpType op = req.getOp(); - const std::string* key = &(req.key); - std::string oneHitKey; + std::string_view key = req.key; + std::string_view oneHitKey; if (op == OpType::kLoneGet || op == OpType::kLoneSet) { oneHitKey = Request::getUniqueKey(); - key = &oneHitKey; + key = oneHitKey; } OpResultType result(OpResultType::kNop); @@ -333,12 +333,12 @@ class CacheStressor : public Stressor { case OpType::kLoneSet: case OpType::kSet: { if (config_.onlySetIfMiss) { - auto it = cache_->find(*key); + auto it = cache_->find(key); if (it != nullptr) { continue; } } - auto lock = chainedItemAcquireUniqueLock(*key); + auto lock = chainedItemAcquireUniqueLock(key); result = setKey(pid, stats, key, *(req.sizeBegin), req.ttlSecs, req.admFeatureMap, req.itemValue); @@ -348,8 +348,8 @@ class CacheStressor : public Stressor { case OpType::kGet: { ++stats.get; - auto slock = chainedItemAcquireSharedLock(*key); - auto xlock = decltype(chainedItemAcquireUniqueLock(*key)){}; + auto slock = chainedItemAcquireSharedLock(key); + auto xlock = decltype(chainedItemAcquireUniqueLock(key)){}; if (ticker_) { ticker_->updateTimeStamp(req.timestamp); @@ -357,8 +357,8 @@ class CacheStressor : public Stressor { // TODO currently pure lookaside, we should // add a distribution over sequences of requests/access patterns // e.g. 
get-no-set and set-no-get - cache_->recordAccess(*key); - auto it = cache_->find(*key); + cache_->recordAccess(key); + auto it = cache_->find(key); if (it == nullptr) { ++stats.getMiss; result = OpResultType::kGetMiss; @@ -368,7 +368,7 @@ class CacheStressor : public Stressor { // upgrade access privledges, (lock_upgrade is not // appropriate here) slock = {}; - xlock = chainedItemAcquireUniqueLock(*key); + xlock = chainedItemAcquireUniqueLock(key); setKey(pid, stats, key, *(req.sizeBegin), req.ttlSecs, req.admFeatureMap, req.itemValue); } @@ -380,8 +380,8 @@ class CacheStressor : public Stressor { } case OpType::kDel: { ++stats.del; - auto lock = chainedItemAcquireUniqueLock(*key); - auto res = cache_->remove(*key); + auto lock = chainedItemAcquireUniqueLock(key); + auto res = cache_->remove(key); if (res == CacheT::RemoveRes::kNotFoundInRam) { ++stats.delNotFound; } @@ -389,13 +389,13 @@ class CacheStressor : public Stressor { } case OpType::kAddChained: { ++stats.get; - auto lock = chainedItemAcquireUniqueLock(*key); - auto it = cache_->findToWrite(*key); + auto lock = chainedItemAcquireUniqueLock(key); + auto it = cache_->findToWrite(key); if (!it) { ++stats.getMiss; ++stats.set; - it = cache_->allocate(pid, *key, *(req.sizeBegin), req.ttlSecs); + it = cache_->allocate(pid, key, *(req.sizeBegin), req.ttlSecs); if (!it) { ++stats.setFailure; break; @@ -426,11 +426,11 @@ class CacheStressor : public Stressor { case OpType::kUpdate: { ++stats.get; ++stats.update; - auto lock = chainedItemAcquireUniqueLock(*key); + auto lock = chainedItemAcquireUniqueLock(key); if (ticker_) { ticker_->updateTimeStamp(req.timestamp); } - auto it = cache_->findToWrite(*key); + auto it = cache_->findToWrite(key); if (it == nullptr) { ++stats.getMiss; ++stats.updateMiss; @@ -441,7 +441,7 @@ class CacheStressor : public Stressor { } case OpType::kCouldExist: { ++stats.couldExistOp; - if (!cache_->couldExist(*key)) { + if (!cache_->couldExist(key)) { ++stats.couldExistOpFalse; } break; @@ -476,7 +476,7 @@ class CacheStressor : public Stressor { OpResultType setKey( PoolId pid, ThroughputStats& stats, - const std::string* key, + const std::string_view key, size_t size, uint32_t ttlSecs, const std::unordered_map& featureMap, @@ -488,7 +488,7 @@ class CacheStressor : public Stressor { } ++stats.set; - auto it = cache_->allocate(pid, *key, size, ttlSecs); + auto it = cache_->allocate(pid, key, size, ttlSecs); if (it == nullptr) { ++stats.setFailure; return OpResultType::kSetFailure; diff --git a/cachelib/cachebench/runner/Stressor.cpp b/cachelib/cachebench/runner/Stressor.cpp index 3852bee0c9..888ebd1d28 100644 --- a/cachelib/cachebench/runner/Stressor.cpp +++ b/cachelib/cachebench/runner/Stressor.cpp @@ -22,6 +22,7 @@ #include "cachelib/cachebench/runner/FastShutdown.h" #include "cachelib/cachebench/runner/IntegrationStressor.h" #include "cachelib/cachebench/workload/BlockChunkReplayGenerator.h" +#include "cachelib/cachebench/workload/BinaryKVReplayGenerator.h" #include "cachelib/cachebench/workload/KVReplayGenerator.h" #include "cachelib/cachebench/workload/OnlineGenerator.h" #include "cachelib/cachebench/workload/PieceWiseReplayGenerator.h" @@ -145,6 +146,8 @@ std::unique_ptr makeGenerator(const StressorConfig& config) { return std::make_unique(config); } else if (config.generator == "block-replay") { return std::make_unique(config); + } else if (config.generator == "binary-replay") { + return std::make_unique(config); } else if (config.generator.empty() || config.generator == "workload") { // TODO: Remove the 
empty() check once we label workload-based configs // properly diff --git a/cachelib/cachebench/util/Config.cpp b/cachelib/cachebench/util/Config.cpp index fa97fd852e..53518d1de4 100644 --- a/cachelib/cachebench/util/Config.cpp +++ b/cachelib/cachebench/util/Config.cpp @@ -90,7 +90,7 @@ StressorConfig::StressorConfig(const folly::dynamic& configJson) { // If you added new fields to the configuration, update the JSONSetVal // to make them available for the json configs and increment the size // below - checkCorrectSize(); + checkCorrectSize(); } bool StressorConfig::usesChainedItems() const { @@ -197,6 +197,10 @@ DistributionConfig::DistributionConfig(const folly::dynamic& jsonConfig, ReplayGeneratorConfig::ReplayGeneratorConfig(const folly::dynamic& configJson) { JSONSetVal(configJson, ampFactor); + JSONSetVal(configJson, ampSizeFactor); + JSONSetVal(configJson, binaryFileName); + JSONSetVal(configJson, fastForwardCount); + JSONSetVal(configJson, preLoadReqs); JSONSetVal(configJson, replaySerializationMode); JSONSetVal(configJson, relaxedSerialIntervalMs); JSONSetVal(configJson, numAggregationFields); @@ -217,7 +221,7 @@ ReplayGeneratorConfig::ReplayGeneratorConfig(const folly::dynamic& configJson) { "Unsupported request serialization mode: {}", replaySerializationMode)); } - checkCorrectSize(); + checkCorrectSize(); } ReplayGeneratorConfig::SerializeMode diff --git a/cachelib/cachebench/util/Config.h b/cachelib/cachebench/util/Config.h index 96c5967ebf..6e1b2fef07 100644 --- a/cachelib/cachebench/util/Config.h +++ b/cachelib/cachebench/util/Config.h @@ -125,6 +125,19 @@ struct ReplayGeneratorConfig : public JSONConfig { std::string replaySerializationMode{"strict"}; uint32_t ampFactor{1}; + uint32_t ampSizeFactor{1}; + + // the path of the binary file to make + std::string binaryFileName{}; + + // The number of requests (not including ampFactor) to skip + // in the trace. This is so that after warming up the cache + // with a certain number of requests, we can easily reattach + // and resume execution with different cache configurations. + uint64_t fastForwardCount{0}; + + // The number of requests to pre load into the request queues + uint64_t preLoadReqs{0}; // The time interval threshold when replaySerializationMode is relaxed. 
uint64_t relaxedSerialIntervalMs{500}; diff --git a/cachelib/cachebench/util/Request.h b/cachelib/cachebench/util/Request.h index 23d82204c3..15ed9dde92 100644 --- a/cachelib/cachebench/util/Request.h +++ b/cachelib/cachebench/util/Request.h @@ -60,6 +60,35 @@ enum class OpResultType { kCouldExistFalse }; +struct BinaryRequest { + uint8_t keySize_; + uint8_t op_; + uint16_t repeats_; + uint16_t ttl_; + uint64_t keyOffset_; + uint32_t valueSize_; + + BinaryRequest() = default; + + BinaryRequest(uint8_t keySize, + size_t valueSize, + uint16_t repeats, + uint8_t op, + uint16_t ttl, + size_t keyOffset) + : keySize_(keySize), + valueSize_(valueSize), + repeats_(repeats), + op_(op), + ttl_(ttl), + keyOffset_(keyOffset) {} + + std::string_view getKey() const { + return std::string_view(reinterpret_cast(this) + keyOffset_, + keySize_); + } +}; + struct Request { Request(std::string& k, std::vector::iterator b, @@ -79,6 +108,9 @@ struct Request { uint64_t reqId) : key(k), sizeBegin(b), sizeEnd(e), requestId(reqId), op(o) {} + Request(std::string_view k, size_t* b, OpType o, uint64_t reqId) + : key(k), sizeBegin(b), requestId(reqId), op(o) {} + Request(std::string& k, std::vector::iterator b, std::vector::iterator e, @@ -121,12 +153,23 @@ struct Request { Request(Request&& r) noexcept : key(r.key), sizeBegin(r.sizeBegin), sizeEnd(r.sizeEnd) {} - Request& operator=(Request&& r) = delete; - + Request& operator=(Request&&) = delete; + + inline void update(std::string_view k, + size_t* valueSize, + OpType o, + uint16_t ttl, + uint64_t reqId) { + key = k; + op = o; + ttlSecs = ttl; + requestId = reqId; + sizeBegin = std::vector::iterator(valueSize); + } OpType getOp() const noexcept { return op.load(); } void setOp(OpType o) noexcept { op = o; } - std::string& key; + std::string_view key; // size iterators in case this request is // deemed to be a chained item. @@ -137,7 +180,7 @@ struct Request { // TTL in seconds. uint32_t ttlSecs{0}; - const std::optional requestId; + std::optional requestId; // Feature map for this request sample, which is used for for admission // policy: feature name --> feature value diff --git a/cachelib/cachebench/workload/BinaryKVReplayGenerator.h b/cachelib/cachebench/workload/BinaryKVReplayGenerator.h new file mode 100644 index 0000000000..6c62d5beb9 --- /dev/null +++ b/cachelib/cachebench/workload/BinaryKVReplayGenerator.h @@ -0,0 +1,200 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+#pragma once
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include "cachelib/cachebench/cache/Cache.h"
+#include "cachelib/cachebench/util/Exceptions.h"
+#include "cachelib/cachebench/util/Parallel.h"
+#include "cachelib/cachebench/util/Request.h"
+#include "cachelib/cachebench/workload/ReplayGeneratorBase.h"
+
+namespace facebook {
+namespace cachelib {
+namespace cachebench {
+
+// BinaryKVReplayGenerator generates cachelib requests from a binary trace
+// file produced by KVReplayGenerator. To minimize contention between the
+// stressor threads that consume the requests, the trace is sharded across
+// the threads in fixed-length, interleaved runs (see kRunLength below)
+// rather than dispatched through per-thread submission queues.
+class BinaryKVReplayGenerator : public ReplayGeneratorBase {
+ public:
+  explicit BinaryKVReplayGenerator(const StressorConfig& config)
+      : ReplayGeneratorBase(config), binaryStream_(config, numShards_) {
+    for (uint32_t i = 0; i < numShards_; ++i) {
+      stressorCtxs_.emplace_back(
+          std::make_unique<StressorCtx>(i, fastForwardCount_));
+    }
+
+    XLOGF(INFO,
+          "Started BinaryKVReplayGenerator (# of stressor threads {}, total "
+          "requests {})",
+          numShards_, binaryStream_.getNumReqs());
+  }
+
+  virtual ~BinaryKVReplayGenerator() { XCHECK(shouldShutdown()); }
+
+  // getReq generates the next request from the trace file.
+  const Request& getReq(
+      uint8_t,
+      std::mt19937_64&,
+      std::optional<uint64_t> lastRequestId = std::nullopt) override;
+
+  void renderStats(uint64_t, std::ostream& out) const override {
+    out << std::endl << "== BinaryKVReplayGenerator Stats ==" << std::endl;
+
+    out << folly::sformat("{}: {:.2f} million (parse error: {})",
+                          "Total Processed Samples",
+                          (double)parseSuccess.load() / 1e6, parseError.load())
+        << std::endl;
+  }
+
+  void markFinish() override { getStressorCtx().markFinish(); }
+
+ private:
+  // per thread: the number of consecutive requests to serve
+  // from the trace before jumping to the next offset
+  static constexpr size_t kRunLength = 10000;
+
+  // StressorCtx keeps the per-stressor-thread state: a reusable Request
+  // object plus the thread's position in the trace (the base index of its
+  // current run and the offset within that run). A request with a remaining
+  // repeat count is re-served before the position advances; more than one
+  // request can be outstanding for the async stressor while only one can
+  // be outstanding for the sync stressor.
+  struct StressorCtx {
+    explicit StressorCtx(uint32_t id, uint32_t fastForwardCount)
+        : id_(id), reqIdx_(id * kRunLength + fastForwardCount) {
+      std::string_view s{"abc"};
+      requestPtr_ =
+          new Request(s, reinterpret_cast<size_t*>(0), OpType::kGet, 0);
+      runIdx_ = 0;
+    }
+
+    bool isFinished() { return finished_.load(std::memory_order_relaxed); }
+    void markFinish() { finished_.store(true, std::memory_order_relaxed); }
+
+    Request* requestPtr_;
+    uint64_t reqIdx_{0};
+    uint32_t id_{0};
+    uint64_t runIdx_{0};
+    // A thread that finishes its operations marks itself here, so we skip
+    // further requests on its shard
+    std::atomic<bool> finished_{false};
+  };
+
+  // Used to assign stressorIdx_
+  std::atomic<uint32_t> incrementalIdx_{0};
+
+  // A sticky index assigned to each stressor thread that calls into
+  // the generator.
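+  // The index is assigned from incrementalIdx_ on a thread's first call
+  // into getStressorCtx() and then stays fixed for the thread's lifetime,
+  // so each stressor thread always drains the same shard of the trace.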
+ folly::ThreadLocalPtr stressorIdx_; + + // Vector size is equal to the # of stressor threads; + // stressorIdx_ is used to index. + std::vector> stressorCtxs_; + + // Class that holds a vector of pointers to the + // binary data + BinaryFileStream binaryStream_; + + // Used to signal end of file as EndOfTrace exception + std::atomic eof{false}; + + // Stats + std::atomic parseError = 0; + std::atomic parseSuccess = 0; + + void setEOF() { eof.store(true, std::memory_order_relaxed); } + bool isEOF() { return eof.load(std::memory_order_relaxed); } + + inline StressorCtx& getStressorCtx(size_t shardId) { + XCHECK_LT(shardId, numShards_); + return *stressorCtxs_[shardId]; + } + + inline StressorCtx& getStressorCtx() { + if (!stressorIdx_.get()) { + stressorIdx_.reset(new uint32_t(incrementalIdx_++)); + } + + return getStressorCtx(*stressorIdx_); + } +}; + +const Request& BinaryKVReplayGenerator::getReq(uint8_t, + std::mt19937_64& gen, + std::optional) { + auto& stressorCtx = getStressorCtx(); + auto& r = *stressorCtx.requestPtr_; + BinaryRequest* prevReq = reinterpret_cast(*(r.requestId)); + if (prevReq != nullptr && prevReq->repeats_ > 1) { + prevReq->repeats_ = prevReq->repeats_ - 1; + } else { + BinaryRequest* req = nullptr; + try { + req = binaryStream_.getNextPtr(stressorCtx.reqIdx_ + stressorCtx.runIdx_); + } catch (const EndOfTrace& e) { + setEOF(); + throw cachelib::cachebench::EndOfTrace("Test stopped or EOF reached"); + } + + XDCHECK_NE(req, nullptr); + XDCHECK_NE(reinterpret_cast(req), 0); + XDCHECK_LT(req->op_, 12); + auto key = req->getKey(); + OpType op; + switch (req->op_) { + case 1: + op = OpType::kGet; + break; + case 2: + op = OpType::kSet; + break; + case 3: + op = OpType::kDel; + break; + } + req->valueSize_ = (req->valueSize_) * ampSizeFactor_; + r.update(key, + const_cast(reinterpret_cast(&req->valueSize_)), + op, + req->ttl_, + reinterpret_cast(req)); + if (stressorCtx.runIdx_ < kRunLength) { + stressorCtx.runIdx_++; + } else { + stressorCtx.runIdx_ = 0; + stressorCtx.reqIdx_ += numShards_ * kRunLength; + } + } + return r; +} + +} // namespace cachebench +} // namespace cachelib +} // namespace facebook diff --git a/cachelib/cachebench/workload/KVReplayGenerator.h b/cachelib/cachebench/workload/KVReplayGenerator.h index 6ac001d306..4ec0585f20 100644 --- a/cachelib/cachebench/workload/KVReplayGenerator.h +++ b/cachelib/cachebench/workload/KVReplayGenerator.h @@ -21,6 +21,7 @@ #include #include #include +#include #include #include "cachelib/cachebench/cache/Cache.h" @@ -33,6 +34,8 @@ namespace facebook { namespace cachelib { namespace cachebench { +#define BIN_REQ_INT 100000000 + struct ReqWrapper { ReqWrapper() = default; @@ -97,15 +100,67 @@ class KVReplayGenerator : public ReplayGeneratorBase { for (uint32_t i = 0; i < numShards_; ++i) { stressorCtxs_.emplace_back(std::make_unique(i)); } + if (!binaryFileName_.empty()) { + makeBinaryFile_ = true; + } + folly::Latch latch(1); + if (makeBinaryFile_) { + XLOGF(INFO, + "Started generating binary file from KVReplayGenerator" + "(amp factor {}, size factor {}, # of stressor threads {}, fast " + "forward {})", + ampFactor_, ampSizeFactor_, numShards_, fastForwardCount_); + int fd = open(binaryFileName_.c_str(), O_RDWR | O_CREAT | O_TRUNC, + S_IRUSR | S_IWUSR); + // first get the number of requests + auto traceInfo = traceStream_.getTraceRequestStats(); + auto nreqs = traceInfo.first * ampFactor_; + auto totalKeySize = traceInfo.second; + totalReqs_ = nreqs; + size_t binReqSize = sizeof(BinaryRequest); + size_t totalSize = 
sizeof(size_t) + nreqs * binReqSize + totalKeySize + 1; + size_t totalSizeP = + totalSize + (PG_SIZE - totalSize % PG_SIZE); // page align + ftruncate(fd, totalSizeP); + void* memory = + mmap(nullptr, totalSizeP, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); + void* copy = memory; + // the first entry is the total number of requests in the file, so we can + // calculate the key offset + size_t* first = reinterpret_cast(memory); + std::memcpy(first, &nreqs, sizeof(size_t)); + first++; + memory = reinterpret_cast(first); + outputStreamReqs_ = reinterpret_cast(memory); + outputStreamKeys_ = reinterpret_cast(memory); + outputStreamKeys_ += nreqs * binReqSize; + folly::setThreadName("cb_binary_gen"); + traceStream_.fastForwardTrace(fastForwardCount_); + genRequests(latch); + int res = msync(copy, totalSizeP, MS_SYNC); + if (res != 0) { + XLOGF(INFO, "msync error {}", res); + } + res = munmap(copy, totalSizeP); + if (res != 0) { + XLOGF(INFO, "unmap error {}", res); + } + close(fd); + exit(0); + } else { + genWorker_ = std::thread([this, &latch] { + folly::setThreadName("cb_replay_gen"); + traceStream_.fastForwardTrace(fastForwardCount_); + genRequests(latch); + }); + } - genWorker_ = std::thread([this] { - folly::setThreadName("cb_replay_gen"); - genRequests(); - }); + latch.wait(); XLOGF(INFO, - "Started KVReplayGenerator (amp factor {}, # of stressor threads {})", - ampFactor_, numShards_); + "Started KVReplayGenerator (amp factor {}, # of stressor threads {}, " + "fast forward {})", + ampFactor_, numShards_, fastForwardCount_); } virtual ~KVReplayGenerator() { @@ -191,6 +246,10 @@ class KVReplayGenerator : public ReplayGeneratorBase { std::vector> stressorCtxs_; TraceFileStream traceStream_; + BinaryRequest* outputStreamReqs_; + char* outputStreamKeys_; + size_t totalReqs_; + bool makeBinaryFile_{false}; std::thread genWorker_; @@ -201,7 +260,7 @@ class KVReplayGenerator : public ReplayGeneratorBase { std::atomic parseError = 0; std::atomic parseSuccess = 0; - void genRequests(); + void genRequests(folly::Latch& latch); void setEOF() { eof.store(true, std::memory_order_relaxed); } bool isEOF() { return eof.load(std::memory_order_relaxed); } @@ -306,12 +365,22 @@ inline std::unique_ptr KVReplayGenerator::getReqInternal() { return reqWrapper; } -inline void KVReplayGenerator::genRequests() { +inline void KVReplayGenerator::genRequests(folly::Latch& latch) { + bool init = true; + uint64_t nreqs = 0; + size_t keyOffset = 0; + size_t lastKeyOffset = 0; + size_t keyAddr; + auto begin = util::getCurrentTimeSec(); + auto binReqStart = util::getCurrentTimeSec(); while (!shouldShutdown()) { std::unique_ptr reqWrapper; try { reqWrapper = getReqInternal(); } catch (const EndOfTrace& e) { + if (init) { + latch.count_down(); + } break; } @@ -324,6 +393,7 @@ inline void KVReplayGenerator::genRequests() { req = std::make_unique(*reqWrapper); } + size_t keySize = req->key_.size(); if (ampFactor_ > 1) { // Replace the last 4 bytes with thread Id of 4 decimal chars. 
In doing // so, keep at least 10B from the key for uniqueness; 10B is the max @@ -336,15 +406,87 @@ inline void KVReplayGenerator::genRequests() { req->key_.append(folly::sformat("{:04d}", keySuffix)); } - auto shardId = getShard(req->req_.key); - auto& stressorCtx = getStressorCtx(shardId); - auto& reqQ = *stressorCtx.reqQueue_; + if (makeBinaryFile_) { + uint8_t op = 0; + switch (req->req_.getOp()) { + case OpType::kGet: + op = 1; + break; + case OpType::kSet: + op = 2; + break; + case OpType::kDel: + op = 3; + break; + } + auto valueSize = req->sizes_[0] * ampSizeFactor_; + // calculate the offset for the key relative to position of current + // request + uint64_t relKeyOffset = + reinterpret_cast(outputStreamKeys_ + keyOffset) - + reinterpret_cast(outputStreamReqs_ + nreqs); + auto binReq = BinaryRequest(keySize, valueSize, req->repeats_, op, + req->req_.ttlSecs, relKeyOffset); + // copy the binary request struct to the output stream (mmap'd file) + std::memcpy(outputStreamReqs_ + nreqs, &binReq, sizeof(binReq)); + // copy the key to the output stream for keys (same mmap'd file, but at + // different offset) + std::memcpy(outputStreamKeys_ + keyOffset, req->key_.c_str(), keySize); + if ((nreqs % BIN_REQ_INT) == 0 && nreqs > 0) { + auto end = util::getCurrentTimeSec(); + double reqsPerSec = BIN_REQ_INT / (double)(end - binReqStart); + + uint64_t reqStart = reinterpret_cast(outputStreamReqs_ + + (nreqs - BIN_REQ_INT)); + reqStart = reqStart + (PG_SIZE - reqStart % PG_SIZE); + uint64_t reqEnd = + reinterpret_cast(outputStreamReqs_ + nreqs); + reqEnd = reqEnd + (PG_SIZE - reqEnd % PG_SIZE); + int rres = madvise(reinterpret_cast(reqStart), + reqEnd - reqStart, MADV_DONTNEED); + XDCHECK_EQ(rres, 0); + + uint64_t keyStart = + reinterpret_cast(outputStreamKeys_ + lastKeyOffset); + keyStart = keyStart + (PG_SIZE - keyStart % PG_SIZE); + uint64_t keyEnd = + reinterpret_cast(outputStreamKeys_ + keyOffset); + keyEnd = keyEnd + (PG_SIZE - keyEnd % PG_SIZE); + + int kres = madvise(reinterpret_cast(keyStart), + keyEnd - keyStart, MADV_DONTNEED); + XDCHECK_EQ(kres, 0); + + XLOGF(INFO, "Parsed: {} reqs ({:.2f} reqs/sec)", nreqs, reqsPerSec); + lastKeyOffset = keyOffset; + binReqStart = util::getCurrentTimeSec(); + } + nreqs++; + keyOffset += keySize; + } else { + auto shardId = getShard(req->req_.key); + auto& stressorCtx = getStressorCtx(shardId); + auto& reqQ = *stressorCtx.reqQueue_; + + while (!reqQ.write(std::move(req)) && !stressorCtx.isFinished() && + !shouldShutdown()) { + // ProducerConsumerQueue does not support blocking, so use sleep + if (init) { + latch.count_down(); + init = false; + } + std::this_thread::sleep_for( + std::chrono::microseconds{checkIntervalUs_}); + } + nreqs++; + } - while (!reqQ.write(std::move(req)) && !stressorCtx.isFinished() && - !shouldShutdown()) { - // ProducerConsumerQueue does not support blocking, so use sleep - std::this_thread::sleep_for( - std::chrono::microseconds{checkIntervalUs_}); + if (nreqs >= preLoadReqs_ && init) { + auto end = util::getCurrentTimeSec(); + double reqsPerSec = nreqs / (double)(end - begin); + XLOGF(INFO, "Parse rate: {:.2f} reqs/sec", reqsPerSec); + latch.count_down(); + init = false; } } } diff --git a/cachelib/cachebench/workload/OnlineGenerator.cpp b/cachelib/cachebench/workload/OnlineGenerator.cpp index 9391d0af8c..7f00a5bdf4 100644 --- a/cachelib/cachebench/workload/OnlineGenerator.cpp +++ b/cachelib/cachebench/workload/OnlineGenerator.cpp @@ -55,10 +55,12 @@ const Request& OnlineGenerator::getReq(uint8_t poolId, std::optional) { 
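+  // Build the key for this request in a local buffer, then point the
+  // request's string_view key at it (Request::key no longer owns storage).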
size_t keyIdx = getKeyIdx(poolId, gen); - generateKey(poolId, keyIdx, req_->key); + std::string key(req_->key); + generateKey(poolId, keyIdx, key); auto sizes = generateSize(poolId, keyIdx); req_->sizeBegin = sizes->begin(); req_->sizeEnd = sizes->end(); + req_->key = key; auto op = static_cast(workloadDist_[workloadIdx(poolId)].sampleOpDist(gen)); req_->setOp(op); diff --git a/cachelib/cachebench/workload/PieceWiseReplayGenerator.h b/cachelib/cachebench/workload/PieceWiseReplayGenerator.h index cdcba727d8..1a5efb2f57 100644 --- a/cachelib/cachebench/workload/PieceWiseReplayGenerator.h +++ b/cachelib/cachebench/workload/PieceWiseReplayGenerator.h @@ -45,7 +45,10 @@ class PieceWiseReplayGenerator : public ReplayGeneratorBase { threadFinished_[i].store(false, std::memory_order_relaxed); } - traceGenThread_ = std::thread([this]() { getReqFromTrace(); }); + traceGenThread_ = std::thread([this]() { + traceStream_.fastForwardTrace(fastForwardCount_); + getReqFromTrace(); + }); } virtual ~PieceWiseReplayGenerator() { diff --git a/cachelib/cachebench/workload/ReplayGeneratorBase.h b/cachelib/cachebench/workload/ReplayGeneratorBase.h index 348e570b76..e22b6b3002 100644 --- a/cachelib/cachebench/workload/ReplayGeneratorBase.h +++ b/cachelib/cachebench/workload/ReplayGeneratorBase.h @@ -16,9 +16,15 @@ #pragma once +#include +#include #include #include #include +#include +#include +#include +#include #include #include @@ -30,14 +36,17 @@ #include "cachelib/cachebench/util/Config.h" #include "cachelib/cachebench/util/Exceptions.h" +#include "cachelib/cachebench/util/Request.h" #include "cachelib/cachebench/workload/GeneratorBase.h" +#define PG_SIZE 4096 + namespace facebook { namespace cachelib { namespace cachebench { +constexpr size_t kPgRelease = 100000000; constexpr size_t kIfstreamBufferSize = 1L << 14; - // ColumnInfo is to pass the information required to parse the trace // to map the column to the replayer-specific field ID. struct ColumnInfo { @@ -85,6 +94,63 @@ class TraceFileStream { return infile_; } + // Returns the total number of requests in the trace + // plus the average key size so we can properly + // set up the binary file stream via mmap + // + // We return a pair, the first value is the number + // of lines in the trace and the second value is the + // average key size + std::pair getTraceRequestStats() { + uint64_t lines = 0; + uint64_t totalKeySize = 0; + const std::string& traceFileName = infileNames_[0]; + + std::ifstream file; + std::string line; + file.open(traceFileName); + + while (std::getline(file, line)) { + if (!line.empty()) { + // Default order is key,op,size,op_count,key_size,ttl + nextLineFields_.clear(); + folly::split(",", line, nextLineFields_); + + if (nextLineFields_.size() < minNumFields_) { + XLOG_N_PER_MS(INFO, 10, 1000) << folly::sformat( + "Error parsing next line \"{}\": shorter than min required " + "fields {}", + line, minNumFields_); + } + auto keySizeField = folly::tryTo(nextLineFields_[4]); + if (keySizeField.hasValue()) { + // The key is encoded as . + size_t keySize = keySizeField.value(); + // The key size should not exceed 256 + keySize = std::min(keySize, 256); + totalKeySize += keySize; + } + lines++; + } + } + file.close(); + + return std::make_pair(lines - 1, totalKeySize); + } + + // The number of requests (not including ampFactor) to skip + // in the trace. This is so that after warming up the cache + // with a certain number of requests, we can easily reattach + // and resume execution with different cache configurations. 
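+  //
+  // Note: getline() below can throw EndOfTrace if the trace has fewer
+  // lines than fastForwardCount.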
+ void fastForwardTrace(uint64_t fastForwardCount) { + uint64_t count = 0; + while (count < fastForwardCount) { + std::string line; + this->getline(line); // can throw + count++; + } + } + bool setNextLine(const std::string& line) { nextLineFields_.clear(); folly::split(",", line, nextLineFields_); @@ -255,6 +321,119 @@ class TraceFileStream { std::vector nextLineFields_; std::vector keys_; + uint64_t lines_ = 0; +}; + +class BinaryFileStream { + public: + BinaryFileStream(const StressorConfig& config, const uint32_t numShards) + : infileName_(config.traceFileName) { + fd_ = open(infileName_.c_str(), O_RDONLY); + // Get the size of the file + struct stat fileStat; + if (fstat(fd_, &fileStat) == -1) { + close(fd_); + XLOGF(INFO, "Error reading file size {}", infileName_); + } + size_t* binaryData = reinterpret_cast( + mmap(nullptr, fileStat.st_size, PROT_READ | PROT_WRITE, MAP_PRIVATE, + fd_, 0)); + + // binary data pg align save for releasing old requests + pgBinaryData_ = reinterpret_cast(binaryData); + + // first value is the number of requests in the file + nreqs_ = binaryData[0]; + binaryData++; + // next is the request data + binaryReqData_ = reinterpret_cast(binaryData); + binaryKeyData_ = reinterpret_cast(binaryData); + + // keys are stored after request structures + binaryKeyData_ += nreqs_ * sizeof(BinaryRequest); + // save the pg aligned key space for releasing old key data + pgBinaryKeyData_ = reinterpret_cast( + reinterpret_cast(binaryKeyData_) + + (PG_SIZE - (reinterpret_cast(binaryKeyData_) % PG_SIZE))); + + releaseCount_ = 1; // release data after first nRelease reqs + nRelease_ = kPgRelease; + releaseIdx_ = nRelease_ * releaseCount_; + } + + ~BinaryFileStream() { close(fd_); } + + uint64_t getNumReqs() { return nreqs_; } + + void releaseOldData(uint64_t reqsCompleted) { + // The number of bytes from the key data stream to release + uint64_t keyBytes = + (reinterpret_cast(binaryReqData_ + releaseIdx_) + + binaryReqData_[releaseIdx_].keyOffset_) - + reinterpret_cast(binaryKeyData_); + + int kres = madvise(reinterpret_cast(pgBinaryKeyData_), keyBytes, + MADV_DONTNEED); + if (kres != 0) { + XLOGF(INFO, "Failed to release old keys, key bytes: {}, result: {}", + keyBytes, strerror(errno)); + XDCHECK_EQ(kres, 0); + } else { + XLOGF(INFO, "Release old keys, key bytes: {}", keyBytes); + } + + int reqRes = madvise(reinterpret_cast(pgBinaryData_), + (releaseIdx_) * sizeof(BinaryRequest) + sizeof(size_t), + MADV_DONTNEED); + + XDCHECK_EQ(reqRes, 0); + if (reqRes != 0) { + XLOGF(INFO, + "Failed to release old requests, released: {}, completed: {}, " + "result: {}", + releaseIdx_, reqsCompleted, strerror(errno)); + } else { + XLOGF(INFO, "Released old requests, released: {}, completed: {}", + releaseIdx_, reqsCompleted); + } + + releaseIdx_ = nRelease_ * (++releaseCount_); + } + + BinaryRequest* getNextPtr(uint64_t reqIdx) { + if ((reqIdx > releaseIdx_) && (reqIdx % nRelease_ == 0)) { + releaseOldData(reqIdx); + } + if (reqIdx >= nreqs_) { + throw cachelib::cachebench::EndOfTrace(""); + } + BinaryRequest* binReq = binaryReqData_ + reqIdx; + XDCHECK_LT(binReq->op_, 12); + return binReq; + } + + private: + const StressorConfig config_; + std::string infileName_; + + // pointers to mmaped data + BinaryRequest* binaryReqData_; + char* binaryKeyData_; + + // these two pointers are to madvise + // away old request data + void* pgBinaryKeyData_; + void* pgBinaryData_; + + // number of requests released so far + size_t releaseIdx_; + size_t releaseCount_; + // how often to release old requests + 
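+  // (in number of requests; initialized to kPgRelease, so old pages are
+  // dropped every 100M requests)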
size_t nRelease_; + + size_t fileSize_; + uint64_t nreqs_; + int fd_; }; class ReplayGeneratorBase : public GeneratorBase { @@ -263,6 +442,10 @@ class ReplayGeneratorBase : public GeneratorBase { : config_(config), repeatTraceReplay_{config_.repeatTraceReplay}, ampFactor_(config.replayGeneratorConfig.ampFactor), + ampSizeFactor_(config.replayGeneratorConfig.ampSizeFactor), + binaryFileName_(config.replayGeneratorConfig.binaryFileName), + fastForwardCount_(config.replayGeneratorConfig.fastForwardCount), + preLoadReqs_(config.replayGeneratorConfig.preLoadReqs), timestampFactor_(config.timestampFactor), numShards_(config.numThreads), mode_(config_.replayGeneratorConfig.getSerializationMode()) { @@ -280,7 +463,10 @@ class ReplayGeneratorBase : public GeneratorBase { const StressorConfig config_; const bool repeatTraceReplay_; const size_t ampFactor_; - + const size_t ampSizeFactor_; + const uint64_t fastForwardCount_; + const uint64_t preLoadReqs_; + const std::string binaryFileName_; // The constant to be divided from the timestamp value // to turn the timestamp into seconds. const uint64_t timestampFactor_{1};
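
Example usage
-------------

A sketch of the intended two-pass workflow, using the options added to
ReplayGeneratorConfig above. The exact JSON shape (the test_config /
replayGeneratorConfig nesting) and the "replay" generator name for the
existing KVReplayGenerator follow upstream cachebench conventions and are
not part of this diff, so treat the snippets as illustrative:

Pass 1 -- run the .csv trace through KVReplayGenerator with a non-empty
binaryFileName. The generator parses the (optionally amplified) trace,
writes the binary file, and exits:

  "test_config": {
    "generator": "replay",
    "traceFileName": "/data/kvcache.csv",
    "replayGeneratorConfig": {
      "ampFactor": 2,
      "ampSizeFactor": 2,
      "binaryFileName": "/data/kvcache.bin"
    },
    ...
  }

Pass 2 -- point traceFileName at the binary file and replay it queue-free
with the new generator; fastForwardCount skips requests already used to
warm up the cache:

  "test_config": {
    "generator": "binary-replay",
    "traceFileName": "/data/kvcache.bin",
    "replayGeneratorConfig": {
      "fastForwardCount": 100000000
    },
    ...
  }

(preLoadReqs applies to the queued CSV replay path: the generator parses
that many requests into the submission queues before the stressor threads
are released.)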
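
Binary file layout
------------------

The file written in pass 1 is: one size_t request count, then that many
BinaryRequest structs back to back, then all key bytes. Each request's
keyOffset_ is the byte distance from that struct's own address to its key,
so BinaryRequest::getKey() resolves the key with plain pointer arithmetic
and no separate index. A minimal reader sketch (error handling elided;
BinaryRequest as defined in Request.h above):

  #include <fcntl.h>
  #include <sys/mman.h>
  #include <sys/stat.h>
  #include <string_view>

  int fd = open("/data/kvcache.bin", O_RDONLY);
  struct stat st;
  fstat(fd, &st);
  // Map the whole file up front; BinaryFileStream does the same (with
  // MAP_PRIVATE read-write, since replay mutates the repeats_ field).
  char* base = static_cast<char*>(
      mmap(nullptr, st.st_size, PROT_READ, MAP_PRIVATE, fd, 0));
  size_t nreqs = *reinterpret_cast<size_t*>(base);
  auto* reqs = reinterpret_cast<BinaryRequest*>(base + sizeof(size_t));
  for (size_t i = 0; i < nreqs; ++i) {
    // keyOffset_ is relative to &reqs[i], so no global offsets are needed
    std::string_view key = reqs[i].getKey();
    // ... dispatch on reqs[i].op_ with reqs[i].valueSize_ and ttl_ ...
  }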
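
Trace sharding in binary replay
-------------------------------

There is no generator thread and no queue handoff: each of the N stressor
threads walks the mmap'd request array directly. Thread t starts at index
t * kRunLength + fastForwardCount, serves a run of kRunLength (10,000)
consecutive requests, then jumps ahead by N * kRunLength to its next run.
With N = 2, for example, thread 0 serves roughly requests [0, 10k) and
then [20k, 30k) while thread 1 serves [10k, 20k) and then [30k, 40k), so
the threads cover the trace in interleaved runs without contending on a
shared cursor.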
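
Memory overhead
---------------

The ~4GB per 100 million requests quoted in the summary follows from the
BinaryRequest layout: assuming standard alignment, the struct occupies 24
bytes (six bytes of small fields padded to 8 so the 8-byte keyOffset_ is
aligned, then a 4-byte valueSize_ padded out to the struct's 8-byte
alignment), so 100M requests take 2.4GB for the structs alone; the key
bytes stored after them account for the rest (an average key of ~16 bytes
would add another 1.6GB).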