Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions api/revapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,28 @@
"criticality" : "highlight",
"minSeverity" : "POTENTIALLY_BREAKING",
"minCriticality" : "documented",
"differences" : [ ]
"differences" : [
{
"code": "java.method.addedToInterface",
"new": "method boolean io.smallrye.reactive.messaging.PausableChannel::clearBuffer()",
"justification": "Added clear buffer method to pausable channels"
},
{
"code": "java.method.addedToInterface",
"new": "method int io.smallrye.reactive.messaging.PausableChannel::bufferSize()",
"justification": "Added buffer size method to pausable channels"
},
{
"code": "java.method.addedToInterface",
"new": "method boolean io.smallrye.reactive.messaging.PausableChannelConfiguration::lateSubscription()",
"justification": "Added late-subscription configuration to pausable channels"
},
{
"code": "java.method.addedToInterface",
"new": "method boolean io.smallrye.reactive.messaging.PausableChannelConfiguration::bufferAlreadyRequested()",
"justification": "Added buffer-already-requested configuration to pausable channels"
}
]
}
}, {
"extension" : "revapi.reporter.json",
Expand All @@ -46,4 +67,4 @@
"minCriticality" : "documented",
"output" : "out"
}
} ]
} ]
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,18 @@ public interface PausableChannel {
* Resumes the channel.
*/
void resume();

/**
* Returns items buffered in the channel.
*
* @return a copy of the buffered items
*/
int bufferSize();

/**
* Clears buffered items.
*
* @return {@code true} if the items were cleared, {@code false} otherwise
*/
boolean clearBuffer();
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,17 @@ public interface PausableChannelConfiguration {
*/
String PAUSED_PROPERTY = "initially-paused";

/**
* The name of the property to configure whether the subscription is postponed to resume if the channel is paused at
* subscribe time.
*/
String LATE_SUBSCRIPTION_PROPERTY = "late-subscription";

/**
* The name of the property to configure whether to buffer is already requested items when the channel is paused.
*/
String BUFFER_ALREADY_REQUESTED_PROPERTY = "buffer-already-requested";

/**
* The name of the channel.
*/
Expand All @@ -25,4 +36,13 @@ public interface PausableChannelConfiguration {
*/
boolean initiallyPaused();

/**
* Whether the subscription is done after the channel is paused.
*/
boolean lateSubscription();

/**
* Whether to buffer is already requested items when the channel is paused.
*/
boolean bufferAlreadyRequested();
}
22 changes: 21 additions & 1 deletion documentation/src/main/docs/concepts/pausable-channels.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ To use pausable channels, you need to activate it with the configuration propert
mp.messaging.incoming.my-channel.pausable=true
# optional, by default the channel is NOT paused initially
mp.messaging.outgoing.my-channel.initially-paused=true
# optional, when enabled subscription to the upstream will be delayed until resume is called
mp.messaging.outgoing.my-channel.late-subscription=true
```

## Controlling the flow of messages

If a channel is configured to be pausable,
you can get the `PausableChannel` by channel name from the `ChannelRegistry` programmatically,
you can either inject it using the `@Channel("channel-name")` identifier, or retrieve it by channel name from the `ChannelRegistry` programmatically,
and pause or resume the channel as needed:

``` java
Expand All @@ -32,3 +34,21 @@ and pause or resume the channel as needed:

!!!warning
Pausable channels only work with back-pressure aware subscribers, with bounded downstream requests.

## Working with concurrent consumers

Pausable channels work by blocking upstream requests when paused and letting them through when resumed.

This means, due to the asynchronous nature of reactive streams,
calling pause on a channel may not stop already requested messages from being dispatched to the consumer.
This is especially true when using concurrent consumers with `@Blocking(ordered = false)`.

If the upstream publisher (usually the consumer from an inbound source) produces already requested messages to a paused channel, pausable channel buffers those messages.
When the channel is resumed, the buffered messages are dispatched to the consumer.

This behavior is enabled by default and can be disabled by setting the configuration property `buffer-already-requested` to `false`:

```properties
mp.messaging.incoming.my-channel.pausable=true
mp.messaging.incoming.my-channel.buffer-already-requested=false
```
14 changes: 10 additions & 4 deletions documentation/src/main/java/pausable/PausableController.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Incoming;

import io.smallrye.reactive.messaging.ChannelRegistry;
Expand All @@ -12,21 +13,26 @@
@ApplicationScoped
public class PausableController {

@Inject
@Channel("my-channel")
PausableChannel pausable;

@Inject
ChannelRegistry registry;

public PausableChannel getPausable() {
// Retrieve the pausable channel from channel registry
return registry.getPausable("my-channel");
}

@PostConstruct
public void resume() {
// Wait for the application to be ready
// Retrieve the pausable channel
PausableChannel pausable = registry.getPausable("my-channel");
// Pause the processing of the messages
pausable.resume();
}

public void pause() {
// Retrieve the pausable channel
PausableChannel pausable = registry.getPausable("my-channel");
// Pause the processing of the messages
pausable.pause();
}
Expand Down
Loading