biochimia
Contributor

This is a combination of two related changes to the handling of pending commits during rebalance operations in KafkaConsumerActor.

Single definition of pending commit effect

Whether immediately committing offsets, or queueing the request in State.pendingCommits, the necessary effect is built at once.

This avoids the need to repeat similar logic in disparate call sites, and makes visible in a single place how the request will be processed and logged.
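As a rough illustration of the idea (not the actual fs2-kafka code), the sketch below builds the commit effect once and then either runs it or stores it. The `State` shape, the `commitEffect` helper, and the plain `() => Unit` thunks standing in for effect values are all assumptions made for this example.

```scala
import scala.collection.mutable.ListBuffer

object SingleCommitEffect {
  // Hypothetical, simplified stand-in for the actor's state.
  final case class State(rebalancing: Boolean, pendingCommits: Vector[() => Unit])

  // The effect is defined in one place: what it does and what it logs.
  def commitEffect(offsets: Map[String, Long], log: ListBuffer[String]): () => Unit =
    () => {
      log += s"Committed offsets [$offsets]."
      ()
    }

  // Both paths reuse the same effect; only the time at which it runs differs.
  def commit(state: State, offsets: Map[String, Long], log: ListBuffer[String]): State = {
    val effect = commitEffect(offsets, log)
    if (state.rebalancing) state.copy(pendingCommits = state.pendingCommits :+ effect)
    else {
      effect()
      state
    }
  }
}
```

Because the queued value is the same effect that would have run immediately, processing the queue later needs no extra logic at the call site.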

Ensure pending commits order

Commits would be added to the chain of pending commits based only on the rebalancing state. Given that the rebalancing state is updated (via ConsumerRebalanceListener.onPartitionsAssigned) separately from when pending commits are processed (after poll), it could happen that commits emitted later would be processed before earlier pending ones.

This updates the condition for queueing commits to take into account the prior existence of pending commits.
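The ordering problem can be sketched with a simplified model. Everything here (`State`, `shouldQueueOld`, `shouldQueueNew`, `submit`, and commits represented as strings) is hypothetical and only approximates the conditions described above; it is not the library's code.

```scala
object PendingCommitOrder {
  // Hypothetical, simplified stand-in for the actor's state.
  final case class State(rebalancing: Boolean, pendingCommits: Vector[String])

  // Earlier condition, as described: only the rebalancing flag matters.
  def shouldQueueOld(state: State): Boolean =
    state.rebalancing

  // Updated condition: also queue when earlier commits are still pending,
  // so that a later commit cannot overtake them.
  def shouldQueueNew(state: State): Boolean =
    state.rebalancing || state.pendingCommits.nonEmpty

  // Returns the new state and the commits executed immediately.
  def submit(state: State, commit: String, queue: State => Boolean): (State, Vector[String]) =
    if (queue(state)) (state.copy(pendingCommits = state.pendingCommits :+ commit), Vector.empty)
    else (state, Vector(commit))
}
```

Consider `State(rebalancing = false, pendingCommits = Vector("c1"))`, that is, a rebalance that has finished while `c1` has not yet been flushed. With the earlier condition, `c2` is committed immediately, ahead of `c1`; with the updated condition, it is queued behind `c1`.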

In addition, the condition for processing pending commits in poll is also updated to disregard whether a rebalance operation was ongoing at the start of the poll. Instead, the existence of pending commits, together with a non-rebalancing state, is a sufficient trigger.

This ensures that rebalance operations that might conclude within a single consumer poll do not leave behind any pending commits.
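A minimal sketch of the two trigger conditions follows. The earlier condition is only an approximation inferred from the description above, and the names `State`, `shouldFlushOld`, and `shouldFlushNew` are hypothetical.

```scala
object PendingCommitFlush {
  // Hypothetical, simplified stand-in for the actor's state.
  final case class State(rebalancing: Boolean, pendingCommits: Vector[String])

  // Approximation of the earlier trigger: a rebalance had to be ongoing
  // when the poll started and be finished afterwards.
  def shouldFlushOld(rebalancingAtPollStart: Boolean, afterPoll: State): Boolean =
    rebalancingAtPollStart && !afterPoll.rebalancing && afterPoll.pendingCommits.nonEmpty

  // Updated trigger: pending commits plus a non-rebalancing state suffice.
  def shouldFlushNew(afterPoll: State): Boolean =
    !afterPoll.rebalancing && afterPoll.pendingCommits.nonEmpty
}
```

If a rebalance both starts and ends within one poll, `rebalancingAtPollStart` is `false`, so the earlier trigger would skip any commits queued in the meantime, while the updated trigger still processes them.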

At the moment, these possibilities are theoretical, as commit operations are serialized via KafkaConsumerActor's request queue and don't happen concurrently with polls. That said, the cost of the fixes is trivial, and being explicit about the conditions may prevent future bugs if the surrounding context changes.

@biochimia biochimia force-pushed the tweak-pending-commits branch from 067d60e to ad6b554 Compare January 3, 2025 10:01
@aartigao aartigao force-pushed the tweak-pending-commits branch from ad6b554 to 7d25cd9 Compare January 14, 2025 13:57
Contributor

@aartigao aartigao left a comment

I think the PR has conflicts from another PR of yours. Can you fix them, please? 🙏🏽


-  override def message: String =
-    s"Committed pending commits [$pendingCommits]. Current state [$state]."
+  override def message: String = s"Committed pending commit [$pendingCommit]."
Contributor

Why did you exclude the logging of the state?

Contributor Author

In part, the reason for dropping the state from the log message here is that the updated state is not available at the point where the commit effect is prepared in KafkaConsumerActor.commit.

Additionally, with this change, and the way the commits are processed, the log message is now made per pending commit, and not just once for the full chain. (There will be one CommittedPendingCommit log entry per StoredPendingCommit entry.) At this granularity, it might be noisy to repeat the updated state.

Now, none of this actually requires that the updated state be dropped from the log message, even if it was convenient to do so.

Going over where pending commits are handled, in KafkaConsumerActor.handlePoll, the updated state should be logged prior to the handling of pending commits, so we shouldn't lose detail in the logs. Weighing this against the added complexity of keeping the log messages as before, it seemed like a good trade-off to drop the detail.

@biochimia biochimia force-pushed the tweak-pending-commits branch from 7d25cd9 to 1a58572 Compare March 10, 2025 10:55
@biochimia
Contributor Author

I have rebased my changes on top of the updated branch.

@aartigao
Contributor

More conflicts after merging your changes.

Commits would be added to the chain of pending commits based only on the
rebalancing state. Given that the rebalancing state is updated (via
`ConsumerRebalanceListener.onPartitionsAssigned`) separately from when
pending commits are processed (after `poll`), it could happen that
commits emitted later would be processed before earlier pending ones.

This updates the condition for queueing commits to take into account the
prior existence of pending commits.

At the moment, this possibility is theoretical as commit operations are
serialized via `KafkaConsumerActor`'s request queue, and don't happen
concurrently with polls. That said, the cost of the fix is trivial and
being explicit about the conditions may prevent future bugs, if the
surrounding context changes.
@biochimia biochimia force-pushed the tweak-pending-commits branch from 1a58572 to 2fe4c60 Compare March 15, 2025 08:59
@aartigao aartigao merged commit ed7716b into fd4s:series/3.x Mar 15, 2025
8 checks passed
@biochimia
Contributor Author

Thank you for taking the time to review and merge this.

The other PR included and obsoleted some of these changes, so this one became really small.

I developed all the patches together, and split them in the end, as I expected the FetchRequest changes to warrant a deeper dive and wanted to keep noise to the minimum on that one. Conflicts were an unavoidable consequence.
