-
Notifications
You must be signed in to change notification settings - Fork 1.1k
fix: xreadgroup is a write command #6492
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Signed-off-by: Kostas Kyrimis <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Adjusts Dragonfly’s handling of XREADGROUP to behave as a write/journaled command and adds tests to validate correct replication/journaling behavior (including ENTRIESREAD propagation).
Changes:
- Marks
XREADGROUPasCO::JOURNALED | CO::NO_AUTOJOURNALand adds manual journaling logic forXREADGROUPside effects. - Adds support for parsing/setting
XGROUP SETID ... ENTRIESREAD <n>to keep group counters consistent across replicas. - Adds replication and monitor-based tests covering
XREADGROUPjournaling/replication (NOACK vs PEL, blocking vs non-blocking).
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| tests/dragonfly/replication_test.py | Adds monitor expectations for XREADGROUP journaling and a new replication test ensuring group state matches between master/replica. |
| src/server/stream_family.cc | Implements manual journaling for XREADGROUP, tracks delivery time, and propagates ENTRIESREAD via XGROUP SETID. |
Comments suppressed due to low confidence (1)
src/server/stream_family.cc:2554
- In XReadBlock(), the
keyvariable is no longer assigned when a wake key is found (the previouskey = *wake_key;was removed). The reply path usesSendStreamRecords(key, *result), so blocking XREAD/XREADGROUP responses will be sent with an empty/incorrect stream key. Restore settingkey(e.g., to*wake_key) alongside computingresult.
range_opts.noack = opts->noack;
result = OpRange(t->GetOpArgs(shard), *wake_key, range_opts);
if (result) {
JournalXReadGroupIfNeeded(t->GetOpArgs(shard), *opts, *result, *wake_key);
}
}
return OpStatus::OK;
};
tx->Execute(std::move(range_cb), true);
if (result) {
SinkReplyBuilder::ReplyAggregator agg(rb);
if (opts->read_group && rb->IsResp3()) {
rb->StartCollection(1, CollectionType::MAP);
} else {
rb->StartArray(1);
rb->StartArray(2);
}
return StreamReplies{rb}.SendStreamRecords(key, *result);
} else if (result.status() == OpStatus::INVALID_VALUE) {
🤖 Augment PR SummarySummary: This PR treats Changes:
Technical Notes: The replication strategy emulates Redis/Valkey behavior by translating 🤖 Was this summary useful? React with 👍 or 👎 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
|
|
||
| OpResult<RecordVec> range_res; | ||
|
|
||
| // TODO server history IS PER stream. Fix this incorrect behaviour |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Signed-off-by: Kostas Kyrimis <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
Comments suppressed due to low confidence (1)
src/server/stream_family.cc:806
OpRangeno longer short-circuits whenopts.count == 0.ParseReadArgsOrReplyallowsCOUNT 0, so this path can now scan and return entries instead of returning an empty result, changing command semantics and potentially causing unexpected work.
Restore the count == 0 early-return (and ensure it also avoids any XREADGROUP side-effects like PEL/NACK creation).
RecordVec result;
streamIterator si;
int64_t numfields;
streamID id;
stream* s = (stream*)cobj->RObjPtr();
streamID sstart = opts.start.val, send = opts.end.val;
streamIteratorStart(&si, s, &sstart, &send, opts.is_rev);
while (streamIteratorGetID(&si, &id, &numfields)) {
Signed-off-by: Kostas Kyrimis <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated 4 comments.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
Signed-off-by: Kostas Kyrimis <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
Comments suppressed due to low confidence (1)
src/server/stream_family.cc:934
- In
OpRangeFromConsumerPEL, the code updatesnack->delivery_time/delivery_count, but the returnedRecorddoesn’t carry the updated delivery metadata (Record::delivery_timeremains 0, and there’s no delivery-count field). If XREADGROUP history reads need to be journaled (or otherwise observed outside the shard), this loses the information required to reproduce PEL delivery metadata on replicas. Consider propagating delivery time/count into the returned record (or journaling directly from thenack).
Record rec;
rec.id = id;
result.push_back(rec);
} else {
streamNACK* nack = static_cast<streamNACK*>(ri.data);
nack->delivery_time = op_args.db_cntx.time_now_ms;
nack->delivery_count++;
result.push_back(std::move(op_result.value()[0]));
| if (!opts.serve_history) { | ||
| // Reading NEW messages (ID = ">") | ||
| auto journal_xgroup = [&opts, op_args](const auto& records, std::string_view key) { | ||
| if (!records.empty()) { | ||
| const auto& sitem = opts.stream_ids.at(key); | ||
| auto id = absl::StrCat(records.back().id.ms, "-", records.back().id.seq); | ||
| auto entries_read = absl::StrCat(sitem.group->entries_read); | ||
| CmdArgVec journal_args = {"SETID", key, opts.group_name, id, "ENTRIESREAD", entries_read}; | ||
| RecordJournal(op_args, "XGROUP"sv, journal_args); | ||
| } | ||
| }; | ||
| for (auto& record : records) { | ||
| if (!opts.noack) { | ||
| auto id = absl::StrCat(record.id.ms, "-", record.id.seq); | ||
| auto deliv_time = absl::StrCat(record.delivery_time); | ||
| CmdArgVec journal_args = {key, opts.group_name, opts.consumer_name, "0", | ||
| id, "TIME", deliv_time, "RETRYCOUNT", | ||
| "1", "FORCE", "JUSTID", "LASTID", | ||
| id}; | ||
|
|
||
| RecordJournal(op_args, "XCLAIM"sv, journal_args); | ||
| } | ||
| } | ||
| journal_xgroup(records, key); | ||
| } |
Copilot
AI
Jan 29, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
JournalXReadGroupIfNeeded skips journaling entirely when opts.serve_history is true, but OpRangeFromConsumerPEL still mutates PEL metadata (delivery_time/delivery_count). That means XREADGROUP reads from the PEL (non->) will not replicate these side effects, and replicas can diverge from the master’s pending-entry delivery metadata. Consider journaling an XCLAIM (with correct TIME/RETRYCOUNT) for history reads as well, while still only emitting XGROUP SETID ... ENTRIESREAD when reading new messages (>).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interesting! I thought we did not propagate the consumer but I was wrong. Also there are some small corner cases! Will adress the shortly
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, so this is what I was talking about consumers. Valkey and Redis handles this slightly differently. Either way I will follow up on a different PR - it's getting too large and the case is not very interesting
romange
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good. there are some small comments that you still can fix.
also - seems it can benefit from unit tests as well, not only the replication.
Can you plz clarify the following:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.
| if (is_write_command) { | ||
| group = streamLookupCG(s, WrapSds(opts->group_name)); | ||
| if (!group) | ||
| return facade::ErrorReply{ |
Copilot
AI
Jan 30, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the XREADGROUP path, FindOrAddConsumer() (a few lines below) can return nullptr on OOM. That pointer is stored into requested_sitem.consumer and later OpRange dereferences opts.consumer when NOACK is not set, which can crash. Add an explicit null check after FindOrAddConsumer() and return an OUT_OF_MEMORY error reply instead of proceeding.
romange
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You changed the semantics of the commands. Is it possible to capture these changes via unit tests? i.e. if you create a consumer - check that a consumer is created. if you updated the metadata - check that the metadata is updated.
Yes this is done implicitly by the replication test + the XINFO command which checks if the side effect propagates. I did not change or affect consumer creation at all. The main change for master, is that we properly call I am happy to add anything -- just trying not to be redundant with tests at least |
Summary: This PR treats
XREADGROUPas a write/journaled command and adds explicit journaling to replicate its side effects.Changes:
delivery_timeto support journaling PEL delivery metadata.OpRange/OpReadpaths to capture delivery time when creating PEL entries for consumer groups.JournalXReadGroupIfNeededto journalXCLAIM(when notNOACK) andXGROUP SETIDwithENTRIESREADwhen reading new messages (ID>).XGROUP SETIDimplementation to optionally updateentries_read, and adds command parsing forENTRIESREAD.XREADGROUPcommand registration flags to beJOURNALEDwithNO_AUTOJOURNAL(manual journaling).XINFO GROUPSstate across NOACK/PEL and blocking/non-blocking paths.Notes: The replication strategy emulates Redis/Valkey behavior by translating
XREADGROUPeffects into explicit journal entries (XCLAIM+XGROUP SETID) to keep consumer-group state consistent on replicas.resolves #6493