Skip to content

Commit

Permalink
fix(server): Fix #207 (#208)
Browse files Browse the repository at this point in the history
1. Erase expiry data from the expire table in case of evictions.
2. Add test that covers the bug.

Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange authored Jul 15, 2022
1 parent d2b7987 commit 05eb323
Show file tree
Hide file tree
Showing 10 changed files with 52 additions and 5 deletions.
3 changes: 2 additions & 1 deletion src/redis/object.c
Original file line number Diff line number Diff line change
Expand Up @@ -1175,7 +1175,7 @@ sds getMemoryDoctorReport(void) {
return s;
}

#endif


/* Set the object LRU/LFU depending on server.maxmemory_policy.
* The lfu_freq arg is only relevant if policy is MAXMEMORY_FLAG_LFU.
Expand Down Expand Up @@ -1210,3 +1210,4 @@ int objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle,
return 0;
}

#endif
5 changes: 4 additions & 1 deletion src/server/db_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ unsigned PrimeEvictionPolicy::Evict(const PrimeTable::HotspotBuckets& eb, PrimeT
auto last_slot_it = bucket_it;
last_slot_it += (PrimeTable::kBucketWidth - 1);
if (!last_slot_it.is_done()) {
if (last_slot_it->second.HasExpire()) {
ExpireTable* expire_tbl = db_slice_->GetTables(db_indx_).second;
CHECK_EQ(1u, expire_tbl->Erase(last_slot_it->first));
}
UpdateStatsOnDeletion(last_slot_it, db_slice_->MutableStats(db_indx_));
}
CHECK(me->ShiftRight(bucket_it));
Expand Down Expand Up @@ -446,7 +450,6 @@ bool DbSlice::UpdateExpire(DbIndex db_ind, PrimeIterator it, uint64_t at) {

if (!it->second.HasExpire() && at) {
uint64_t delta = at - expire_base_[0]; // TODO: employ multigen expire updates.

CHECK(db.expire.Insert(it->first.AsRef(), ExpirePeriod(delta)).second);
it->second.SetExpire(true);

Expand Down
4 changes: 4 additions & 0 deletions src/server/db_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,10 @@ class DbSlice {
return db_arr_;
}

void TEST_EnableCacheMode() {
caching_mode_ = 1;
}

private:
void CreateDb(DbIndex index);

Expand Down
26 changes: 24 additions & 2 deletions src/server/dragonfly_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ extern "C" {
#include "redis/zmalloc.h"
}

#include <absl/flags/reflection.h>
#include <absl/strings/ascii.h>
#include <absl/strings/str_join.h>
#include <absl/strings/strip.h>
Expand Down Expand Up @@ -444,7 +445,7 @@ TEST_F(DflyEngineTest, OOM) {
max_memory_limit = 0;
size_t i = 0;
RespExpr resp;
for (; i < 10000; i += 3) {
for (; i < 5000; i += 3) {
resp = Run({"mset", StrCat("key", i), "bar", StrCat("key", i + 1), "bar", StrCat("key", i + 2),
"bar"});
if (resp != "OK")
Expand All @@ -464,7 +465,7 @@ TEST_F(DflyEngineTest, OOM) {
}
run_args.push_back("bar");

for (unsigned i = 0; i < 10000; ++i) {
for (unsigned i = 0; i < 5000; ++i) {
run_args[1] = StrCat("key", cmd, i);
resp = Run(run_args);

Expand All @@ -477,6 +478,27 @@ TEST_F(DflyEngineTest, OOM) {
}
}

/// Reproduces the case where items with expiry data were evicted,
/// and then written with the same key.
TEST_F(DflyEngineTest, Bug207) {
shard_set->TEST_EnableHeartBeat();
shard_set->TEST_EnableCacheMode();

max_memory_limit = 0;

ssize_t i = 0;
RespExpr resp;
for (; i < 5000; ++i) {
resp = Run({"setex", StrCat("key", i), "30", "bar"});
// we evict some items because 5000 is too much when max_memory_limit is zero.
ASSERT_EQ(resp, "OK");
}

for (; i > 0; --i) {
resp = Run({"setex", StrCat("key", i), "30", "bar"});
}
}

TEST_F(DflyEngineTest, PSubscribe) {
single_response_ = false;
auto resp = pp_->at(1)->Await([&] { return Run({"psubscribe", "a*", "b*"}); });
Expand Down
4 changes: 4 additions & 0 deletions src/server/engine_shard_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -381,4 +381,8 @@ void EngineShardSet::TEST_EnableHeartBeat() {
RunBriefInParallel([](EngineShard* shard) { shard->TEST_EnableHeartbeat(); });
}

void EngineShardSet::TEST_EnableCacheMode() {
RunBriefInParallel([](EngineShard* shard) { shard->db_slice().TEST_EnableCacheMode(); });
}

} // namespace dfly
1 change: 1 addition & 0 deletions src/server/engine_shard_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ class EngineShardSet {

// Used in tests
void TEST_EnableHeartBeat();
void TEST_EnableCacheMode();

private:
void InitThreadLocal(util::ProactorBase* pb, bool update_db_time);
Expand Down
1 change: 1 addition & 0 deletions src/server/string_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,7 @@ void StringFamily::ExtendGeneric(CmdArgList args, bool prepend, ConnectionContex
builder->SendSetSkipped();
}

/// (P)SETEX key seconds value
void StringFamily::SetExGeneric(bool seconds, CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 1);
string_view ex = ArgS(args, 2);
Expand Down
5 changes: 4 additions & 1 deletion src/server/table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ namespace dfly {

#define ADD(x) (x) += o.x

// It should be const, but we override this variable in our tests so that they run faster.
unsigned kInitSegmentLog = 3;

DbTableStats& DbTableStats::operator+=(const DbTableStats& o) {
constexpr size_t kDbSz = sizeof(DbTableStats);
static_assert(kDbSz == 56);
Expand All @@ -26,7 +29,7 @@ DbTableStats& DbTableStats::operator+=(const DbTableStats& o) {
}

DbTable::DbTable(std::pmr::memory_resource* mr)
: prime(2, detail::PrimeTablePolicy{}, mr),
: prime(kInitSegmentLog, detail::PrimeTablePolicy{}, mr),
expire(0, detail::ExpireTablePolicy{}, mr),
mcflag(0, detail::ExpireTablePolicy{}, mr) {
}
Expand Down
3 changes: 3 additions & 0 deletions src/server/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ struct DbTable : boost::intrusive_ref_counter<DbTable, boost::thread_unsafe_coun
void Release(IntentLock::Mode mode, std::string_view key, unsigned count);
};

// We use reference counting semantics of DbTable when doing snapshotting.
// There we need to preserve the copy of the table in case someone flushes it during
// the snapshot process. We copy the pointers in StartSnapshotInShard function.
using DbTableArray = std::vector<boost::intrusive_ptr<DbTable>>;

} // namespace dfly
5 changes: 5 additions & 0 deletions src/server/test_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ using namespace std;
ABSL_DECLARE_FLAG(string, dbfilename);

namespace dfly {

extern unsigned kInitSegmentLog;

using MP = MemcacheParser;
using namespace util;
using namespace testing;
Expand Down Expand Up @@ -113,6 +116,8 @@ BaseFamilyTest::~BaseFamilyTest() {
}

void BaseFamilyTest::SetUpTestSuite() {
kInitSegmentLog = 1;

absl::SetFlag(&FLAGS_dbfilename, "");
init_zmalloc_threadlocal(mi_heap_get_backing());
}
Expand Down

0 comments on commit 05eb323

Please sign in to comment.