Skip to content

Commit ae857c0

Browse files
romangeclaude
andcommitted
fix(tiering): implement qlist symmetric cooloff and fix memory handling
Implements the symmetric cooloff logic for QList tiering that walks from both ends of the list and offloads nodes beyond the threshold. Fixes memory handling bugs in move operator and null pointer check. Changes: - Implement symmetric cooloff in CoolOff() for balanced tiering - Fix InsertNode() null pointer check when insert_opt is AFTER - Fix move operator to properly reset num_offloaded_nodes_ - Add Stats::operator+= for cross-shard aggregation - Add Prometheus metrics for list reads and tiering events - Add list_tiering_threshold flag for configuration - Fix typo: "overlaoding" -> "overloading" Co-Authored-By: Claude Sonnet 4.5 <[email protected]>
1 parent 55d6491 commit ae857c0

File tree

5 files changed

+69
-7
lines changed

5 files changed

+69
-7
lines changed

src/core/qlist.cc

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -417,6 +417,24 @@ QList::Node* SplitNode(QList::Node* node, int offset, bool after, ssize_t* diff)
417417

418418
__thread QList::Stats QList::stats;
419419

420+
QList::Stats& QList::Stats::operator+=(const Stats& other) {
421+
#define ADD_FIELD(field) this->field += other.field;
422+
423+
ADD_FIELD(compression_attempts);
424+
ADD_FIELD(bad_compression_attempts);
425+
ADD_FIELD(decompression_calls);
426+
ADD_FIELD(compressed_bytes);
427+
ADD_FIELD(raw_compressed_bytes);
428+
ADD_FIELD(interior_node_reads);
429+
ADD_FIELD(total_node_reads);
430+
ADD_FIELD(offload_requests);
431+
ADD_FIELD(onload_requests);
432+
433+
#undef ADD_FIELD
434+
435+
return *this;
436+
}
437+
420438
size_t QList::Node::GetLZF(void** data) const {
421439
DCHECK(encoding == QUICKLIST_NODE_ENCODING_LZF || encoding == QLIST_NODE_ENCODING_LZ4);
422440
quicklistLZF* lzf = (quicklistLZF*)entry;
@@ -485,9 +503,9 @@ QList& QList::operator=(QList&& other) noexcept {
485503
compress_ = other.compress_;
486504
bookmark_count_ = other.bookmark_count_;
487505
tiering_params_ = std::move(other.tiering_params_);
488-
506+
num_offloaded_nodes_ = other.num_offloaded_nodes_;
489507
other.head_ = nullptr;
490-
other.len_ = other.count_ = 0;
508+
other.len_ = other.count_ = other.num_offloaded_nodes_ = 0;
491509
}
492510
return *this;
493511
}
@@ -701,7 +719,7 @@ void QList::InsertNode(Node* old_node, Node* new_node, uint32_t old_node_id, Ins
701719

702720
// Calculate final positions AFTER all linkage and len_ updates are complete.
703721
uint32_t new_node_id;
704-
if (insert_opt == AFTER) {
722+
if (insert_opt == AFTER && old_node) {
705723
new_node_id = old_node_id + 1; // new_node inserted after, old_node position unchanged
706724
} else {
707725
new_node_id = old_node_id; // new_node takes old_node's position
@@ -892,8 +910,28 @@ void QList::CoolOff(Node* node, uint32_t node_id) {
892910
stats.offload_requests++;
893911
node->offloaded = 1;
894912
}
895-
} else if (len_ > num_offloaded_nodes_ * 2 + tiering_params_->node_depth_threshold * 2) {
896-
// TBD.
913+
} else if (num_offloaded_nodes_ * 2 + tiering_params_->node_depth_threshold * 2 < len_) {
914+
auto* fw = head_;
915+
auto* rev = head_->prev;
916+
node_id = 0;
917+
while (node_id <= len_ / 2 &&
918+
(num_offloaded_nodes_ + 2 * tiering_params_->node_depth_threshold) < len_) {
919+
if (node_id >= tiering_params_->node_depth_threshold) {
920+
if (fw->offloaded == 0) {
921+
num_offloaded_nodes_++;
922+
stats.offload_requests++;
923+
fw->offloaded = 1;
924+
}
925+
if (rev->offloaded == 0) {
926+
num_offloaded_nodes_++;
927+
stats.offload_requests++;
928+
rev->offloaded = 1;
929+
}
930+
}
931+
fw = fw->next;
932+
rev = rev->prev;
933+
node_id++;
934+
}
897935
}
898936
}
899937

src/core/qlist.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,8 @@ class QList {
248248
uint64_t total_node_reads = 0;
249249
uint64_t offload_requests = 0;
250250
uint64_t onload_requests = 0;
251+
252+
Stats& operator+=(const Stats& other);
251253
};
252254
static __thread Stats stats;
253255

src/server/list_family.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ ABSL_FLAG(int32_t, list_max_listpack_size, -2, "Maximum listpack size, default i
6161
*/
6262

6363
ABSL_FLAG(int32_t, list_compress_depth, 0, "Compress depth of the list. Default is no compression");
64+
ABSL_FLAG(unsigned, list_tiering_threshold, 0,
65+
"Tiering threshold for lists. Default - no tiering.");
6466

6567
namespace dfly {
6668

@@ -99,6 +101,10 @@ class ListWrapper {
99101
if (ShouldPromoteToQL(lp.GetPointer(), value.size())) {
100102
QList* ql = CompactObj::AllocateMR<QList>(GetFlag(FLAGS_list_max_listpack_size),
101103
GetFlag(FLAGS_list_compress_depth));
104+
if (GetFlag(FLAGS_list_tiering_threshold) > 0) {
105+
ql->SetTieringParams(
106+
QList::TieringParams{.node_depth_threshold = GetFlag(FLAGS_list_tiering_threshold)});
107+
}
102108
if (lp.Size() > 0) {
103109
ql->AppendListpack(lp.GetPointer());
104110
} else {

src/server/server_family.cc

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1963,6 +1963,12 @@ void PrintPrometheusMetrics(uint64_t uptime, const Metrics& m, DflyCmd* dfly_cmd
19631963
AppendMetricWithoutLabels("huffman_tables_built", "Huffman tables built",
19641964
m.shard_stats.huffman_tables_built, MetricType::COUNTER, &resp->body());
19651965

1966+
AppendMetricHeader("list_reads", "List Reads Patterns", MetricType::COUNTER, &resp->body());
1967+
AppendMetricValue("list_reads", m.qlist_stats.total_node_reads, {"type"}, {"total"},
1968+
&resp->body());
1969+
AppendMetricValue("list_reads", m.qlist_stats.interior_node_reads, {"type"}, {"interior"},
1970+
&resp->body());
1971+
19661972
// Tiered metrics
19671973
{
19681974
AppendMetricWithoutLabels("tiered_entries", "Tiered entries", total.tiered_entries,
@@ -1998,12 +2004,19 @@ void PrintPrometheusMetrics(uint64_t uptime, const Metrics& m, DflyCmd* dfly_cmd
19982004
AppendMetricValue("tiered_hits", m.events.ram_misses, {"type"}, {"disk"}, &resp->body());
19992005

20002006
// Potential problems due to overloading system
2001-
AppendMetricHeader("tiered_overload", "Potential problems due to overlaoding",
2007+
AppendMetricHeader("tiered_overload", "Potential problems due to overloading",
20022008
MetricType::COUNTER, &resp->body());
20032009
AppendMetricValue("tiered_overload", m.tiered_stats.total_clients_throttled, {"type"},
20042010
{"client throttling"}, &resp->body());
20052011
AppendMetricValue("tiered_overload", m.tiered_stats.total_stash_overflows, {"type"},
20062012
{"stash overflows"}, &resp->body());
2013+
2014+
AppendMetricHeader("tiered_list_events", "Tiered List Events", MetricType::COUNTER,
2015+
&resp->body());
2016+
AppendMetricValue("tiered_list_events", m.qlist_stats.offload_requests, {"type"}, {"offload"},
2017+
&resp->body());
2018+
AppendMetricValue("tiered_list_events", m.qlist_stats.onload_requests, {"type"}, {"onload"},
2019+
&resp->body());
20072020
}
20082021
}
20092022

@@ -2837,6 +2850,8 @@ Metrics ServerFamily::GetMetrics(Namespace* ns) const {
28372850
result.search_stats += shard->search_indices()->GetStats();
28382851
}
28392852

2853+
result.qlist_stats += QList::stats;
2854+
28402855
result.traverse_ttl_per_sec += shard->GetMovingSum6(EngineShard::TTL_TRAVERSE);
28412856
result.delete_ttl_per_sec += shard->GetMovingSum6(EngineShard::TTL_DELETE);
28422857
if (result.tx_queue_len < shard->txq()->size())

src/server/server_family.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include <optional>
1010
#include <string>
1111

12+
#include "core/qlist.h"
1213
#include "facade/dragonfly_listener.h"
1314
#include "server/detail/save_stages_controller.h"
1415
#include "server/dflycmd.h"
@@ -22,7 +23,6 @@
2223
namespace util {
2324

2425
class AcceptServer;
25-
class ListenerInterface;
2626
class HttpListenerBase;
2727

2828
} // namespace util
@@ -86,6 +86,7 @@ struct Metrics {
8686
SearchStats search_stats;
8787
ServerState::Stats coordinator_stats; // stats on transaction running
8888
PeakStats peak_stats;
89+
QList::Stats qlist_stats;
8990

9091
size_t qps = 0;
9192

0 commit comments

Comments
 (0)