Skip to content

Commit ee6f01f

Browse files
authored
Merge pull request #545 from smallrye/features/fix-mqtt-connection-pool
MQTT Client and Connection sharing
2 parents 0151cec + 6cf0f21 commit ee6f01f

File tree

11 files changed

+404
-42
lines changed

11 files changed

+404
-42
lines changed

documentation/src/main/doc/modules/mqtt/pages/inbound.adoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,3 +53,6 @@ include::connectors:partial$META-INF/connector/smallrye-mqtt-incoming.adoc[]
5353

5454
The MQTT connector is based on the https://vertx.io/docs/vertx-mqtt/java/#_vert_x_mqtt_client[Vert.x MQTT client].
5555
So you can pass any attribute supported by this client.
56+
57+
IMPORTANT: A single instance of `MqttClient` and a single connection is used for each `host` / `port` / `server-name` / `client-id`.
58+
This client is reused for both the inbound and outbound connectors.

documentation/src/main/doc/modules/mqtt/pages/outbound.adoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,3 +57,6 @@ include::connectors:partial$META-INF/connector/smallrye-mqtt-outgoing.adoc[]
5757

5858
The MQTT connector is based on the https://vertx.io/docs/vertx-mqtt/java/#_vert_x_mqtt_client[Vert.x MQTT client].
5959
So you can pass any attribute supported by this client.
60+
61+
IMPORTANT: A single instance of `MqttClient` and a single connection is used for each `host` / `port` / `server-name` / `client-id`.
62+
This client is reused for both the inbound and outbound connectors.
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package io.smallrye.reactive.messaging.mqtt;
2+
3+
import java.util.Map;
4+
import java.util.concurrent.ConcurrentHashMap;
5+
6+
import io.smallrye.mutiny.Multi;
7+
import io.smallrye.mutiny.Uni;
8+
import io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor;
9+
import io.vertx.mqtt.MqttClientOptions;
10+
import io.vertx.mutiny.core.Vertx;
11+
import io.vertx.mutiny.mqtt.MqttClient;
12+
import io.vertx.mutiny.mqtt.messages.MqttConnAckMessage;
13+
import io.vertx.mutiny.mqtt.messages.MqttPublishMessage;
14+
15+
public class Clients {
16+
17+
private static Map<String, ClientHolder> clients = new ConcurrentHashMap<>();
18+
19+
private Clients() {
20+
// avoid direct instantiation.
21+
}
22+
23+
static Uni<MqttClient> getConnectedClient(Vertx vertx, String host, int port, String server,
24+
MqttClientOptions options) {
25+
26+
String id = host + port + "<" + (server == null ? "" : server)
27+
+ ">-[" + (options.getClientId() != null ? options.getClientId() : "") + "]";
28+
ClientHolder holder = clients.computeIfAbsent(id, key -> {
29+
MqttClient client = MqttClient.create(vertx, options);
30+
return new ClientHolder(client, host, port, server);
31+
});
32+
return holder.connect();
33+
}
34+
35+
static ClientHolder getHolder(Vertx vertx, String host, int port, String server,
36+
MqttClientOptions options) {
37+
38+
String id = host + port + "<" + (server == null ? "" : server)
39+
+ ">-[" + (options.getClientId() != null ? options.getClientId() : "") + "]";
40+
return clients.computeIfAbsent(id, key -> {
41+
MqttClient client = MqttClient.create(vertx, options);
42+
return new ClientHolder(client, host, port, server);
43+
});
44+
}
45+
46+
/**
47+
* Removed all the stored clients.
48+
*/
49+
public static void clear() {
50+
clients.clear();
51+
}
52+
53+
public static class ClientHolder {
54+
55+
private final MqttClient client;
56+
private final Uni<MqttConnAckMessage> connection;
57+
private final BroadcastProcessor<MqttPublishMessage> messages;
58+
59+
public ClientHolder(MqttClient client, String host, int port, String server) {
60+
this.client = client;
61+
this.connection = client.connect(port, host, server).cache();
62+
messages = BroadcastProcessor.create();
63+
client.publishHandler(messages::onNext);
64+
client.closeHandler(v -> messages.onComplete());
65+
client.exceptionHandler(messages::onError);
66+
}
67+
68+
public Uni<MqttClient> connect() {
69+
return connection
70+
.map(ignored -> client);
71+
}
72+
73+
public Multi<MqttPublishMessage> stream() {
74+
return messages;
75+
}
76+
}
77+
78+
}

smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttConnector.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99
import javax.annotation.PostConstruct;
1010
import javax.enterprise.context.ApplicationScoped;
11+
import javax.enterprise.context.Destroyed;
12+
import javax.enterprise.event.Observes;
1113
import javax.inject.Inject;
1214

1315
import org.eclipse.microprofile.config.Config;
@@ -89,4 +91,8 @@ public boolean isReady() {
8991

9092
return ready;
9193
}
94+
95+
public void destroy(@Observes @Destroyed(ApplicationScoped.class) final Object context) {
96+
Clients.clear();
97+
}
9298
}

smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSink.java

Lines changed: 40 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package io.smallrye.reactive.messaging.mqtt;
22

33
import java.util.concurrent.CompletableFuture;
4+
import java.util.concurrent.CompletionStage;
45
import java.util.concurrent.atomic.AtomicBoolean;
6+
import java.util.concurrent.atomic.AtomicReference;
57

68
import org.eclipse.microprofile.reactive.messaging.Message;
79
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
@@ -24,7 +26,6 @@ public class MqttSink {
2426

2527
private final String host;
2628
private final int port;
27-
private final MqttClient client;
2829
private final String server;
2930
private final String topic;
3031
private final int qos;
@@ -39,49 +40,60 @@ public MqttSink(Vertx vertx, MqttConnectorOutgoingConfiguration config) {
3940
port = config.getPort().orElse(def);
4041
server = config.getServerName().orElse(null);
4142
topic = config.getTopic().orElseGet(config::getChannel);
42-
client = MqttClient.create(vertx, options);
4343
qos = config.getQos();
4444

45+
AtomicReference<MqttClient> reference = new AtomicReference<>();
4546
sink = ReactiveStreams.<Message<?>> builder()
4647
.flatMapCompletionStage(msg -> {
47-
// If not connected, connect
48-
if (connected.get()) {
49-
//forwarding
48+
MqttClient client = reference.get();
49+
if (client != null && client.isConnected()) {
50+
connected.set(true);
5051
return CompletableFuture.completedFuture(msg);
5152
} else {
52-
return client.connect(port, host, server).subscribeAsCompletionStage()
53-
.thenApply(x -> {
53+
return Clients.getConnectedClient(vertx, host, port, server, options)
54+
.map(c -> {
55+
reference.set(c);
5456
connected.set(true);
5557
return msg;
56-
});
58+
})
59+
.subscribeAsCompletionStage();
5760
}
5861
})
59-
.flatMapCompletionStage(msg -> {
60-
String actualTopicToBeUsed = this.topic;
61-
MqttQoS actualQoS = MqttQoS.valueOf(this.qos);
62-
boolean isRetain = false;
63-
64-
if (msg instanceof SendingMqttMessage) {
65-
MqttMessage<?> mm = ((SendingMqttMessage<?>) msg);
66-
67-
actualTopicToBeUsed = mm.getTopic() == null ? topic : mm.getTopic();
68-
actualQoS = mm.getQosLevel() == null ? actualQoS : mm.getQosLevel();
69-
isRetain = mm.isRetain();
70-
}
71-
72-
if (actualTopicToBeUsed == null) {
73-
LOGGER.error("Ignoring message - no topic set");
74-
return CompletableFuture.completedFuture(msg);
62+
.flatMapCompletionStage(msg -> send(reference, msg))
63+
.onComplete(() -> {
64+
MqttClient c = reference.getAndSet(null);
65+
if (c != null) {
66+
connected.set(false);
67+
c.disconnectAndForget();
7568
}
76-
77-
return client.publish(actualTopicToBeUsed, convert(msg.getPayload()), actualQoS, false, isRetain)
78-
.subscribeAsCompletionStage();
7969
})
80-
.onComplete(client::disconnect)
8170
.onError(t -> LOGGER.error("An error has been caught while sending a MQTT message to the broker", t))
8271
.ignore();
8372
}
8473

74+
private CompletionStage<?> send(AtomicReference<MqttClient> reference, Message<?> msg) {
75+
MqttClient client = reference.get();
76+
String actualTopicToBeUsed = this.topic;
77+
MqttQoS actualQoS = MqttQoS.valueOf(this.qos);
78+
boolean isRetain = false;
79+
80+
if (msg instanceof SendingMqttMessage) {
81+
MqttMessage<?> mm = ((SendingMqttMessage<?>) msg);
82+
83+
actualTopicToBeUsed = mm.getTopic() == null ? topic : mm.getTopic();
84+
actualQoS = mm.getQosLevel() == null ? actualQoS : mm.getQosLevel();
85+
isRetain = mm.isRetain();
86+
}
87+
88+
if (actualTopicToBeUsed == null) {
89+
LOGGER.error("Ignoring message - no topic set");
90+
return CompletableFuture.completedFuture(msg);
91+
}
92+
93+
return client.publish(actualTopicToBeUsed, convert(msg.getPayload()), actualQoS, false, isRetain)
94+
.subscribeAsCompletionStage();
95+
}
96+
8597
private Buffer convert(Object payload) {
8698
if (payload instanceof JsonObject) {
8799
return new Buffer(((JsonObject) payload).toBuffer());

smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSource.java

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,8 @@
77
import org.slf4j.Logger;
88
import org.slf4j.LoggerFactory;
99

10-
import io.smallrye.mutiny.Multi;
11-
import io.smallrye.mutiny.subscription.BackPressureStrategy;
1210
import io.vertx.mqtt.MqttClientOptions;
1311
import io.vertx.mutiny.core.Vertx;
14-
import io.vertx.mutiny.mqtt.MqttClient;
1512

1613
public class MqttSource {
1714

@@ -28,19 +25,18 @@ public MqttSource(Vertx vertx, MqttConnectorIncomingConfiguration config) {
2825
String server = config.getServerName().orElse(null);
2926
String topic = config.getTopic().orElseGet(config::getChannel);
3027
int qos = config.getQos();
31-
MqttClient client = MqttClient.create(vertx, options);
3228
boolean broadcast = config.getBroadcast();
3329

30+
Clients.ClientHolder holder = Clients.getHolder(vertx, host, port, server, options);
3431
this.source = ReactiveStreams.fromPublisher(
35-
client.connect(port, host, server)
36-
.onItem().produceMulti(a -> Multi.createFrom().<MqttMessage<?>> emitter(emitter -> {
37-
client.publishHandler(message -> emitter.emit(new ReceivingMqttMessage(message)));
38-
39-
client.subscribe(topic, qos).subscribe().with(
40-
i -> subscribed.set(true),
41-
emitter::fail);
42-
43-
}, BackPressureStrategy.BUFFER))
32+
holder.connect()
33+
.onItem().produceMulti(client -> client.subscribe(topic, qos)
34+
.onItem().produceMulti(x -> {
35+
subscribed.set(true);
36+
return holder.stream()
37+
.transform().byFilteringItemsWith(m -> m.topicName().equals(topic))
38+
.onItem().apply(ReceivingMqttMessage::new);
39+
}))
4440
.then(multi -> {
4541
if (broadcast) {
4642
return multi.broadcast().toAllSubscribers();
@@ -49,7 +45,6 @@ public MqttSource(Vertx vertx, MqttConnectorIncomingConfiguration config) {
4945
})
5046
.on().cancellation(() -> {
5147
subscribed.set(false);
52-
client.disconnectAndForget();
5348
})
5449
.onFailure().invoke(t -> LOGGER.error("Unable to establish a connection with the MQTT broker", t)));
5550
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package io.smallrye.reactive.messaging.mqtt;
2+
3+
import static org.assertj.core.api.Assertions.assertThat;
4+
import static org.awaitility.Awaitility.await;
5+
6+
import java.time.Duration;
7+
import java.util.*;
8+
import java.util.concurrent.CopyOnWriteArrayList;
9+
import java.util.concurrent.TimeUnit;
10+
11+
import javax.enterprise.context.ApplicationScoped;
12+
13+
import org.eclipse.microprofile.reactive.messaging.Incoming;
14+
import org.eclipse.microprofile.reactive.messaging.Outgoing;
15+
import org.eclipse.microprofile.reactive.messaging.spi.ConnectorLiteral;
16+
import org.jboss.weld.environment.se.Weld;
17+
import org.jboss.weld.environment.se.WeldContainer;
18+
import org.junit.After;
19+
import org.junit.Test;
20+
21+
import io.smallrye.mutiny.Multi;
22+
import io.smallrye.reactive.messaging.extension.MediatorManager;
23+
24+
public class ConnectionSharingTest extends MqttTestBase {
25+
26+
private WeldContainer container;
27+
28+
@After
29+
public void cleanup() {
30+
if (container != null) {
31+
container.close();
32+
}
33+
Clients.clear();
34+
}
35+
36+
@Test
37+
public void testWithClientId() {
38+
Clients.clear();
39+
Weld weld = baseWeld(getConfig());
40+
weld.addBeanClass(App.class);
41+
container = weld.initialize();
42+
43+
App bean = container.getBeanManager().createInstance().select(App.class).get();
44+
45+
await().until(() -> this.container.select(MediatorManager.class).get().isInitialized());
46+
47+
await()
48+
.until(() -> this.container.select(MqttConnector.class, ConnectorLiteral.of("smallrye-mqtt")).get().isReady());
49+
50+
await().atMost(2, TimeUnit.MINUTES).until(() -> bean.prices().size() >= 10);
51+
assertThat(bean.prices()).isNotEmpty();
52+
}
53+
54+
private MapBasedConfig getConfig() {
55+
String topic = UUID.randomUUID().toString();
56+
String prices = "mp.messaging.incoming.prices.";
57+
String generator = "mp.messaging.outgoing.to-mqtt.";
58+
Map<String, Object> config = new HashMap<>();
59+
60+
config.put(prices + "topic", topic);
61+
config.put(prices + "connector", MqttConnector.CONNECTOR_NAME);
62+
config.put(prices + "host", System.getProperty("mqtt-host"));
63+
config.put(prices + "port", Integer.valueOf(System.getProperty("mqtt-port")));
64+
config.put(prices + "qos", 1);
65+
config.put(prices + "client-id", "my-id");
66+
if (System.getProperty("mqtt-user") != null) {
67+
config.put(prices + "username", System.getProperty("mqtt-user"));
68+
config.put(prices + "password", System.getProperty("mqtt-pwd"));
69+
}
70+
71+
config.put(generator + "topic", topic);
72+
config.put(generator + "connector", MqttConnector.CONNECTOR_NAME);
73+
config.put(generator + "host", System.getProperty("mqtt-host"));
74+
config.put(generator + "port", Integer.valueOf(System.getProperty("mqtt-port")));
75+
config.put(generator + "qos", 1);
76+
config.put(generator + "client-id", "my-id");
77+
if (System.getProperty("mqtt-user") != null) {
78+
config.put(generator + "username", System.getProperty("mqtt-user"));
79+
config.put(generator + "password", System.getProperty("mqtt-pwd"));
80+
}
81+
82+
return new MapBasedConfig(config);
83+
}
84+
85+
@ApplicationScoped
86+
public static class App {
87+
88+
List<String> prices = new CopyOnWriteArrayList<>();
89+
Random random = new Random();
90+
91+
@Incoming("prices")
92+
public void processPrices(byte[] priceRaw) {
93+
prices.add(new String(priceRaw));
94+
}
95+
96+
@Outgoing("to-mqtt")
97+
public Multi<Integer> generate() {
98+
return Multi.createFrom().ticks().every(Duration.ofMillis(100))
99+
.map(l -> random.nextInt(100))
100+
.on().overflow().drop()
101+
.transform().byTakingFirstItems(100);
102+
}
103+
104+
public List<String> prices() {
105+
return prices;
106+
}
107+
108+
}
109+
110+
}

smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MqttSinkTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public void cleanup() {
3030
if (container != null) {
3131
container.close();
3232
}
33+
Clients.clear();
3334
}
3435

3536
@SuppressWarnings("unchecked")
@@ -114,6 +115,7 @@ public void testSinkUsingString() throws InterruptedException {
114115
@Test
115116
@Repeat(times = 5)
116117
public void testABeanProducingMessagesSentToMQTT() throws InterruptedException {
118+
Clients.clear();
117119
Weld weld = baseWeld(getConfig());
118120
weld.addBeanClass(ProducingBean.class);
119121

0 commit comments

Comments
 (0)