Skip to content

Commit 0a71915

Browse files
committed
chore: fix pr comments 5
Signed-off-by: Gil Levkovich <[email protected]>
1 parent 988e2fb commit 0a71915

File tree

2 files changed

+11
-4
lines changed

2 files changed

+11
-4
lines changed

src/facade/dragonfly_connection.cc

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1113,6 +1113,12 @@ void Connection::ConnectionFlow() {
11131113

11141114
void Connection::DispatchSingle(bool has_more, absl::FunctionRef<void()> invoke_cb,
11151115
absl::FunctionRef<void()> enqueue_cmd_cb) {
1116+
// Unconditional return when closing:
1117+
// else, non-throttled connections skip the check below and enqueue data even if they are closing.
1118+
// No one will read that data anyway.
1119+
if (cc_->conn_closing)
1120+
return;
1121+
11161122
bool optimize_for_async = has_more;
11171123
QueueBackpressure& qbp = GetQueueBackpressure();
11181124
size_t global_pipeline_bytes = qbp.pipeline_bytes.load(std::memory_order_relaxed);
@@ -1150,9 +1156,7 @@ void Connection::DispatchSingle(bool has_more, absl::FunctionRef<void()> invoke_
11501156

11511157
// Dispatch async if we're handling a pipeline or if we can't dispatch sync.
11521158
if (optimize_for_async || !can_dispatch_sync) {
1153-
if (!cc_->conn_closing) {
1154-
LaunchAsyncFiberIfNeeded();
1155-
}
1159+
LaunchAsyncFiberIfNeeded();
11561160
enqueue_cmd_cb();
11571161
} else {
11581162
ShrinkPipelinePool(); // Gradually release pipeline request pool.

src/server/server_family.cc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1558,7 +1558,8 @@ void PrintPrometheusMetrics(uint64_t uptime, const Metrics& m, DflyCmd* dfly_cmd
15581558
&resp->body());
15591559
AppendMetricWithoutLabels("blocked_clients", "", conn_stats.num_blocked_clients,
15601560
MetricType::GAUGE, &resp->body());
1561-
AppendMetricWithoutLabels("pipeline_queue_length", "", conn_stats.dispatch_queue_entries,
1561+
AppendMetricWithoutLabels("pipeline_queue_length", "",
1562+
conn_stats.dispatch_queue_entries + conn_stats.pipeline_queue_entries,
15621563
MetricType::GAUGE, &resp->body());
15631564
AppendMetricWithoutLabels("send_delay_seconds", "",
15641565
double(GetDelayMs(m.oldest_pending_send_ts)) / 1000.0,
@@ -1770,6 +1771,8 @@ void PrintPrometheusMetrics(uint64_t uptime, const Metrics& m, DflyCmd* dfly_cmd
17701771
MetricType::GAUGE, &resp->body());
17711772
AppendMetricWithoutLabels("dispatch_queue_bytes", "", conn_stats.dispatch_queue_bytes,
17721773
MetricType::GAUGE, &resp->body());
1774+
AppendMetricWithoutLabels("pipeline_queue_bytes", "", conn_stats.pipeline_queue_bytes,
1775+
MetricType::GAUGE, &resp->body());
17731776
AppendMetricWithoutLabels("pipeline_cmd_cache_bytes", "", conn_stats.pipeline_cmd_cache_bytes,
17741777
MetricType::GAUGE, &resp->body());
17751778
}

0 commit comments

Comments
 (0)