fix: Auto-cancelled Sinks.Many would continue to accept emissions after cancellation #4037
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Fixes #3715
Ensure auto-cancelled Sinks.Many reject emissions.
1.Fixed the WIP Leak in
drain()
The core issue was that the
drain()
method had several premature return statements inside its main loop that were triggered on termination. These exits bypassed the logic that decrements thewip
counter, effectively deadlocking the sink and preventing queue cleanup.Change: All early return statements inside the
for(; ; )
loop indrain()
have been removed.Effect: This guarantees the drain loop always completes its lifecycle, the
wip
counter is managed correctly, and the sink's serialization is always preserved.2.Adjusted
tryEmitNext()
Against RacesChange:
tryEmitNext()
now performs a finalisCancelled()
check after offering an item to the queue and triggering a drain.Effect: If a cancellation occurs concurrently with an emission,
drain()
cleans up the offered item, andtryEmitNext()
correctly returnsFAIL_CANCELLED
. This prevents both item leaks and misleadingOK
results.3.Corrected Auto-Cancellation Logic in
remove()
Change: When the last subscriber is removed, the
remove()
method now simply sets the cancelled state and delegates all cleanup work to thedrain()
method.Effect: This centralizes all state-change side effects (like clearing the queue) into the main serialized drain loop.