Skip to content

Commit 040083f

Browse files
committed
Improvements to pauseable channels
Added options late-subscription and buffer-already-requested
1 parent e7bf4f5 commit 040083f

File tree

10 files changed

+380
-39
lines changed

10 files changed

+380
-39
lines changed

api/revapi.json

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,28 @@
2727
"criticality" : "highlight",
2828
"minSeverity" : "POTENTIALLY_BREAKING",
2929
"minCriticality" : "documented",
30-
"differences" : [ ]
30+
"differences" : [
31+
{
32+
"code": "java.method.addedToInterface",
33+
"new": "method boolean io.smallrye.reactive.messaging.PausableChannel::clearBuffer()",
34+
"justification": "Added clear buffer method to pausable channels"
35+
},
36+
{
37+
"code": "java.method.addedToInterface",
38+
"new": "method int io.smallrye.reactive.messaging.PausableChannel::bufferSize()",
39+
"justification": "Added buffer size method to pausable channels"
40+
},
41+
{
42+
"code": "java.method.addedToInterface",
43+
"new": "method boolean io.smallrye.reactive.messaging.PausableChannelConfiguration::lateSubscription()",
44+
"justification": "Added late-subscription configuration to pausable channels"
45+
},
46+
{
47+
"code": "java.method.addedToInterface",
48+
"new": "method boolean io.smallrye.reactive.messaging.PausableChannelConfiguration::bufferAlreadyRequested()",
49+
"justification": "Added buffer-already-requested configuration to pausable channels"
50+
}
51+
]
3152
}
3253
}, {
3354
"extension" : "revapi.reporter.json",
@@ -46,4 +67,4 @@
4667
"minCriticality" : "documented",
4768
"output" : "out"
4869
}
49-
} ]
70+
} ]

api/src/main/java/io/smallrye/reactive/messaging/PausableChannel.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,18 @@ public interface PausableChannel {
2121
* Resumes the channel.
2222
*/
2323
void resume();
24+
25+
/**
26+
* Returns items buffered in the channel.
27+
*
28+
* @return a copy of the buffered items
29+
*/
30+
int bufferSize();
31+
32+
/**
33+
* Clears buffered items.
34+
*
35+
* @return {@code true} if the items were cleared, {@code false} otherwise
36+
*/
37+
boolean clearBuffer();
2438
}

api/src/main/java/io/smallrye/reactive/messaging/PausableChannelConfiguration.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,17 @@ public interface PausableChannelConfiguration {
1515
*/
1616
String PAUSED_PROPERTY = "initially-paused";
1717

18+
/**
19+
* The name of the property to configure whether the subscription is postponed to resume if the channel is paused at
20+
* subscribe time.
21+
*/
22+
String LATE_SUBSCRIPTION_PROPERTY = "late-subscription";
23+
24+
/**
25+
* The name of the property to configure whether to buffer is already requested items when the channel is paused.
26+
*/
27+
String BUFFER_ALREADY_REQUESTED_PROPERTY = "buffer-already-requested";
28+
1829
/**
1930
* The name of the channel.
2031
*/
@@ -25,4 +36,13 @@ public interface PausableChannelConfiguration {
2536
*/
2637
boolean initiallyPaused();
2738

39+
/**
40+
* Whether the subscription is done after the channel is paused.
41+
*/
42+
boolean lateSubscription();
43+
44+
/**
45+
* Whether to buffer is already requested items when the channel is paused.
46+
*/
47+
boolean bufferAlreadyRequested();
2848
}

documentation/src/main/docs/concepts/pausable-channels.md

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,14 @@ To use pausable channels, you need to activate it with the configuration propert
1818
mp.messaging.incoming.my-channel.pausable=true
1919
# optional, by default the channel is NOT paused initially
2020
mp.messaging.outgoing.my-channel.initially-paused=true
21+
# optional, when enabled subscription to the upstream will be delayed until resume is called
22+
mp.messaging.outgoing.my-channel.late-subscription=true
2123
```
2224

2325
## Controlling the flow of messages
2426

2527
If a channel is configured to be pausable,
26-
you can get the `PausableChannel` by channel name from the `ChannelRegistry` programmatically,
28+
you can either inject it using the `@Channel("channel-name")` identifier, or retrieve it by channel name from the `ChannelRegistry` programmatically,
2729
and pause or resume the channel as needed:
2830

2931
``` java
@@ -32,3 +34,21 @@ and pause or resume the channel as needed:
3234

3335
!!!warning
3436
Pausable channels only work with back-pressure aware subscribers, with bounded downstream requests.
37+
38+
## Working with concurrent consumers
39+
40+
Pausable channels work by blocking upstream requests when paused and letting them through when resumed.
41+
42+
This means, due to the asynchronous nature of reactive streams,
43+
calling pause on a channel may not stop already requested messages from being dispatched to the consumer.
44+
This is especially true when using concurrent consumers with `@Blocking(ordered = false)`.
45+
46+
If the upstream publisher (usually the consumer from an inbound source) produces already requested messages to a paused channel, pausable channel buffers those messages.
47+
When the channel is resumed, the buffered messages are dispatched to the consumer.
48+
49+
This behavior is enabled by default and can be disabled by setting the configuration property `buffer-already-requested` to `false`:
50+
51+
```properties
52+
mp.messaging.incoming.my-channel.pausable=true
53+
mp.messaging.incoming.my-channel.buffer-already-requested=false
54+
```

documentation/src/main/java/pausable/PausableController.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import jakarta.enterprise.context.ApplicationScoped;
55
import jakarta.inject.Inject;
66

7+
import org.eclipse.microprofile.reactive.messaging.Channel;
78
import org.eclipse.microprofile.reactive.messaging.Incoming;
89

910
import io.smallrye.reactive.messaging.ChannelRegistry;
@@ -12,21 +13,26 @@
1213
@ApplicationScoped
1314
public class PausableController {
1415

16+
@Inject
17+
@Channel("my-channel")
18+
PausableChannel pausable;
19+
1520
@Inject
1621
ChannelRegistry registry;
1722

23+
public PausableChannel getPausable() {
24+
// Retrieve the pausable channel from channel registry
25+
return registry.getPausable("my-channel");
26+
}
27+
1828
@PostConstruct
1929
public void resume() {
2030
// Wait for the application to be ready
21-
// Retrieve the pausable channel
22-
PausableChannel pausable = registry.getPausable("my-channel");
2331
// Pause the processing of the messages
2432
pausable.resume();
2533
}
2634

2735
public void pause() {
28-
// Retrieve the pausable channel
29-
PausableChannel pausable = registry.getPausable("my-channel");
3036
// Pause the processing of the messages
3137
pausable.pause();
3238
}

0 commit comments

Comments
 (0)