Description
Streams that are shared using the Flux.publish() (or Flux.replay(0)) operators buffer elements that are
emitted while there are no connected subscribers on the stream, and then replay them to the next subscriber.
When there is at least one active subscriber, new subscribers receive only newly emitted elements, which is
the expected behaviour. But as soon as the subscriber count drops to zero, the stream begins to buffer
elements (perhaps up to the default small queue size?), which are then replayed when a new subscriber
connects.
Expected Behavior
When a stream is shared via Flux.publish() or Flux.replay(0), I would expect new subscribers not to
receive elements that were emitted prior to the subscription being created. Furthermore, I would not
expect the behaviour of such a subscription to change depending on how many other subscribers are connected
to the stream.
Actual Behavior
Subscriptions on streams shared via Flux.publish() appear to behave differently depending on whether
there are other active subscribers to the stream: specifically, if there are no active subscribers for a
period of time, some (or all) of the elements emitted during that period are incorrectly replayed to a new
subscriber when it does subscribe.
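To make the suspected mechanism concrete, here is a toy model in plain Java. It is only a sketch of the behaviour I am observing, not Reactor's FluxPublish implementation: ToyPublishHub, ToyPublishDemo, and the capacity of 256 are hypothetical names and values chosen for illustration. The hub queues elements while it has no subscribers and hands the queue to whoever subscribes next, which matches what the two test cases in this report show.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

class ToyPublishDemo {
    /** Mirrors the test scenarios: returns what a late subscriber receives. */
    static List<String> lateSubscriberSees(boolean keepDummySubscriber) {
        ToyPublishHub<String> hub = new ToyPublishHub<>(256); // hypothetical capacity
        Runnable dummy = keepDummySubscriber ? hub.subscribe(x -> { }) : null;

        Runnable first = hub.subscribe(x -> { });
        hub.emit("hello 2");
        first.run(); // cancel the first subscriber

        hub.emit("hello 3");
        hub.emit("hello 4");
        hub.emit("hello 5");

        List<String> seen = new ArrayList<>();
        Runnable late = hub.subscribe(seen::add);
        hub.emit("hello 6");
        late.run();
        if (dummy != null) {
            dummy.run();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(lateSubscriberSees(false)); // [hello 3, hello 4, hello 5, hello 6]
        System.out.println(lateSubscriberSees(true));  // [hello 6]
    }
}

/** Toy model only: an assumption about the observable behaviour, not Reactor code. */
final class ToyPublishHub<T> {
    private final int capacity;
    private final Queue<T> queue = new ArrayDeque<>();
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    ToyPublishHub(int capacity) {
        this.capacity = capacity;
    }

    /** Adds a subscriber, then drains anything queued while nobody was listening. */
    Runnable subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
        T item;
        while ((item = queue.poll()) != null) {
            subscriber.accept(item);
        }
        return () -> subscribers.remove(subscriber); // cancellation handle
    }

    /** Delivers to current subscribers, or queues the element if there are none. */
    void emit(T item) {
        if (subscribers.isEmpty()) {
            if (queue.size() < capacity) {
                queue.add(item); // beyond capacity this model simply drops
            }
            return;
        }
        for (Consumer<T> s : new ArrayList<>(subscribers)) {
            s.accept(item);
        }
    }
}
```

In this model the late subscriber's view depends entirely on whether some other subscriber happened to be draining the hub in the meantime, which is the inconsistency this issue is about.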
Steps to Reproduce
build.gradle.kts:
plugins {
    id("java")
}

group = "reactor-issue"
version = "1.0-SNAPSHOT"

repositories {
    mavenCentral()
}

dependencies {
    testImplementation("org.junit.jupiter:junit-jupiter:5.10.0")
    testImplementation("io.projectreactor:reactor-test:3.7.1")
}

tasks.test {
    useJUnitPlatform()
}
import org.junit.jupiter.api.Test;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import reactor.test.publisher.TestPublisher;

// The class name is arbitrary; any JUnit 5 test class will do.
class PublishReplayIssueTest {

    @Test
    void incorrectlyPublishesMessagesReceivedWhenThereAreNoSubscribers() {
        // This test fails; I would expect it to pass.
        TestPublisher<String> sink = TestPublisher.create();
        sink.next("hello");
        sink.next("hello");
        sink.next("hello");
        sink.next("hello");
        // I am using .autoConnect() here, but the problem is the same with a manual call to .connect().
        Flux<String> shared = sink.flux().log().publish().autoConnect();
        StepVerifier.create(shared.log())
                .then(() -> sink.next("hello 2"))
                .expectNext("hello 2")
                .thenCancel()
                .verify();
        // Emitted while there are no subscribers.
        sink.next("hello 3");
        sink.next("hello 4");
        sink.next("hello 5");
        StepVerifier.create(shared.log())
                .then(() -> sink.next("hello 6"))
                // Failure here: we get "hello 3" as the first element.
                .expectNext("hello 6")
                .thenCancel()
                .verify();
    }

    @Test
    void correctlyPublishesOnlyNewMessagesWhenThereAreOtherActiveSubscribers() {
        // This test passes (as there is another active subscriber).
        TestPublisher<String> sink = TestPublisher.create();
        sink.next("hello");
        sink.next("hello");
        sink.next("hello");
        sink.next("hello");
        Flux<String> shared = sink.flux().log().publish().autoConnect();
        // Create an active subscription.
        Disposable existingSub = shared.subscribe();
        StepVerifier.create(shared.log())
                .then(() -> sink.next("hello 2"))
                .expectNext("hello 2")
                .thenCancel()
                .verify();
        // The other active subscription consumes these.
        sink.next("hello 3");
        sink.next("hello 4");
        sink.next("hello 5");
        StepVerifier.create(shared.log())
                .then(() -> sink.next("hello 6"))
                .expectNext("hello 6")
                .thenCancel()
                .verify();
        existingSub.dispose();
    }
}
Possible Solution
I am not sufficiently close to the implementation of the FluxPublish operator or other Reactor
internals to suggest a fix.
However, there are a couple of workarounds:
- Connect a 'dummy' subscriber to the hot stream after sharing it (as in the second test case in the
  above example code), which results in more predictable and consistent behaviour for other subscribers.
- Use .refCount() instead of .autoConnect(). This has different behaviour, as it cancels the upstream
  subscription once the subscriber count drops to zero, but it will avoid the problem. The downside is
  that it requires a new subscription to the upstream as soon as a new subscriber connects; for upstream
  subscriptions that are costly to initiate, this is a drawback.
Your Environment
This issue occurs with reactor-core version 3.7.1 and also the latest point release of 3.6.x.
I am running with JDK/JRE 21 (OpenJDK), on a Mac running the latest OS version.