Skip to content

Fix: Sinks queue is not cleared on immediate cancellation #4056

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

georgebanasios
Copy link
Contributor

Resolves #3359

This PR resolves an issue where elements buffered in a Sinks.many().unicast().onBackpressureBuffer() or Sinks.many().multicast().onBackpressureBuffer() sink were not discarded if the downstream consumer cancelled immediately, most notably through operators like .take(0).

Root cause:
The .take(0) operator creates a FluxLimitRequest instance, which is an OptimizableOperator.

During the subscription phase, the subscribeOrReturn is getting called on this operator before propagating the subscription to the upstream source (SinkManyUnicast).

Because the take limit is 0, FluxLimitRequest immediately completes the downstream subscriber and returns null. This null return signals that the subscription chain should be terminated.

As a result, subscribe() is never called on the SinkManyUnicast. The sink is left unaware that a consumer ever attempted to connect and its cancellation and cleanup logic, which would clear the buffer, is never triggered.

Solution:
The affected Sinks implement the SourceProducer interface and override a method that provides an opt-in side-channel for operators to signal a source to release its resources, bypassing the standard subscription path.

@georgebanasios georgebanasios requested a review from a team as a code owner July 15, 2025 17:01
@georgebanasios
Copy link
Contributor Author

Hi @chemicL

While this PR fixes the specific mentioned issue, I noticed one more thing that causes the buffers to not clear resources upon immediate cancellation.
I could've added this fix in this PR as well, but I wanted your opinion first because this potentially could be a breaking change.

The same issue with .take(0) exists in operators that throw in their constructor such as take(n) and elementAt(n), n<0.
When n < 0 both operators throw synchronous exceptions and this results to the same issue that leaves buffered sinks 'orphans' and potentially could leak memory.
A simple test to demonstrate:

@Test
void elementAtWithNegativeIndexShouldClearQueue() {
    BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
    Sinks.Many<Integer> sink = Sinks.many().unicast().onBackpressureBuffer(queue);

    sink.tryEmitNext(1);
    sink.tryEmitNext(2);
    sink.tryEmitNext(3);
    assertThat(queue).hasSize(3);

    assertThrows(IndexOutOfBoundsException.class, () -> {
       sink.asFlux()
             .elementAt(-1)
             .onErrorComplete()
             .block();
    });

    assertThat(queue).isEmpty(); // This fails
}

What I was thinking is:

  1. Move the validation from the constructors into subscribeOrReturn to allow for resource cleanup. If we add the validation logic on constructors I think that contradicts with Nothing happens until you subscribe maybe.
  2. This would be a breaking change to the error-handling of these operators, as they would not throw an exception during creation but instead during subscription time.

Let me know what you think!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

EmitterProcessor queue not cleared on immediate cancellation
1 participant