Skip to content

Commit 5ce47d3

Browse files
romange authored and claude committed
fix(tiering): implement qlist symmetric cooloff and fix memory handling
Implements the symmetric cooloff logic for QList tiering that walks from both ends of the list and offloads nodes beyond the threshold. Fixes memory handling bugs in move operator and null pointer check. Changes: - Implement symmetric cooloff in CoolOff() for balanced tiering - Fix InsertNode() null pointer check when insert_opt is AFTER - Fix move operator to properly reset num_offloaded_nodes_ - Add Stats::operator+= for cross-shard aggregation - Add Prometheus metrics for list reads and tiering events - Add list_tiering_threshold flag for configuration - Fix typo: "overlaoding" -> "overloading" Co-Authored-By: Claude Sonnet 4.5 <[email protected]> Signed-off-by: Roman Gershman <[email protected]>
1 parent 55d6491 commit 5ce47d3

File tree

6 files changed

+97
-26
lines changed

6 files changed

+97
-26
lines changed

src/core/detail/listpack.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,10 @@ class ListPack {
7676
return lp_;
7777
}
7878

79+
// Returns the total size in bytes of the underlying listpack blob,
// as reported by lpBytes().
size_t BytesSize() const {
  return lpBytes(lp_);
}
82+
7983
private:
8084
static CollectionEntry GetEntry(uint8_t* pos);
8185

src/core/qlist.cc

Lines changed: 49 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -417,6 +417,24 @@ QList::Node* SplitNode(QList::Node* node, int offset, bool after, ssize_t* diff)
417417

418418
__thread QList::Stats QList::stats;
419419

420+
// Accumulates the per-thread QList statistics from `other` into this
// instance, field by field. Used to aggregate stats across shards
// (see the `qlist_stats += QList::stats` aggregation in GetMetrics).
QList::Stats& QList::Stats::operator+=(const Stats& other) {
  compression_attempts += other.compression_attempts;
  bad_compression_attempts += other.bad_compression_attempts;
  decompression_calls += other.decompression_calls;
  compressed_bytes += other.compressed_bytes;
  raw_compressed_bytes += other.raw_compressed_bytes;
  interior_node_reads += other.interior_node_reads;
  total_node_reads += other.total_node_reads;
  offload_requests += other.offload_requests;
  onload_requests += other.onload_requests;
  return *this;
}
437+
420438
size_t QList::Node::GetLZF(void** data) const {
421439
DCHECK(encoding == QUICKLIST_NODE_ENCODING_LZF || encoding == QLIST_NODE_ENCODING_LZ4);
422440
quicklistLZF* lzf = (quicklistLZF*)entry;
@@ -485,9 +503,9 @@ QList& QList::operator=(QList&& other) noexcept {
485503
compress_ = other.compress_;
486504
bookmark_count_ = other.bookmark_count_;
487505
tiering_params_ = std::move(other.tiering_params_);
488-
506+
num_offloaded_nodes_ = other.num_offloaded_nodes_;
489507
other.head_ = nullptr;
490-
other.len_ = other.count_ = 0;
508+
other.len_ = other.count_ = other.num_offloaded_nodes_ = 0;
491509
}
492510
return *this;
493511
}
@@ -701,7 +719,7 @@ void QList::InsertNode(Node* old_node, Node* new_node, uint32_t old_node_id, Ins
701719

702720
// Calculate final positions AFTER all linkage and len_ updates are complete.
703721
uint32_t new_node_id;
704-
if (insert_opt == AFTER) {
722+
if (insert_opt == AFTER && old_node) {
705723
new_node_id = old_node_id + 1; // new_node inserted after, old_node position unchanged
706724
} else {
707725
new_node_id = old_node_id; // new_node takes old_node's position
@@ -892,8 +910,34 @@ void QList::CoolOff(Node* node, uint32_t node_id) {
892910
stats.offload_requests++;
893911
node->offloaded = 1;
894912
}
895-
} else if (len_ > num_offloaded_nodes_ * 2 + tiering_params_->node_depth_threshold * 2) {
896-
// TBD.
913+
} else if (num_offloaded_nodes_ * 2 + tiering_params_->node_depth_threshold * 2 < len_) {
914+
// We check `num_offloaded_nodes_ * 2` above to avoid frequent traversals.
915+
// So only when the gap between offloaded and non-offloaded nodes is large enough,
916+
// we do a traversal to offload more nodes.
917+
auto* fw = head_;
918+
auto* rev = head_->prev;
919+
uint32_t traverse_node_id = 0;
920+
921+
// Traverse from both ends towards the middle as we expect more offloads towards the ends
922+
// due to usual access patterns of adding items via lpush/rpush.
923+
while (traverse_node_id <= len_ / 2 &&
924+
(num_offloaded_nodes_ + 2 * tiering_params_->node_depth_threshold) < len_) {
925+
if (traverse_node_id >= tiering_params_->node_depth_threshold) {
926+
if (fw->offloaded == 0) {
927+
num_offloaded_nodes_++;
928+
stats.offload_requests++;
929+
fw->offloaded = 1;
930+
}
931+
if (rev->offloaded == 0) {
932+
num_offloaded_nodes_++;
933+
stats.offload_requests++;
934+
rev->offloaded = 1;
935+
}
936+
}
937+
fw = fw->next;
938+
rev = rev->prev;
939+
traverse_node_id++;
940+
}
897941
}
898942
}
899943

src/core/qlist.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,8 @@ class QList {
248248
uint64_t total_node_reads = 0;
249249
uint64_t offload_requests = 0;
250250
uint64_t onload_requests = 0;
251+
252+
Stats& operator+=(const Stats& other);
251253
};
252254
static __thread Stats stats;
253255

src/server/list_family.cc

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ ABSL_FLAG(int32_t, list_max_listpack_size, -2, "Maximum listpack size, default i
6161
*/
6262

6363
ABSL_FLAG(int32_t, list_compress_depth, 0, "Compress depth of the list. Default is no compression");
64+
ABSL_FLAG(unsigned, list_tiering_threshold, 0,
65+
"Tiering threshold for lists. Default - no tiering.");
6466

6567
namespace dfly {
6668

@@ -86,22 +88,30 @@ class ListWrapper {
8688
return std::visit(Overload{[&f](auto* s) { return f(*s); }, f}, impl_);
8789
}
8890

89-
static bool ShouldPromoteToQL(uint8_t* lp, size_t new_size) {
90-
size_t sz = lpBytes(lp);
91-
return !ShouldStoreAsListPack(sz + new_size);
91+
static QList* PromoteToQLIfNeeded(LP lp, size_t additional_size) {
92+
size_t sz = lp.BytesSize();
93+
if (ShouldStoreAsListPack(sz + additional_size)) {
94+
return nullptr;
95+
}
96+
QList* ql = CompactObj::AllocateMR<QList>(GetFlag(FLAGS_list_max_listpack_size),
97+
GetFlag(FLAGS_list_compress_depth));
98+
if (GetFlag(FLAGS_list_tiering_threshold) > 0) {
99+
ql->SetTieringParams(
100+
QList::TieringParams{.node_depth_threshold = GetFlag(FLAGS_list_tiering_threshold)});
101+
}
102+
if (lp.Size() > 0) {
103+
ql->AppendListpack(lp.GetPointer());
104+
}
105+
return ql;
92106
}
93107

94108
// QList overload: delegates directly to QList::Push at the end given by
// `where` — no promotion logic is needed for an already-promoted list.
void PushInternal(string_view value, QList::Where where, QList& ql) {
  ql.Push(value, where);
}
97111

98112
void PushInternal(string_view value, QList::Where where, LP& lp) {
99-
if (ShouldPromoteToQL(lp.GetPointer(), value.size())) {
100-
QList* ql = CompactObj::AllocateMR<QList>(GetFlag(FLAGS_list_max_listpack_size),
101-
GetFlag(FLAGS_list_compress_depth));
102-
if (lp.Size() > 0) {
103-
ql->AppendListpack(lp.GetPointer());
104-
} else {
113+
if (QList* ql = PromoteToQLIfNeeded(lp, value.size()); ql) {
114+
if (lp.Size() == 0) { // otherwise we already appended it in PromoteToQLIfNeeded.
105115
lpFree(lp.GetPointer());
106116
}
107117
ql->Push(value, where);
@@ -120,14 +130,12 @@ class ListWrapper {
120130
if (!p)
121131
return false;
122132

123-
if (ShouldPromoteToQL(lp.GetPointer(), elem.size())) {
124-
QList* ql = CompactObj::AllocateMR<QList>(GetFlag(FLAGS_list_max_listpack_size),
125-
GetFlag(FLAGS_list_compress_depth));
126-
DCHECK_GT(lp.Size(), 0u); // otherwise we would not find anything
133+
if (QList* ql = PromoteToQLIfNeeded(lp, elem.size()); ql) {
134+
DCHECK_GT(ql->Size(), 0u); // otherwise we would not Find the pivot.
127135
impl_ = ql;
128-
ql->AppendListpack(lp.GetPointer());
129136
return ql->Insert(pivot, elem, insert_opt);
130137
}
138+
131139
lp.Insert(p, elem, insert_opt);
132140
return true;
133141
}
@@ -141,12 +149,9 @@ class ListWrapper {
141149
if (!p)
142150
return false;
143151

144-
if (ShouldPromoteToQL(lp.GetPointer(), elem.size())) {
145-
QList* ql = CompactObj::AllocateMR<QList>(GetFlag(FLAGS_list_max_listpack_size),
146-
GetFlag(FLAGS_list_compress_depth));
147-
DCHECK_GT(lp.Size(), 0u); // otherwise we would not seek
152+
if (QList* ql = PromoteToQLIfNeeded(lp, elem.size()); ql) {
153+
DCHECK_GT(ql->Size(), 0u); // otherwise we would not seek
148154
impl_ = ql;
149-
ql->AppendListpack(lp.GetPointer());
150155
return ql->Replace(index, elem);
151156
}
152157
lp.Replace(p, elem);

src/server/server_family.cc

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1963,6 +1963,12 @@ void PrintPrometheusMetrics(uint64_t uptime, const Metrics& m, DflyCmd* dfly_cmd
19631963
AppendMetricWithoutLabels("huffman_tables_built", "Huffman tables built",
19641964
m.shard_stats.huffman_tables_built, MetricType::COUNTER, &resp->body());
19651965

1966+
AppendMetricHeader("list_reads", "List Reads Patterns", MetricType::COUNTER, &resp->body());
1967+
AppendMetricValue("list_reads", m.qlist_stats.total_node_reads, {"type"}, {"total"},
1968+
&resp->body());
1969+
AppendMetricValue("list_reads", m.qlist_stats.interior_node_reads, {"type"}, {"interior"},
1970+
&resp->body());
1971+
19661972
// Tiered metrics
19671973
{
19681974
AppendMetricWithoutLabels("tiered_entries", "Tiered entries", total.tiered_entries,
@@ -1998,12 +2004,19 @@ void PrintPrometheusMetrics(uint64_t uptime, const Metrics& m, DflyCmd* dfly_cmd
19982004
AppendMetricValue("tiered_hits", m.events.ram_misses, {"type"}, {"disk"}, &resp->body());
19992005

20002006
// Potential problems due to overloading system
2001-
AppendMetricHeader("tiered_overload", "Potential problems due to overlaoding",
2007+
AppendMetricHeader("tiered_overload", "Potential problems due to overloading",
20022008
MetricType::COUNTER, &resp->body());
20032009
AppendMetricValue("tiered_overload", m.tiered_stats.total_clients_throttled, {"type"},
20042010
{"client throttling"}, &resp->body());
20052011
AppendMetricValue("tiered_overload", m.tiered_stats.total_stash_overflows, {"type"},
20062012
{"stash overflows"}, &resp->body());
2013+
2014+
AppendMetricHeader("tiered_list_events", "Tiered List Events", MetricType::COUNTER,
2015+
&resp->body());
2016+
AppendMetricValue("tiered_list_events", m.qlist_stats.offload_requests, {"type"}, {"offload"},
2017+
&resp->body());
2018+
AppendMetricValue("tiered_list_events", m.qlist_stats.onload_requests, {"type"}, {"onload"},
2019+
&resp->body());
20072020
}
20082021
}
20092022

@@ -2837,6 +2850,8 @@ Metrics ServerFamily::GetMetrics(Namespace* ns) const {
28372850
result.search_stats += shard->search_indices()->GetStats();
28382851
}
28392852

2853+
result.qlist_stats += QList::stats;
2854+
28402855
result.traverse_ttl_per_sec += shard->GetMovingSum6(EngineShard::TTL_TRAVERSE);
28412856
result.delete_ttl_per_sec += shard->GetMovingSum6(EngineShard::TTL_DELETE);
28422857
if (result.tx_queue_len < shard->txq()->size())

src/server/server_family.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include <optional>
1010
#include <string>
1111

12+
#include "core/qlist.h"
1213
#include "facade/dragonfly_listener.h"
1314
#include "server/detail/save_stages_controller.h"
1415
#include "server/dflycmd.h"
@@ -22,7 +23,6 @@
2223
namespace util {
2324

2425
class AcceptServer;
25-
class ListenerInterface;
2626
class HttpListenerBase;
2727

2828
} // namespace util
@@ -86,6 +86,7 @@ struct Metrics {
8686
SearchStats search_stats;
8787
ServerState::Stats coordinator_stats; // stats on transaction running
8888
PeakStats peak_stats;
89+
QList::Stats qlist_stats;
8990

9091
size_t qps = 0;
9192

0 commit comments

Comments
 (0)