Skip to content

Fix: Flux.zip does not emit errors from concurrent sources #4041

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

georgebanasios
Copy link
Contributor

Fixes #3917

A race condition existed where a concurrent onComplete signal from one source and an onError signal from another could result in the onError signal being dropped.

Solution:

  1. Synchronized Inner Subscriber State: The ZipCoordinator.error() method now takes full ownership of the termination sequence. It guarantees that the central error field is set before the inner subscriber's done flag is marked. This closes a timing window where the drain loop could previously see a done subscriber and mistakenly assume a completion.

  2. Atomic Terminal State Tie-Breaker: Enhanced the drain() loop logic to use Exceptions.TERMINATED as an atomic sentinel value. This acts as a definitive tie-breaker for racing terminal signals. We ensure that only the first signal (either an error or a completion) can claim the terminal state, with any subsequent signals being correctly handled or dropped.

@georgebanasios georgebanasios requested a review from a team as a code owner June 22, 2025 08:59
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

FluxZip fails to deliver error from concurrently running sources
1 participant