Skip to content

Commit 9c98765

Browse files
authored
Merge pull request #2219 from ozangunalp/blocking_improvements
Blocking improvements max-concurrency
2 parents b99cd65 + d02a5a5 commit 9c98765

File tree

6 files changed

+47
-12
lines changed

6 files changed

+47
-12
lines changed

smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/AbstractMediator.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ public abstract class AbstractMediator {
4444
protected HealthCenter health;
4545
private Instance<MessageConverter> converters;
4646
private Instance<KeyValueExtractor> extractors;
47+
private int maxConcurrency;
4748

4849
public AbstractMediator(MediatorConfiguration configuration) {
4950
this.configuration = configuration;
@@ -99,6 +100,10 @@ public void setWorkerPoolRegistry(WorkerPoolRegistry workerPoolRegistry) {
99100
this.workerPoolRegistry = workerPoolRegistry;
100101
}
101102

103+
public void setMaxConcurrency(int maxConcurrency) {
104+
this.maxConcurrency = maxConcurrency;
105+
}
106+
102107
public void run() {
103108
// Do nothing by default.
104109
}
@@ -259,8 +264,11 @@ public Instance<KeyValueExtractor> extractors() {
259264
return extractors;
260265
}
261266

267+
public int maxConcurrency() {
268+
return maxConcurrency;
269+
}
270+
262271
public void terminate() {
263272
// Do nothing by default.
264273
}
265-
266274
}

smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/ProcessorMediator.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -323,10 +323,11 @@ private void processMethodReturningIndividualMessageAndConsumingIndividualItem()
323323
this.mapper = upstream -> {
324324
Multi<? extends Message<?>> multi = MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration);
325325
return multi
326-
.onItem().transformToMultiAndMerge(message -> invokeBlocking(message, getArguments(message))
326+
.onItem().transformToMulti(message -> invokeBlocking(message, getArguments(message))
327327
.onItemOrFailure()
328328
.transformToUni((o, t) -> this.handlePostInvocationWithMessage((Message<?>) o, t))
329-
.onItem().transformToMulti(this::handleSkip));
329+
.onItem().transformToMulti(this::handleSkip))
330+
.merge(maxConcurrency());
330331
};
331332
}
332333

@@ -358,9 +359,10 @@ private void processMethodReturningIndividualPayloadAndConsumingIndividualItem()
358359
.onItem().transformToMulti(this::handleSkip));
359360
} else {
360361
this.mapper = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration)
361-
.onItem().transformToMultiAndMerge(message -> invokeBlocking(message, getArguments(message))
362+
.onItem().transformToMulti(message -> invokeBlocking(message, getArguments(message))
362363
.onItemOrFailure().transformToUni((r, f) -> handlePostInvocation(message, r, f))
363-
.onItem().transformToMulti(this::handleSkip));
364+
.onItem().transformToMulti(this::handleSkip))
365+
.merge(maxConcurrency());
364366
}
365367

366368
} else {

smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/PublisherMediator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ private void produceIndividualMessages() {
143143
.onItem().transform(o -> (Message<?>) o));
144144
} else {
145145
this.publisher = decorate(MultiUtils.createFromGenerator(this::invokeBlocking)
146-
.onItem().transformToUniAndMerge(u -> u)
146+
.onItem().transformToUni(u -> u).merge(maxConcurrency())
147147
.onItem().transform(o -> (Message<?>) o));
148148
}
149149
} else {
@@ -163,7 +163,7 @@ private void produceIndividualPayloads() {
163163
.onItem().transform(Message::of));
164164
} else {
165165
this.publisher = decorate(MultiUtils.createFromGenerator(this::invokeBlocking)
166-
.onItem().transformToUniAndMerge(u -> u)
166+
.onItem().transformToUni(u -> u).merge(maxConcurrency())
167167
.onItem().transform(Message::of));
168168
}
169169
} else {

smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/SubscriberMediator.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -165,8 +165,9 @@ private void processMethodReturningVoid() {
165165
.invoke(failure -> health.reportApplicationFailure(configuration.methodAsString(), failure));
166166
} else {
167167
this.function = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration)
168-
.onItem().transformToUniAndMerge(msg -> invokeBlocking(msg, getArguments(msg))
168+
.onItem().transformToUni(msg -> invokeBlocking(msg, getArguments(msg))
169169
.onItemOrFailure().transformToUni(handleInvocationResult(msg)))
170+
.merge(maxConcurrency())
170171
.onFailure()
171172
.invoke(failure -> health.reportApplicationFailure(configuration.methodAsString(), failure));
172173
}
@@ -210,7 +211,7 @@ private void processMethodReturningACompletionStage() {
210211
.onFailure().invoke(this::reportFailure);
211212
} else {
212213
this.function = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration)
213-
.onItem().transformToUniAndMerge(this::invokeBlockingAndHandleOutcome)
214+
.onItem().transformToUni(this::invokeBlockingAndHandleOutcome).merge(maxConcurrency())
214215
.onFailure().invoke(this::reportFailure);
215216
}
216217
} else {
@@ -243,7 +244,7 @@ private void processMethodReturningAUni() {
243244
.onFailure().invoke(this::reportFailure);
244245
} else {
245246
this.function = upstream -> MultiUtils.handlePreProcessingAcknowledgement(upstream, configuration)
246-
.onItem().transformToUniAndMerge(this::invokeBlockingAndHandleOutcome)
247+
.onItem().transformToUni(this::invokeBlockingAndHandleOutcome).merge(maxConcurrency())
247248
.onFailure().invoke(this::reportFailure);
248249
}
249250
} else {

smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/connectors/WorkerPoolRegistry.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@
3535

3636
@ApplicationScoped
3737
public class WorkerPoolRegistry {
38-
private static final String WORKER_CONFIG_PREFIX = "smallrye.messaging.worker";
39-
private static final String WORKER_CONCURRENCY = "max-concurrency";
38+
public static final String WORKER_CONFIG_PREFIX = "smallrye.messaging.worker";
39+
public static final String WORKER_CONCURRENCY = "max-concurrency";
4040

4141
@Inject
4242
Instance<ExecutionHolder> executionHolder;

smallrye-reactive-messaging-provider/src/main/java/io/smallrye/reactive/messaging/providers/extension/MediatorManager.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package io.smallrye.reactive.messaging.providers.extension;
22

3+
import static io.smallrye.reactive.messaging.providers.connectors.WorkerPoolRegistry.WORKER_CONCURRENCY;
4+
import static io.smallrye.reactive.messaging.providers.connectors.WorkerPoolRegistry.WORKER_CONFIG_PREFIX;
35
import static io.smallrye.reactive.messaging.providers.i18n.ProviderLogging.log;
46

57
import java.lang.reflect.Constructor;
@@ -11,10 +13,12 @@
1113
import jakarta.enterprise.inject.spi.*;
1214
import jakarta.inject.Inject;
1315

16+
import org.eclipse.microprofile.config.Config;
1417
import org.eclipse.microprofile.config.inject.ConfigProperty;
1518
import org.eclipse.microprofile.reactive.messaging.Incoming;
1619
import org.eclipse.microprofile.reactive.messaging.Outgoing;
1720

21+
import io.smallrye.mutiny.helpers.queues.Queues;
1822
import io.smallrye.reactive.messaging.*;
1923
import io.smallrye.reactive.messaging.EmitterConfiguration;
2024
import io.smallrye.reactive.messaging.PublisherDecorator;
@@ -79,6 +83,9 @@ public class MediatorManager {
7983
@ConfigProperty(name = STRICT_MODE_PROPERTY, defaultValue = "false")
8084
boolean strictMode;
8185

86+
@Inject
87+
Instance<Config> configInstance;
88+
8289
public <T> void analyze(AnnotatedType<T> annotatedType, Bean<T> bean) {
8390

8491
if (strictMode) {
@@ -97,6 +104,22 @@ public <T> void analyze(AnnotatedType<T> annotatedType, Bean<T> bean) {
97104
});
98105
}
99106

107+
private int getWorkerMaxConcurrency(MediatorConfiguration configuration) {
108+
// max concurrency is not relevant if not blocking
109+
if (!configuration.isBlocking()) {
110+
return -1;
111+
}
112+
String poolName = configuration.getWorkerPoolName();
113+
// if the poll name is null we are on the default worker pool, set the default concurrent requests
114+
if (poolName == null) {
115+
return Queues.BUFFER_S;
116+
}
117+
String concurrencyConfigKey = WORKER_CONFIG_PREFIX + "." + poolName + "." + WORKER_CONCURRENCY;
118+
Optional<Integer> concurrency = configInstance.get().getOptionalValue(concurrencyConfigKey, Integer.class);
119+
// Fallback to the default concurrent requests if setting is not found
120+
return concurrency.orElse(Queues.BUFFER_S);
121+
}
122+
100123
/**
101124
* This method is used in the Quarkus extension.
102125
*
@@ -177,6 +200,7 @@ public AbstractMediator createMediator(MediatorConfiguration configuration) {
177200
mediator.setExtractors(extractors);
178201
mediator.setHealth(health);
179202
mediator.setWorkerPoolRegistry(workerPoolRegistry);
203+
mediator.setMaxConcurrency(getWorkerMaxConcurrency(configuration));
180204

181205
try {
182206
Object beanInstance = beanManager.getReference(configuration.getBean(), Object.class,

0 commit comments

Comments
 (0)