Conversation

@glevkovich (Contributor) commented Jan 27, 2026

Separates Redis pipeline processing from the SendAsync control path to
improve the architecture and allow the two paths to be monitored separately.

  • Fixes pipeline_buffer_limit enforcement by introducing
    QueueBackpressure::pipeline_bytes to track global per-thread memory
    usage, resolving a bug where limits were applied per-connection.
  • Adds a safety barrier in ClearPipelinedMessages to wait for in-flight
    async commands during teardown, preventing use-after-free races.
  • Fixes potential underflow assertions in ReleaseParsedCommand.
  • Optimizes fiber notification logic based on protocol-specific needs.
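The per-thread accounting fix can be illustrated with a minimal sketch. The type and field names here (QueueBackpressure, pipeline_bytes, pipeline_buffer_limit) are modeled on the PR description, not copied from the actual Dragonfly sources: every connection on a thread charges its queued bytes against one shared counter, so the limit caps aggregate memory rather than per-connection usage.

```cpp
#include <cstddef>

// Hypothetical sketch of global per-thread pipeline memory accounting.
// One instance exists per thread and is shared by all its connections.
struct QueueBackpressure {
  size_t pipeline_bytes = 0;      // total queued pipeline bytes on this thread
  size_t pipeline_buffer_limit;   // stand-in for the pipeline_buffer_limit flag

  explicit QueueBackpressure(size_t limit) : pipeline_buffer_limit(limit) {}

  bool IsPipelineBufferOverLimit() const {
    return pipeline_bytes > pipeline_buffer_limit;
  }
};

// Each connection charges/releases bytes against the shared thread counter.
struct Connection {
  QueueBackpressure* qbp;

  void EnqueuePipelined(size_t bytes) {
    qbp->pipeline_bytes += bytes;
  }
  void ReleaseParsedCommand(size_t bytes) {
    qbp->pipeline_bytes -= bytes;  // real code guards against underflow here
  }
};
```

With per-connection accounting, two connections each holding 60 bytes would both pass a 100-byte limit; with the shared counter the second enqueue trips it, which is the bug the PR describes fixing.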

@glevkovich glevkovich marked this pull request as ready for review January 27, 2026 05:13
Copilot AI review requested due to automatic review settings January 27, 2026 05:13
@glevkovich glevkovich self-assigned this Jan 27, 2026
augmentcode bot commented Jan 27, 2026

🤖 Augment PR Summary

Summary: Refactors RESP pipeline dispatching by separating pipelined commands from control/admin events to simplify async processing and migration ordering.

Changes:

  • Removed pipelined Redis commands from `dispatch_q_` and introduced a dedicated parsed-command queue (linked list via `ParsedCommand::next`).
  • Added `HasPendingMessages()` / `GetPendingMessageCount()` helpers and updated migration/close logic to use them.
  • Reworked `DispatchSingle` and parsing to enqueue pipeline commands and optionally yield to the async consumer.
  • Split async processing into `ProcessAdminMessage` and `ProcessPipelineCommand`, and updated `AsyncFiber` to drain admin vs pipeline paths explicitly.
  • Adjusted backpressure, flushing behavior, and memory accounting (including adding queued pipeline bytes into `GetMemoryUsage()`).

Technical Notes: Migration requests are intentionally deferred behind outstanding RESP pipeline work to avoid priority inversion and to keep reply flushing deterministic.
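The split described above can be sketched as follows. This is a simplified stand-in rather than the real implementation: MessageHandle and ParsedCommand payloads are reduced to strings, and the async fiber loop is collapsed into a single DrainOne() step that services admin/control messages before pipelined commands.

```cpp
#include <deque>
#include <string>

// Pipelined commands form an intrusive singly linked list via `next`,
// mirroring the dedicated parsed-command queue added in this PR.
struct ParsedCommand {
  std::string cmd;
  ParsedCommand* next = nullptr;
};

struct Connection {
  std::deque<std::string> dispatch_q;      // admin/control messages only
  ParsedCommand* pipeline_head = nullptr;  // parsed-command queue
  ParsedCommand* pipeline_tail = nullptr;
  std::deque<std::string> processed;       // trace of what the fiber consumed

  void EnqueuePipelined(ParsedCommand* c) {
    if (pipeline_tail)
      pipeline_tail->next = c;
    else
      pipeline_head = c;
    pipeline_tail = c;
  }

  bool HasPendingMessages() const {
    return !dispatch_q.empty() || pipeline_head != nullptr;
  }

  // One iteration of the async fiber: control messages are drained before
  // pipelined commands, mirroring the admin-first branch in the PR.
  void DrainOne() {
    if (!dispatch_q.empty()) {
      processed.push_back("admin:" + dispatch_q.front());
      dispatch_q.pop_front();
    } else if (pipeline_head) {
      processed.push_back("pipe:" + pipeline_head->cmd);
      pipeline_head = pipeline_head->next;
      if (!pipeline_head)
        pipeline_tail = nullptr;
    }
  }
};
```

The intrusive list avoids per-enqueue allocation of queue nodes and keeps the two queues independently countable, which is what the HasPendingMessages() / GetPendingMessageCount() helpers rely on.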


augmentcode bot left a comment:

Review completed. 2 suggestions posted.

Copilot AI (Contributor) left a comment

Pull request overview

Refactors Dragonfly connection dispatching by separating Redis pipelined command handling from the admin/control dispatch queue, aiming to improve queuing policy and latency accounting.

Changes:

  • Move Redis pipelined commands off dispatch_q_ into a dedicated parsed-command linked list and update async fiber draining logic accordingly.
  • Update pipeline backpressure/latency accounting to use per-pipeline counters (pending_pipeline_*) and parsed_cycle as the enqueue timestamp.
  • Adjust connection stats semantics/documentation to reflect protocol-specific counting (Redis admin-only vs Memcache all).
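The `parsed_cycle` change (reusing the parse timestamp as the enqueue timestamp) can be illustrated with a small sketch. The names and the clock choice are assumptions for illustration; the real code presumably records cycle counts rather than std::chrono time points.

```cpp
#include <chrono>
#include <cstdint>

using Clock = std::chrono::steady_clock;

// Hypothetical illustration: the timestamp taken at parse time doubles as
// the enqueue timestamp, so no second timestamp is needed per command.
struct ParsedCommand {
  Clock::time_point parsed_cycle;  // set once when the command is parsed
};

ParsedCommand Parse() {
  return ParsedCommand{Clock::now()};
}

// Called by the async consumer when it picks the command up; the elapsed
// time is the pipeline wait latency.
int64_t PipelineWaitNs(const ParsedCommand& cmd) {
  return std::chrono::duration_cast<std::chrono::nanoseconds>(
             Clock::now() - cmd.parsed_cycle)
      .count();
}
```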

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 5 comments.

  • src/facade/parsed_command.h: Clarifies that parsed_cycle is also used as the enqueue timestamp for pipeline wait latency.
  • src/facade/facade_types.h: Updates and clarifies dispatch queue stats semantics and improves comment formatting.
  • src/facade/dragonfly_connection.h: Updates the message variant and dispatch API signatures, and adds helpers for pending-queue tracking.
  • src/facade/dragonfly_connection.cc: Main refactor: splits control vs pipeline paths, changes async fiber processing, and rewires stats/backpressure and cleanup logic.

@glevkovich force-pushed the glevkovich/refactor_redis_dispatch_queuing_policy3 branch from 3eabc83 to 31fa1c2 on January 27, 2026 06:17
Copilot AI review requested due to automatic review settings January 27, 2026 09:01
@glevkovich force-pushed the glevkovich/refactor_redis_dispatch_queuing_policy3 branch from 31fa1c2 to 9373fb0 on January 27, 2026 09:01
@glevkovich changed the title from "Glevkovich/refactor redis dispatch queuing policy3" to "refactor(facade): separate redis data/control paths (phase 2, final)" on Jan 27, 2026
Copilot AI (Contributor) left a comment

Pull request overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated 4 comments.

Copilot AI review requested due to automatic review settings January 27, 2026 09:41
Copilot AI (Contributor) left a comment

Pull request overview

Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.

@glevkovich force-pushed the glevkovich/refactor_redis_dispatch_queuing_policy3 branch from 96155a8 to fa75cdc on January 27, 2026 11:31
Copilot AI review requested due to automatic review settings January 27, 2026 15:14
@glevkovich force-pushed the glevkovich/refactor_redis_dispatch_queuing_policy3 branch from fa75cdc to 988e2fb on January 27, 2026 15:14
Copilot AI (Contributor) left a comment

Pull request overview

Copilot reviewed 6 out of 6 changed files in this pull request and generated 2 comments.


Signed-off-by: Gil Levkovich <[email protected]>
Copilot AI review requested due to automatic review settings January 29, 2026 12:09
@glevkovich force-pushed the glevkovich/refactor_redis_dispatch_queuing_policy3 branch from 0a71915 to 5de84b4 on January 29, 2026 12:09
Copilot AI (Contributor) left a comment

Pull request overview

Copilot reviewed 6 out of 6 changed files in this pull request and generated 5 comments.

Signed-off-by: Gil Levkovich <[email protected]>
Copilot AI (Contributor) left a comment

Pull request overview

Copilot reviewed 6 out of 6 changed files in this pull request and generated 3 comments.

Signed-off-by: Gil Levkovich <[email protected]>
  MetricType::GAUGE, &resp->body());
- AppendMetricWithoutLabels("pipeline_queue_length", "", conn_stats.dispatch_queue_entries,
+ AppendMetricWithoutLabels("pipeline_queue_length", "",
+                           conn_stats.dispatch_queue_entries + conn_stats.pipeline_queue_entries,
A Collaborator commented:

I know you preserve the original behavior, but here let's actually measure pipeline_queue_entries.

- append("pipeline_queue_length", m.facade_stats.conn_stats.dispatch_queue_entries);
+ // Sum of dispatch and pipeline queues.
+ append("pipeline_queue_length", m.facade_stats.conn_stats.dispatch_queue_entries +
A Collaborator commented:

Similarly, this one should measure the pipeline queue only.


// Processes a single Admin/Control message from dispatch_q_.
// Returns true if the fiber should terminate (e.g. Migration).
bool ProcessAdminMessage(MessageHandle msg, AsyncOperations& async_op);
A Collaborator commented:

Please pass mutable arguments by pointer.

A Contributor commented:

"Admin" is a little misleading now, because it also handles monitor/pubsub, which is technically not admin.


Comment on lines +1734 to +1738
if (ProcessAdminMessage(std::move(msg), async_op)) {
return; // don't set conn closing flag
}
} else { // Process Pipeline Queue
ProcessPipelineCommand();
A Contributor commented:
So non-commands always have higher priority? I'm trying to think of possible problems. For example: a pubsub client that always runs with messages pending (at the limit) but never overfills it, so it stays active for a long time. Imagine it wants to unsubscribe from a channel and sends an UNSUBSCRIBE command; that command will never be processed.

A Contributor commented:
I've written it 100 times: we should verify that we don't break anything.

Comment on lines +1893 to +1896
if (GetQueueBackpressure().IsPipelineBufferOverLimit(
stats_->dispatch_queue_bytes + stats_->pipeline_queue_bytes,
GetPendingMessageCount())) {
cc_->conn_closing = true;
A Contributor commented:
You always use it with just pipeline_queue_bytes. Do we ignore dispatch_queue_bytes now? I assume it should be the same formula everywhere.


5 participants