Skip to content

Commit b7af0fb

Browse files
committed
Inject pausable channels
1 parent 232c160 commit b7af0fb

File tree

2 files changed

+15
-0
lines changed

2 files changed

+15
-0
lines changed

smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/extension/ChannelProducer.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import io.smallrye.reactive.messaging.ChannelRegistry;
3030
import io.smallrye.reactive.messaging.MessageConverter;
3131
import io.smallrye.reactive.messaging.MutinyEmitter;
32+
import io.smallrye.reactive.messaging.PausableChannel;
3233
import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;
3334
import io.smallrye.reactive.messaging.providers.helpers.TypeUtils;
3435
import io.smallrye.reactive.messaging.providers.i18n.ProviderExceptions;
@@ -187,6 +188,17 @@ <T> MutinyEmitter<T> produceMutinyEmitter(InjectionPoint injectionPoint) {
187188
return getEmitter(injectionPoint);
188189
}
189190

191+
@Produces
192+
@Channel("")
193+
PausableChannel producePausableChannel(InjectionPoint injectionPoint) {
194+
String name = getChannelName(injectionPoint);
195+
PausableChannel channel = channelRegistry.getPausable(name);
196+
if (channel == null) {
197+
throw ex.pausableChannelNotFound(name);
198+
}
199+
return channel;
200+
}
201+
190202
/**
191203
* Injects an {@link io.smallrye.reactive.messaging.annotations.Emitter} (deprecated) matching the channel name.
192204
*

smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/i18n/ProviderExceptions.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,4 +303,7 @@ IllegalArgumentException illegalArgumentForWorkerConfigKey(String annotation, St
303303
@Message(id = 1008, value = "Emitter configuration for channel `%s` in %s is different than a previous configuration : %s")
304304
DefinitionException differentEmitterConfigurationPerInjection(String channel, String injectionPoint, String config);
305305

306+
@Message(id = 1009, value = "Unable to find a pausable channel with name `%s`")
307+
DefinitionException pausableChannelNotFound(String name);
308+
306309
}

0 commit comments

Comments
 (0)