From bda190e3671bfff8df98f98bf2a4175deff9e6fb Mon Sep 17 00:00:00 2001
From: Jens Reimann <jreimann@redhat.com>
Date: Thu, 27 May 2021 17:53:37 +0200
Subject: [PATCH] Fix broken MQTT client by implementing a reconnect "session"

This is based on the content of PR
https://github.com/vert-x3/vertx-mqtt/pull/197

Fixes #1181
---
 .../connector/smallrye-mqtt-incoming.adoc     |   4 -
 .../connector/smallrye-mqtt-outgoing.adoc     |   4 -
 .../reactive/messaging/mqtt/Clients.java      |  83 ++-
 .../messaging/mqtt/MqttConnector.java         |   3 +-
 .../reactive/messaging/mqtt/MqttHelpers.java  |  22 +-
 .../reactive/messaging/mqtt/MqttSink.java     |  85 +--
 .../reactive/messaging/mqtt/MqttSource.java   |  35 +-
 .../messaging/mqtt/i18n/MqttExceptions.java   |   3 +
 .../ConstantReconnectDelayOptions.java        |  49 ++
 .../ExponentialBackoffDelayOptions.java       | 119 ++++
 .../mqtt/session/MqttClientSession.java       | 236 ++++++
 .../session/MqttClientSessionOptions.java     |  72 ++
 .../mqtt/session/ReconnectDelayOptions.java   |  25 +
 .../mqtt/session/ReconnectDelayProvider.java  |  11 +
 .../messaging/mqtt/session/RequestedQoS.java  |  37 +
 .../messaging/mqtt/session/SessionEvent.java  |  10 +
 .../messaging/mqtt/session/SessionState.java  |  27 +
 .../mqtt/session/SubscriptionEvent.java       |  12 +
 .../mqtt/session/SubscriptionState.java       |  29 +
 .../session/impl/MqttClientSessionImpl.java   | 672 ++++++++++++++++++
 .../mqtt/session/impl/SessionEventImpl.java   |  38 +
 .../session/impl/SubscriptionEventImpl.java   |  77 ++
 .../messaging/mqtt/MqttSourceTest.java        |   8 +-
 .../mqtt/MutualTlsMqttSourceTest.java         |   2 +-
 .../messaging/mqtt/SecureMqttSourceTest.java  |   2 +-
 .../messaging/mqtt/TlsMqttSourceTest.java     |   2 +-
 26 files changed, 1549 insertions(+), 118 deletions(-)
 create mode 100644 smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/ConstantReconnectDelayOptions.java
 create mode 100644 smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/ExponentialBackoffDelayOptions.java
 create mode 100644 smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/MqttClientSession.java
 create mode 100644 smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/MqttClientSessionOptions.java
 create mode 100644 smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/ReconnectDelayOptions.java
 create mode 100644 smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/ReconnectDelayProvider.java
 create mode 100644 smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/RequestedQoS.java
 create mode 100644 smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/SessionEvent.java
 create mode 100644 smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/SessionState.java
 create mode 100644 smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/SubscriptionEvent.java
 create mode 100644 smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/SubscriptionState.java
 create mode 100644 smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/impl/MqttClientSessionImpl.java
 create mode 100644 smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/impl/SessionEventImpl.java
 create mode 100644 smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/impl/SubscriptionEventImpl.java

diff --git a/documentation/src/main/doc/modules/connectors/partials/META-INF/connector/smallrye-mqtt-incoming.adoc b/documentation/src/main/doc/modules/connectors/partials/META-INF/connector/smallrye-mqtt-incoming.adoc
index c81675a93e..38d48e6cff 100644
--- a/documentation/src/main/doc/modules/connectors/partials/META-INF/connector/smallrye-mqtt-incoming.adoc
+++ b/documentation/src/main/doc/modules/connectors/partials/META-INF/connector/smallrye-mqtt-incoming.adoc
@@ -71,10 +71,6 @@ Type: _int_ | false | `0`
 
 Type: _int_ | false | `8092`
 
-| *reconnect-attempts* | Set the max reconnect attempts
-
-Type: _int_ | false | `5`
-
 | *reconnect-interval-seconds* | Set the reconnect interval in seconds
 
 Type: _int_ | false | `1`
diff --git a/documentation/src/main/doc/modules/connectors/partials/META-INF/connector/smallrye-mqtt-outgoing.adoc b/documentation/src/main/doc/modules/connectors/partials/META-INF/connector/smallrye-mqtt-outgoing.adoc
index 075f6ccc9a..34d789ea91 100644
--- a/documentation/src/main/doc/modules/connectors/partials/META-INF/connector/smallrye-mqtt-outgoing.adoc
+++ b/documentation/src/main/doc/modules/connectors/partials/META-INF/connector/smallrye-mqtt-outgoing.adoc
@@ -55,10 +55,6 @@ Type: _int_ | false |
 
 Type: _int_ | false | `0`
 
-| *reconnect-attempts* | Set the max reconnect attempts
-
-Type: _int_ | false | `5`
-
 | *reconnect-interval-seconds* | Set the reconnect interval in seconds
 
 Type: _int_ | false | `1`
diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/Clients.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/Clients.java
index 695722b794..9d6ff9a4f6 100644
--- a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/Clients.java
+++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/Clients.java
@@ -4,12 +4,13 @@
 import java.util.concurrent.ConcurrentHashMap;
 
 import io.smallrye.mutiny.Multi;
-import io.smallrye.mutiny.Uni;
 import io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor;
-import io.vertx.mqtt.MqttClientOptions;
+import io.smallrye.reactive.messaging.mqtt.session.MqttClientSession;
+import io.smallrye.reactive.messaging.mqtt.session.MqttClientSessionOptions;
+import io.smallrye.reactive.messaging.mqtt.session.SessionState;
+import io.smallrye.reactive.messaging.mqtt.session.SubscriptionEvent;
+import io.smallrye.reactive.messaging.mqtt.session.SubscriptionState;
 import io.vertx.mutiny.core.Vertx;
-import io.vertx.mutiny.mqtt.MqttClient;
-import io.vertx.mutiny.mqtt.messages.MqttConnAckMessage;
 import io.vertx.mutiny.mqtt.messages.MqttPublishMessage;
 
 public class Clients {
@@ -20,25 +21,23 @@ private Clients() {
         // avoid direct instantiation.
     }
 
-    static Uni<MqttClient> getConnectedClient(Vertx vertx, String host, int port, String server,
-            MqttClientOptions options) {
-        String id = host + port + "<" + (server == null ? "" : server)
-                + ">-[" + (options.getClientId() != null ? options.getClientId() : "") + "]";
-        ClientHolder holder = clients.computeIfAbsent(id, key -> {
-            MqttClient client = MqttClient.create(vertx, options);
-            return new ClientHolder(client, host, port, server);
-        });
-        return holder.connect();
-    }
+    static ClientHolder getHolder(Vertx vertx, MqttClientSessionOptions options) {
 
-    static ClientHolder getHolder(Vertx vertx, String host, int port, String server,
-            MqttClientOptions options) {
+        String host = options.getHostname();
+        int port = options.getPort();
+        String clientId = options.getClientId();
+        String server = options.getServerName().orElse(null);
+        String username = options.getUsername();
+        String password = options.getPassword();
 
-        String id = host + port + "<" + (server == null ? "" : server)
-                + ">-[" + (options.getClientId() != null ? options.getClientId() : "") + "]";
+        String id = username + ":" + password + "@"
+                + host + ":"
+                + port
+                + "<" + (server == null ? "" : server)
+                + ">-[" + (clientId != null ? clientId : "") + "]";
         return clients.computeIfAbsent(id, key -> {
-            MqttClient client = MqttClient.create(vertx, options);
-            return new ClientHolder(client, host, port, server);
+            MqttClientSession client = MqttClientSession.create(vertx.getDelegate(), options);
+            return new ClientHolder(client);
         });
     }
 
@@ -52,33 +51,53 @@ public static void clear() {
 
     public static class ClientHolder {
 
-        private final MqttClient client;
-        private final Uni<MqttConnAckMessage> connection;
+        private final MqttClientSession client;
         private final BroadcastProcessor<MqttPublishMessage> messages;
+        private final BroadcastProcessor<SessionState> sessionState;
+        private final BroadcastProcessor<SubscriptionEvent> subscriptionState;
 
-        public ClientHolder(MqttClient client, String host, int port, String server) {
+        public ClientHolder(MqttClientSession client) {
             this.client = client;
-            this.connection = client.connect(port, host, server).memoize().indefinitely();
             messages = BroadcastProcessor.create();
-            client.publishHandler(messages::onNext);
-            client.closeHandler(messages::onComplete);
+            sessionState = BroadcastProcessor.create();
+            subscriptionState = BroadcastProcessor.create();
+            client.messageHandler(m -> messages.onNext(MqttPublishMessage.newInstance(m)));
             client.exceptionHandler(messages::onError);
+            client.sessionStateHandler(evt -> sessionState.onNext(evt.getSessionState()));
+            client.subscriptionStateHandler(subscriptionState::onNext);
         }
 
-        public Uni<MqttClient> connect() {
-            return connection
-                    .map(ignored -> client);
+        public void start() {
+            client.start();
         }
 
         public void close() {
-            if (client.isConnected()) {
-                client.disconnectAndAwait();
-            }
+            client.stop();
+        }
+
+        public Multi<SessionState> sessionState() {
+            return Multi.createFrom()
+                    .item(this.client.getState())
+                    .onCompletion()
+                    .switchTo(this.sessionState);
+        }
+
+        public Multi<SubscriptionState> subscriptionState(String topic) {
+            return Multi.createFrom()
+                    .item(this.client.getSubscriptionState(topic))
+                    .onCompletion()
+                    .switchTo(this.subscriptionState
+                            .filter(evt -> evt.getTopic().equals(topic))
+                            .map(SubscriptionEvent::getSubscriptionState));
         }
 
         public Multi<MqttPublishMessage> stream() {
             return messages;
         }
+
+        public MqttClientSession getClient() {
+            return client;
+        }
     }
 
 }
diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttConnector.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttConnector.java
index 73cd1fb6df..af5f146dd9 100644
--- a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttConnector.java
+++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttConnector.java
@@ -42,7 +42,6 @@
 @ConnectorAttribute(name = "will-retain", type = "boolean", direction = INCOMING_AND_OUTGOING, description = "Set if the will message must be retained", defaultValue = "false")
 @ConnectorAttribute(name = "will-qos", type = "int", direction = INCOMING_AND_OUTGOING, description = "Set the QoS level for the will message", defaultValue = "0")
 @ConnectorAttribute(name = "max-message-size", type = "int", direction = INCOMING_AND_OUTGOING, description = "Set max MQTT message size in bytes", defaultValue = "8092")
-@ConnectorAttribute(name = "reconnect-attempts", type = "int", direction = INCOMING_AND_OUTGOING, description = "Set the max reconnect attempts", defaultValue = "5")
 @ConnectorAttribute(name = "reconnect-interval-seconds", type = "int", direction = INCOMING_AND_OUTGOING, description = "Set the reconnect interval in seconds", defaultValue = "1")
 @ConnectorAttribute(name = "username", type = "string", direction = INCOMING_AND_OUTGOING, description = "Set the username to connect to the server")
 @ConnectorAttribute(name = "password", type = "string", direction = INCOMING_AND_OUTGOING, description = "Set the password to connect to the server")
@@ -99,7 +98,7 @@ public boolean isReady() {
     public boolean isSourceReady() {
         boolean ready = true;
         for (MqttSource source : sources) {
-            ready = ready && source.isSubscribed();
+            ready = ready && source.isReady();
         }
         return ready;
     }
diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttHelpers.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttHelpers.java
index 94372af6f5..81a9d018ae 100644
--- a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttHelpers.java
+++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttHelpers.java
@@ -1,15 +1,17 @@
 package io.smallrye.reactive.messaging.mqtt;
 
+import java.time.Duration;
 import java.util.Optional;
-import java.util.concurrent.TimeUnit;
 
+import io.smallrye.reactive.messaging.mqtt.session.ConstantReconnectDelayOptions;
+import io.smallrye.reactive.messaging.mqtt.session.MqttClientSessionOptions;
+import io.smallrye.reactive.messaging.mqtt.session.ReconnectDelayOptions;
 import io.vertx.core.net.JksOptions;
 import io.vertx.core.net.KeyCertOptions;
 import io.vertx.core.net.PemKeyCertOptions;
 import io.vertx.core.net.PemTrustOptions;
 import io.vertx.core.net.PfxOptions;
 import io.vertx.core.net.TrustOptions;
-import io.vertx.mqtt.MqttClientOptions;
 
 public class MqttHelpers {
 
@@ -17,21 +19,23 @@ private MqttHelpers() {
         // avoid direct instantiation.
     }
 
-    static MqttClientOptions createMqttClientOptions(MqttConnectorCommonConfiguration config) {
-        MqttClientOptions options = new MqttClientOptions();
+    static MqttClientSessionOptions createMqttClientOptions(MqttConnectorCommonConfiguration config) {
+        MqttClientSessionOptions options = new MqttClientSessionOptions();
         options.setCleanSession(config.getAutoCleanSession());
         options.setAutoGeneratedClientId(config.getAutoGeneratedClientId());
         options.setAutoKeepAlive(config.getAutoKeepAlive());
         options.setClientId(config.getClientId().orElse(null));
         options.setConnectTimeout(config.getConnectTimeoutSeconds());
+        options.setHostname(config.getHost());
         options.setKeepAliveInterval(config.getKeepAliveSeconds());
         options.setMaxInflightQueue(config.getMaxInflightQueue());
         options.setMaxMessageSize(config.getMaxMessageSize());
         options.setPassword(config.getPassword().orElse(null));
-        options.setReconnectAttempts(config.getReconnectAttempts());
-        options.setReconnectInterval(TimeUnit.SECONDS.toMillis(config.getReconnectIntervalSeconds()));
+        options.setPort(config.getPort().orElseGet(() -> config.getSsl() ? 8883 : 1883));
+        options.setReconnectDelay(getReconnectDelayOptions(config));
         options.setSsl(config.getSsl());
         options.setKeyCertOptions(getKeyCertOptions(config));
+        options.setServerName(config.getServerName());
         options.setTrustOptions(getTrustOptions(config));
         options.setTrustAll(config.getTrustAll());
         options.setUsername(config.getUsername().orElse(null));
@@ -122,4 +126,10 @@ private static TrustOptions getTrustOptions(MqttConnectorCommonConfiguration con
         return null;
     }
 
+    private static ReconnectDelayOptions getReconnectDelayOptions(MqttConnectorCommonConfiguration config) {
+        ConstantReconnectDelayOptions options = new ConstantReconnectDelayOptions();
+        options.setDelay(Duration.ofSeconds(config.getReconnectIntervalSeconds()));
+        return options;
+    }
+
 }
diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSink.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSink.java
index a16ca7a092..ca5dbf6c47 100644
--- a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSink.java
+++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSink.java
@@ -13,86 +13,72 @@
 
 import io.netty.handler.codec.mqtt.MqttQoS;
 import io.smallrye.mutiny.Uni;
+import io.smallrye.mutiny.vertx.AsyncResultUni;
+import io.smallrye.reactive.messaging.mqtt.session.MqttClientSession;
+import io.smallrye.reactive.messaging.mqtt.session.MqttClientSessionOptions;
+import io.smallrye.reactive.messaging.mqtt.session.SessionState;
 import io.vertx.core.json.Json;
 import io.vertx.core.json.JsonArray;
 import io.vertx.core.json.JsonObject;
-import io.vertx.mqtt.MqttClientOptions;
 import io.vertx.mutiny.core.Vertx;
 import io.vertx.mutiny.core.buffer.Buffer;
-import io.vertx.mutiny.mqtt.MqttClient;
 
 public class MqttSink {
 
-    private final String host;
-    private final int port;
-    private final String server;
     private final String topic;
     private final int qos;
 
     private final SubscriberBuilder<? extends Message<?>, Void> sink;
-    private final AtomicBoolean connected = new AtomicBoolean();
+    private final AtomicBoolean ready = new AtomicBoolean();
 
     public MqttSink(Vertx vertx, MqttConnectorOutgoingConfiguration config) {
-        MqttClientOptions options = MqttHelpers.createMqttClientOptions(config);
-        host = config.getHost();
-        int def = options.isSsl() ? 8883 : 1883;
-        port = config.getPort().orElse(def);
-        server = config.getServerName().orElse(null);
+        MqttClientSessionOptions options = MqttHelpers.createMqttClientOptions(config);
         topic = config.getTopic().orElseGet(config::getChannel);
         qos = config.getQos();
 
-        AtomicReference<MqttClient> reference = new AtomicReference<>();
+        AtomicReference<Clients.ClientHolder> reference = new AtomicReference<>();
         sink = ReactiveStreams.<Message<?>> builder()
                 .flatMapCompletionStage(msg -> {
-                    MqttClient client = reference.get();
-                    if (client != null) {
-                        if (client.isConnected()) {
-                            connected.set(true);
-                            return CompletableFuture.completedFuture(msg);
-                        } else {
-                            CompletableFuture<Message<?>> future = new CompletableFuture<>();
-                            vertx.setPeriodic(100, id -> {
-                                if (client.isConnected()) {
-                                    vertx.cancelTimer(id);
-                                    connected.set(true);
-                                    future.complete(msg);
-                                }
-                            });
-                            return future;
-                        }
-                    } else {
-                        return Clients.getConnectedClient(vertx, host, port, server, options)
-                                .map(c -> {
-                                    reference.set(c);
-                                    connected.set(true);
-                                    return msg;
-                                })
-                                .subscribeAsCompletionStage();
+                    Clients.ClientHolder client = reference.get();
+                    if (client == null) {
+                        client = Clients.getHolder(vertx, options);
+                        // FIXME: add session state listener
+                        client.start();
+                        reference.set(client);
                     }
+
+                    return client.sessionState()
+                            .filter(state -> state != SessionState.CONNECTED)
+                            .map(ignore -> msg).toUni().subscribeAsCompletionStage();
+
                 })
                 .flatMapCompletionStage(msg -> send(reference, msg))
                 .onComplete(() -> {
-                    MqttClient c = reference.getAndSet(null);
+                    Clients.ClientHolder c = reference.getAndSet(null);
                     if (c != null) {
-                        connected.set(false);
-                        c.disconnectAndForget();
+                        ready.set(false);
+                        c.close();
                     }
                 })
                 .onError(log::errorWhileSendingMessageToBroker)
                 .ignore();
     }
 
-    private CompletionStage<?> send(AtomicReference<MqttClient> reference, Message<?> msg) {
-        MqttClient client = reference.get();
-        String actualTopicToBeUsed = this.topic;
-        MqttQoS actualQoS = MqttQoS.valueOf(this.qos);
-        boolean isRetain = false;
+    private CompletionStage<?> send(AtomicReference<Clients.ClientHolder> reference, Message<?> msg) {
+        MqttClientSession client = reference.get().getClient();
+        final String actualTopicToBeUsed;
+        final MqttQoS actualQoS;
+        final boolean isRetain;
 
         if (msg instanceof SendingMqttMessage) {
             MqttMessage<?> mm = ((SendingMqttMessage<?>) msg);
-            actualTopicToBeUsed = mm.getTopic() == null ? topic : mm.getTopic();
-            actualQoS = mm.getQosLevel() == null ? actualQoS : mm.getQosLevel();
+            actualTopicToBeUsed = mm.getTopic() == null ? this.topic : mm.getTopic();
+            actualQoS = mm.getQosLevel() == null ? MqttQoS.valueOf(this.qos) : mm.getQosLevel();
             isRetain = mm.isRetain();
+        } else {
+            actualTopicToBeUsed = this.topic;
+            isRetain = false;
+            actualQoS = MqttQoS.valueOf(this.qos);
         }
 
         if (actualTopicToBeUsed == null) {
@@ -100,7 +86,10 @@ private CompletionStage<?> send(AtomicReference<MqttClient> reference, Message<?
             return CompletableFuture.completedFuture(msg);
         }
 
-        return client.publish(actualTopicToBeUsed, convert(msg.getPayload()), actualQoS, false, isRetain)
+        return AsyncResultUni
+                .<Integer> toUni(h -> client
+                        .publish(actualTopicToBeUsed, convert(msg.getPayload()).getDelegate(), actualQoS, false, isRetain)
+                        .onComplete(h))
                 .onItemOrFailure().transformToUni((s, f) -> {
                     if (f != null) {
                         return Uni.createFrom().completionStage(msg.nack(f).thenApply(x -> msg));
@@ -139,6 +128,6 @@ public SubscriberBuilder<? extends Message<?>, Void> getSink() {
     }
 
     public boolean isReady() {
-        return connected.get();
+        return ready.get();
     }
 }
diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSource.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSource.java
index d9647270db..54788984c6 100644
--- a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSource.java
+++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSource.java
@@ -9,18 +9,20 @@
 import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
 import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
 
-import io.vertx.mqtt.MqttClientOptions;
+import io.smallrye.reactive.messaging.mqtt.session.MqttClientSessionOptions;
+import io.smallrye.reactive.messaging.mqtt.session.RequestedQoS;
+import io.smallrye.reactive.messaging.mqtt.session.SubscriptionState;
 import io.vertx.mutiny.core.Vertx;
 import io.vertx.mutiny.mqtt.messages.MqttPublishMessage;
 
 public class MqttSource {
 
     private final PublisherBuilder<MqttMessage<?>> source;
-    private final AtomicBoolean subscribed = new AtomicBoolean();
+    private final AtomicBoolean ready = new AtomicBoolean();
     private final Pattern pattern;
 
     public MqttSource(Vertx vertx, MqttConnectorIncomingConfiguration config) {
-        MqttClientOptions options = MqttHelpers.createMqttClientOptions(config);
+        MqttClientSessionOptions options = MqttHelpers.createMqttClientOptions(config);
 
         String host = config.getHost();
         int def = options.isSsl() ? 8883 : 1883;
@@ -40,23 +42,25 @@ public MqttSource(Vertx vertx, MqttConnectorIncomingConfiguration config) {
             pattern = null;
         }
 
-        Clients.ClientHolder holder = Clients.getHolder(vertx, host, port, server, options);
+        Clients.ClientHolder holder = Clients.getHolder(vertx, options);
+        holder.subscriptionState(topic)
+                .onItem().invoke(state -> {
+                    ready.set(state == SubscriptionState.SUBSCRIBED);
+                });
+        holder.start();
+        holder.getClient()
+                .subscribe(topic, RequestedQoS.valueOf(qos));
         this.source = ReactiveStreams.fromPublisher(
-                holder.connect()
-                        .onItem().transformToMulti(client -> client.subscribe(topic, qos)
-                                .onItem().transformToMulti(x -> {
-                                    subscribed.set(true);
-                                    return holder.stream()
-                                            .select().where(m -> matches(topic, m))
-                                            .onItem().transform(m -> new ReceivingMqttMessage(m, onNack));
-                                }))
+                holder.stream()
+                        .select().where(m -> matches(topic, m))
+                        .onItem().transform(m -> new ReceivingMqttMessage(m, onNack))
                         .stage(multi -> {
                             if (broadcast) {
                                 return multi.broadcast().toAllSubscribers();
                             }
                             return multi;
                         })
-                        .onCancellation().invoke(() -> subscribed.set(false))
+                        .onCancellation().invoke(() -> holder.getClient().unsubscribe(topic))
                         .onFailure().invoke(log::unableToConnectToBroker));
     }
 
@@ -82,7 +86,8 @@ PublisherBuilder<MqttMessage<?>> getSource() {
         return source;
     }
 
-    boolean isSubscribed() {
-        return subscribed.get();
+    public boolean isReady() {
+        return ready.get();
     }
+
 }
diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/i18n/MqttExceptions.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/i18n/MqttExceptions.java
index 1fe53d3906..d95f51b278 100644
--- a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/i18n/MqttExceptions.java
+++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/i18n/MqttExceptions.java
@@ -16,4 +16,7 @@ public interface MqttExceptions {
     @Message(id = 17000, value = "Unknown failure strategy: %s")
     IllegalArgumentException illegalArgumentUnknownStrategy(String strategy);
 
+    @Message(id = 17001, value = "Invalid QoS value: %s")
+    IllegalArgumentException illegalArgumentInvalidQoS(int qos);
+
 }
diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/ConstantReconnectDelayOptions.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/ConstantReconnectDelayOptions.java
new file mode 100644
index 0000000000..dd9b614308
--- /dev/null
+++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/ConstantReconnectDelayOptions.java
@@ -0,0 +1,49 @@
+package io.smallrye.reactive.messaging.mqtt.session;
+
+import java.time.Duration;
+
+public class ConstantReconnectDelayOptions implements ReconnectDelayOptions {
+
+    private static final Duration DEFAULT_DELAY = Duration.ofSeconds(10);
+
+    private Duration delay = DEFAULT_DELAY;
+
+    public ConstantReconnectDelayOptions() {
+    }
+
+    public ConstantReconnectDelayOptions setDelay(Duration delay) {
+        this.delay = delay;
+        return this;
+    }
+
+    public Duration getDelay() {
+        return this.delay;
+    }
+
+    @Override
+    public ReconnectDelayProvider createProvider() {
+
+        final Duration delay = this.delay;
+
+        return new ReconnectDelayProvider() {
+
+            @Override
+            public Duration nextDelay() {
+                return delay;
+            }
+
+            @Override
+            public void reset() {
+                // no-op
+            }
+        };
+
+    }
+
+    @Override
+    public ReconnectDelayOptions copy() {
+        ConstantReconnectDelayOptions result = new ConstantReconnectDelayOptions();
+        result.delay = this.delay;
+        return result;
+    }
+}
diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/ExponentialBackoffDelayOptions.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/ExponentialBackoffDelayOptions.java
new file mode 100644
index 0000000000..dc340c3520
--- /dev/null
+++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/ExponentialBackoffDelayOptions.java
@@ -0,0 +1,119 @@
+package io.smallrye.reactive.messaging.mqtt.session;
+
+import java.time.Duration;
+
+public class ExponentialBackoffDelayOptions implements ReconnectDelayOptions {
+
+    private static final Duration DEFAULT_MINIMUM = Duration.ofSeconds(1);
+    private static final Duration DEFAULT_INCREMENT = Duration.ofSeconds(1);
+    private static final Duration DEFAULT_MAXIMUM = Duration.ofMinutes(5);
+
+    private Duration minimum = DEFAULT_MINIMUM;
+    private Duration increment = DEFAULT_INCREMENT;
+    private Duration maximum = DEFAULT_MAXIMUM;
+
+    public ExponentialBackoffDelayOptions() {
+    }
+
+    public ExponentialBackoffDelayOptions setIncrement(Duration increment) {
+        this.increment = increment;
+        return this;
+    }
+
+    public Duration getIncrement() {
+        return this.increment;
+    }
+
+    public ExponentialBackoffDelayOptions setMaximum(Duration maximum) {
+        this.maximum = maximum;
+        return this;
+    }
+
+    public Duration getMaximum() {
+        return this.maximum;
+    }
+
+    public ExponentialBackoffDelayOptions setMinimum(Duration minimum) {
+        this.minimum = minimum;
+        return this;
+    }
+
+    public Duration getMinimum() {
+        return this.minimum;
+    }
+
+    private void validate() {
+        if (this.minimum.isNegative()) {
+            throw new IllegalArgumentException("'minimum' must be a positive or zero duration");
+        }
+        if (this.increment.isNegative() || this.increment.isZero()) {
+            throw new IllegalArgumentException("'increment' must be a positive duration");
+        }
+        if (this.maximum.isNegative() || this.maximum.isZero()) {
+            throw new IllegalArgumentException("'maximum' must be a positive duration");
+        }
+        if (this.maximum.compareTo(this.minimum) < 0) {
+            throw new IllegalArgumentException("'minimum' must be less than (or equal) to the maximum");
+        }
+    }
+
+    @Override
+    public ReconnectDelayProvider createProvider() {
+        validate();
+
+        long num = this.maximum.minus(this.minimum).toMillis() / this.increment.toMillis();
+        long max = (long) (Math.log(num) / Math.log(2)) + 1;
+
+        return new Provider(this.minimum, this.increment, this.maximum, max);
+    }
+
+    @Override
+    public ReconnectDelayOptions copy() {
+        ExponentialBackoffDelayOptions result = new ExponentialBackoffDelayOptions();
+        result.minimum = this.minimum;
+        result.increment = this.increment;
+        result.maximum = this.maximum;
+        return result;
+    }
+
+    private static class Provider implements ReconnectDelayProvider {
+
+        private final Duration minimum;
+        private final Duration increment;
+        private final Duration maximum;
+        private final long max;
+
+        private long count;
+
+        Provider(Duration minimum, Duration increment, Duration maximum, long max) {
+            this.minimum = minimum;
+            this.increment = increment;
+            this.maximum = maximum;
+            this.max = max;
+        }
+
+        @Override
+        public Duration nextDelay() {
+
+            if (this.count <= this.max) {
+
+                Duration delay = this.minimum;
+                if (this.count > 0) {
+                    delay = delay.plus(this.increment.multipliedBy((long) Math.pow(2, this.count - 1)));
+                }
+
+                this.count += 1;
+
+                return delay;
+            } else {
+                return this.maximum;
+            }
+
+        }
+
+        @Override
+        public void reset() {
+            this.count = 0;
+        }
+    }
+}
diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/MqttClientSession.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/MqttClientSession.java
new file mode 100644
index 0000000000..379d0636fe
--- /dev/null
+++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/MqttClientSession.java
@@ -0,0 +1,236 @@
+/*
+ * Copyright 2021 Red Hat Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.smallrye.reactive.messaging.mqtt.session;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import io.netty.handler.codec.mqtt.MqttQoS;
+import io.smallrye.reactive.messaging.mqtt.session.impl.MqttClientSessionImpl;
+import io.vertx.codegen.annotations.Fluent;
+import io.vertx.codegen.annotations.GenIgnore;
+import io.vertx.codegen.annotations.VertxGen;
+import io.vertx.core.Future;
+import io.vertx.core.Handler;
+import io.vertx.core.Vertx;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.mqtt.MqttClient;
+import io.vertx.mqtt.messages.MqttPublishMessage;
+
+/**
+ * An MQTT client session.
+ */
+@VertxGen
+public interface MqttClientSession {
+
+    /**
+     * Create a new MQTT client session.
+     * <p>
+     * The session will initially be disconnected, and must be started using {@link #start()}.
+     *
+     * @param vertx Vert.x instance
+     * @param options MQTT client session options
+     * @return MQTT client session instance
+     */
+    static MqttClientSession create(Vertx vertx, MqttClientSessionOptions options) {
+        return new MqttClientSessionImpl(vertx, options);
+    }
+
+    /**
+     * Set the session state handler.
+     *
+     * @param sessionStateHandler The new handler, will overwrite the old one.
+     * @return current MQTT client session instance
+     */
+    @Fluent
+    MqttClientSession sessionStateHandler(Handler<SessionEvent> sessionStateHandler);
+
+    /**
+     * Set the subscription state handler.
+     *
+     * @param subscriptionStateHandler The new handler, will overwrite the old one.
+     * @return current MQTT client session instance
+     */
+    @Fluent
+    MqttClientSession subscriptionStateHandler(Handler<SubscriptionEvent> subscriptionStateHandler);
+
+    /**
+     * Set the publish complete handler.
+     *
+     * @param publishCompleteHandler The new handler, will overwrite the old one.
+     * @return current MQTT client session instance
+     * @see MqttClient#publishCompletionHandler(Handler)
+     */
+    @Fluent
+    MqttClientSession publishCompletionHandler(Handler<Integer> publishCompleteHandler);
+
+    /**
+     * Set the publish completion expiration handler.
+     *
+     * @param publishCompletionExpirationHandler The new handler, will overwrite the old one.
+     * @return current MQTT client session instance
+     * @see MqttClient#publishCompletionExpirationHandler(Handler)
+     */
+    @Fluent
+    MqttClientSession publishCompletionExpirationHandler(Handler<Integer> publishCompletionExpirationHandler);
+
+    /**
+     * Set the publish completion unknown packet id handler.
+     *
+     * @param publishCompletionUnknownPacketIdHandler The new handler, will overwrite the old one.
+     * @return current MQTT client session instance
+     * @see MqttClient#publishCompletionUnknownPacketIdHandler(Handler)
+     */
+    @Fluent
+    MqttClientSession publishCompletionUnknownPacketIdHandler(Handler<Integer> publishCompletionUnknownPacketIdHandler);
+
+    /**
+     * Start the session. This will try to drive the connection to {@link SessionState#CONNECTED}.
+     */
+    void start();
+
+    /**
+     * Stop the session. This will try to drive the connection to {@link SessionState#DISCONNECTED}.
+     */
+    void stop();
+
+    /**
+     * Get the current session state.
+     *
+     * @return The current state.
+     */
+    SessionState getState();
+
+    /**
+     * Get a current subscription state.
+     *
+     * @param topicFilter The topic filter to get the state for.
+     * @return The current state of the requested subscription.
+     */
+    SubscriptionState getSubscriptionState(String topicFilter);
+
+    /**
+     * Check if the session is currently connected.
+     *
+     * @return {@code true} if the session is currently connected, {@code false} otherwise.
+     */
+    default boolean isConnected() {
+        return getState() == SessionState.CONNECTED;
+    }
+
+    /**
+     * Subscribes to the topics with related QoS levels
+     *
+     * @param topics topics and related QoS levels to subscribe to
+     * @return current MQTT client session instance
+     */
+    @Fluent
+    MqttClientSession subscribe(Map<String, RequestedQoS> topics);
+
+    /**
+     * Subscribes to a single topic with related QoS level.
+     *
+     * @param topic The topic to subscribe to.
+     * @param qos The QoS to request from the server.
+     * @return current MQTT client session instance
+     */
+    @Fluent
+    default MqttClientSession subscribe(String topic, RequestedQoS qos) {
+        return subscribe(Collections.singletonMap(topic, qos));
+    }
+
+    /**
+     * Subscribes to a list of topics, with the same QoS.
+     *
+     * @param qos The QoS to use.
+     * @param topics The topics to subscribe to.
+     * @return current MQTT client session instance
+     */
+    @Fluent
+    @GenIgnore
+    default MqttClientSession subscribe(RequestedQoS qos, String... topics) {
+        final Map<String, RequestedQoS> topicMap = new LinkedHashMap<>(topics.length);
+        for (String topic : topics) {
+            topicMap.put(topic, qos);
+        }
+        return subscribe(topicMap);
+    }
+
+    /**
+     * Unsubscribe from receiving messages on given topics
+     *
+     * @param topics Topics you want to unsubscribe from
+     * @return current MQTT client session instance
+     */
+    MqttClientSession unsubscribe(List<String> topics);
+
+    /**
+     * Unsubscribe from receiving messages on given topics
+     *
+     * @param topics Topics you want to unsubscribe from
+     * @return current MQTT client session instance
+     */
+    @GenIgnore
+    default MqttClientSession unsubscribe(String... topics) {
+        return unsubscribe(Arrays.asList(topics));
+    }
+
+    /**
+     * Sets handler which will be called each time server publish something to client
+     *
+     * @param messageHandler handler to call
+     * @return current MQTT client session instance
+     */
+    @Fluent
+    MqttClientSession messageHandler(Handler<MqttPublishMessage> messageHandler);
+
+    /**
+     * Sets handler which will be called in case of an exception
+     *
+     * @param exceptionHandler handler to call
+     * @return current MQTT client session instance
+     */
+    @Fluent
+    MqttClientSession exceptionHandler(Handler<Throwable> exceptionHandler);
+
+    /**
+     * Sends the PUBLISH message to the remote MQTT server
+     *
+     * @param topic topic on which the message is published
+     * @param payload message payload
+     * @param qosLevel QoS level
+     * @param isDup if the message is a duplicate
+     * @param isRetain if the message needs to be retained
+     * @return a {@code Future} completed after PUBLISH packet sent with packetid (not when QoS 0)
+     */
+    Future<Integer> publish(String topic, Buffer payload, MqttQoS qosLevel, boolean isDup, boolean isRetain);
+
+    /**
+     * Sends the PUBLISH message to the remote MQTT server
+     *
+     * @param topic topic on which the message is published
+     * @param payload message payload
+     * @param qosLevel QoS level
+     * @return a {@code Future} completed after PUBLISH packet sent with packetid (not when QoS 0)
+     */
+    default Future<Integer> publish(String topic, Buffer payload, MqttQoS qosLevel) {
+        return publish(topic, payload, qosLevel, false, false);
+    }
+}
diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/MqttClientSessionOptions.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/MqttClientSessionOptions.java
new file mode 100644
index 0000000000..e53551932a
--- /dev/null
+++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/MqttClientSessionOptions.java
@@ -0,0 +1,72 @@
+package io.smallrye.reactive.messaging.mqtt.session;
+
+import java.util.Optional;
+
+import io.vertx.mqtt.MqttClientOptions;
+
+public class MqttClientSessionOptions extends MqttClientOptions {
+
+    private static final ReconnectDelayOptions DEFAULT_RECONNECT_DELAY = new ConstantReconnectDelayOptions();
+    private static final Optional<String> DEFAULT_SERVER_NAME = Optional.empty();
+
+    private String hostname = MqttClientOptions.DEFAULT_HOST;
+    private Optional<String> serverName = DEFAULT_SERVER_NAME;
+    private int port = MqttClientOptions.DEFAULT_PORT;
+    private ReconnectDelayOptions reconnectDelay = DEFAULT_RECONNECT_DELAY;
+
+    /**
+     * Default constructor
+     */
+    public MqttClientSessionOptions() {
+        super();
+    }
+
+    /**
+     * Copy constructor
+     *
+     * @param other the options to copy
+     */
+    public MqttClientSessionOptions(MqttClientSessionOptions other) {
+        super(other);
+        this.hostname = other.hostname;
+        this.port = other.port;
+        this.serverName = other.serverName;
+        this.reconnectDelay = other.reconnectDelay.copy();
+    }
+
+    public int getPort() {
+        return this.port;
+    }
+
+    public MqttClientSessionOptions setPort(int port) {
+        this.port = port;
+        return this;
+    }
+
+    public String getHostname() {
+        return this.hostname;
+    }
+
+    public MqttClientSessionOptions setHostname(String hostname) {
+        this.hostname = hostname;
+        return this;
+    }
+
+    public MqttClientSessionOptions setReconnectDelay(ReconnectDelayOptions reconnectDelay) {
+        this.reconnectDelay = reconnectDelay;
+        return this;
+    }
+
+    public ReconnectDelayOptions getReconnectDelay() {
+        return this.reconnectDelay;
+    }
+
+    public Optional<String> getServerName() {
+        return serverName;
+    }
+
+    public MqttClientSessionOptions setServerName(Optional<String> serverName) {
+        this.serverName = serverName;
+        return this;
+    }
+}
diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/ReconnectDelayOptions.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/ReconnectDelayOptions.java
new file mode 100644
index 0000000000..40c4aacdf5
--- /dev/null
+++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/ReconnectDelayOptions.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2021 Red Hat Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.smallrye.reactive.messaging.mqtt.session;
+
+public interface ReconnectDelayOptions {
+
+    ReconnectDelayProvider createProvider();
+
+    ReconnectDelayOptions copy();
+
+}
diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/ReconnectDelayProvider.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/ReconnectDelayProvider.java
new file mode 100644
index 0000000000..a4af273276
--- /dev/null
+++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/ReconnectDelayProvider.java
@@ -0,0 +1,11 @@
+package io.smallrye.reactive.messaging.mqtt.session;
+
+import java.time.Duration;
+
+public interface ReconnectDelayProvider {
+
+    Duration nextDelay();
+
+    void reset();
+
+}
diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/RequestedQoS.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/RequestedQoS.java
new file mode 100644
index 0000000000..6e1c024ef7
--- /dev/null
+++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/RequestedQoS.java
@@ -0,0 +1,37 @@
+package io.smallrye.reactive.messaging.mqtt.session;
+
+import io.smallrye.reactive.messaging.mqtt.i18n.MqttExceptions;
+
+/**
+ * The requested QoS level.
+ * <p>
+ * NOTE: This is missing QoS 2, as this mode is not properly supported by the session.
+ */
+public enum RequestedQoS {
+    QOS_0(0),
+    QOS_1(1);
+
+    private final int value;
+
+    RequestedQoS(int value) {
+        this.value = value;
+    }
+
+    public int toInteger() {
+        return this.value;
+    }
+
+    public static RequestedQoS valueOf(Integer qos) {
+        if (qos == null) {
+            return null;
+        }
+        switch (qos) {
+            case 0:
+                return RequestedQoS.QOS_0;
+            case 1:
+                return RequestedQoS.QOS_1;
+            default:
+                throw MqttExceptions.ex.illegalArgumentInvalidQoS(qos);
+        }
+    }
+}
diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/SessionEvent.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/SessionEvent.java
new file mode 100644
index 0000000000..36ad1e79ef
--- /dev/null
+++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/SessionEvent.java
@@ -0,0 +1,10 @@
+package io.smallrye.reactive.messaging.mqtt.session;
+
+import io.vertx.codegen.annotations.VertxGen;
+
+@VertxGen
+public interface SessionEvent {
+    SessionState getSessionState();
+
+    Throwable getCause();
+}
diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/SessionState.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/SessionState.java
new file mode 100644
index 0000000000..5fe99b9abd
--- /dev/null
+++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/SessionState.java
@@ -0,0 +1,27 @@
+package io.smallrye.reactive.messaging.mqtt.session;
+
+/**
+ * The state of the session.
+ */
+public enum SessionState {
+    /**
+     * The session is disconnected.
+     * <p>
+     * A re-connect timer may be pending.
+     */
+    DISCONNECTED,
+    /**
+     * The session started to connect.
+     * <p>
+     * This may include re-subscribing to any topics after the connect call was successful.
+     */
+    CONNECTING,
+    /**
+     * The session is connected.
+     */
+    CONNECTED,
+    /**
+     * The session is in the process of an orderly disconnect.
+     */
+    DISCONNECTING,
+}
diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/SubscriptionEvent.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/SubscriptionEvent.java
new file mode 100644
index 0000000000..76e4bb3ee3
--- /dev/null
+++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/SubscriptionEvent.java
@@ -0,0 +1,12 @@
+package io.smallrye.reactive.messaging.mqtt.session;
+
+import io.vertx.codegen.annotations.VertxGen;
+
+@VertxGen
+public interface SubscriptionEvent {
+    Integer getQos();
+
+    SubscriptionState getSubscriptionState();
+
+    String getTopic();
+}
diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/SubscriptionState.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/SubscriptionState.java
new file mode 100644
index 0000000000..c6eec73ae2
--- /dev/null
+++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/SubscriptionState.java
@@ -0,0 +1,29 @@
+package io.smallrye.reactive.messaging.mqtt.session;
+
+/**
+ * The state of a subscription.
+ * <p>
+ * Subscriptions established when a new topic gets added, or the connection was established. If the subscribe call
+ * returns an error for the subscription, the state will remain {@link #FAILED} and it will not try to re-subscribe
+ * while the connection is active.
+ * <p>
+ * When the session (connection) disconnects, all subscriptions will automatically be reset to {@link #UNSUBSCRIBED}.
+ */
+public enum SubscriptionState {
+    /**
+     * The topic is not subscribed.
+     */
+    UNSUBSCRIBED,
+    /**
+     * The topic is in the process of subscribing.
+     */
+    SUBSCRIBING,
+    /**
+     * The topic is subscribed.
+     */
+    SUBSCRIBED,
+    /**
+     * The topic could not be subscribed.
+     */
+    FAILED,
+}
diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/impl/MqttClientSessionImpl.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/impl/MqttClientSessionImpl.java
new file mode 100644
index 0000000000..deda1f6524
--- /dev/null
+++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/impl/MqttClientSessionImpl.java
@@ -0,0 +1,672 @@
+package io.smallrye.reactive.messaging.mqtt.session.impl;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import io.netty.handler.codec.mqtt.MqttQoS;
+import io.smallrye.reactive.messaging.mqtt.session.MqttClientSession;
+import io.smallrye.reactive.messaging.mqtt.session.MqttClientSessionOptions;
+import io.smallrye.reactive.messaging.mqtt.session.ReconnectDelayProvider;
+import io.smallrye.reactive.messaging.mqtt.session.RequestedQoS;
+import io.smallrye.reactive.messaging.mqtt.session.SessionEvent;
+import io.smallrye.reactive.messaging.mqtt.session.SessionState;
+import io.smallrye.reactive.messaging.mqtt.session.SubscriptionEvent;
+import io.smallrye.reactive.messaging.mqtt.session.SubscriptionState;
+import io.vertx.core.AsyncResult;
+import io.vertx.core.Future;
+import io.vertx.core.Handler;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import io.vertx.core.VertxException;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.impl.VertxInternal;
+import io.vertx.core.impl.logging.Logger;
+import io.vertx.core.impl.logging.LoggerFactory;
+import io.vertx.mqtt.MqttClient;
+import io.vertx.mqtt.messages.MqttConnAckMessage;
+import io.vertx.mqtt.messages.MqttPublishMessage;
+import io.vertx.mqtt.messages.MqttSubAckMessage;
+
+public class MqttClientSessionImpl implements MqttClientSession {
+
+    private static final Logger log = LoggerFactory.getLogger(MqttClientSessionImpl.class);
+
+    private final VertxInternal vertx;
+    private final MqttClientSessionOptions options;
+
+    // record the subscriptions
+    private final Map<String, RequestedQoS> subscriptions = new HashMap<>();
+    // record the pending subscribes
+    private final Map<Integer, LinkedHashMap<String, RequestedQoS>> pendingSubscribes = new HashMap<>();
+    // record the pending unsubscribes
+    private final Map<Integer, List<String>> pendingUnsubscribes = new HashMap<>();
+    // the provider for the reconnect delay
+    private final ReconnectDelayProvider reconnectDelay;
+
+    // the current state
+    private volatile SessionState state = SessionState.DISCONNECTED;
+    // drives to connection either to CONNECTED or DISCONNECTED
+    private volatile boolean running;
+    // subscription states
+    private final Map<String, SubscriptionState> subscriptionStates = new ConcurrentHashMap<>();
+
+    // holds the actual MQTT client connection
+    private MqttClient client;
+    // an optional reconnect timer
+    private Long reconnectTimer;
+
+    private volatile Handler<MqttPublishMessage> messageHandler;
+    private volatile Handler<Throwable> exceptionHandler;
+    private volatile Handler<SessionEvent> sessionStateHandler;
+    private volatile Handler<SubscriptionEvent> subscriptionStateHandler;
+    private volatile Handler<Integer> publishCompleteHandler;
+    private volatile Handler<Integer> publishCompletionExpirationHandler;
+    private volatile Handler<Integer> publishCompletionUnknownPacketIdHandler;
+
+    /**
+     * Create a new instance, which is not started.
+     *
+     * @param vertx The vert.x instance to use.
+     * @param options The client session options.
+     */
+    public MqttClientSessionImpl(final Vertx vertx, final MqttClientSessionOptions options) {
+        this.vertx = (VertxInternal) vertx;
+        this.options = options;
+        this.reconnectDelay = options.getReconnectDelay().createProvider();
+    }
+
+    @Override
+    public void start() {
+        this.vertx.runOnContext(x -> doStart());
+    }
+
+    @Override
+    public void stop() {
+        this.vertx.runOnContext(x -> doStop());
+    }
+
+    @Override
+    public SessionState getState() {
+        return this.state;
+    }
+
+    @Override
+    public SubscriptionState getSubscriptionState(String topicFilter) {
+        return this.subscriptionStates.get(topicFilter);
+    }
+
+    @Override
+    public MqttClientSession subscribe(Map<String, RequestedQoS> topics) {
+        final Map<String, RequestedQoS> finalTopics = new LinkedHashMap<>(topics);
+        this.vertx.runOnContext(x -> doSubscribe(finalTopics));
+        return this;
+    }
+
+    @Override
+    public MqttClientSession unsubscribe(List<String> topics) {
+        final Set<String> finalTopics = new HashSet<>(topics);
+        this.vertx.runOnContext(x -> doUnsubscribe(finalTopics));
+        return this;
+    }
+
+    private void doStart() {
+        if (this.running) {
+            // nothing to do
+            return;
+        }
+
+        // we connect, not re-connect
+        this.reconnectDelay.reset();
+
+        this.running = true;
+        switch (this.state) {
+            case DISCONNECTED:
+                // initiate connection
+                createConnection();
+                break;
+            case CONNECTING:
+                // nothing to do
+                break;
+            case CONNECTED:
+                // nothing to do
+                break;
+            case DISCONNECTING:
+                // we do nothing here and wait until the disconnection advanced, which will then trigger a re-connect
+                break;
+        }
+    }
+
+    private void doStop() {
+        if (!this.running) {
+            // nothing to do
+            return;
+        }
+
+        this.running = false;
+
+        if (this.reconnectTimer != null) {
+            // we have a re-connect scheduled, but stop right now.
+            this.vertx.cancelTimer(this.reconnectTimer);
+        }
+
+        switch (this.state) {
+            case CONNECTED:
+                closeConnection((Throwable) null);
+                break;
+            case DISCONNECTED:
+                // nothing to do
+                break;
+            case DISCONNECTING:
+                // nothing do do
+                break;
+            case CONNECTING:
+                // we do nothing here and wait, until the connection advanced, which will then trigger a disconnect
+                break;
+        }
+    }
+
+    @Override
+    public MqttClientSession exceptionHandler(Handler<Throwable> exceptionHandler) {
+        this.exceptionHandler = exceptionHandler;
+        return this;
+    }
+
+    @Override
+    public MqttClientSession sessionStateHandler(Handler<SessionEvent> sessionStateHandler) {
+        this.sessionStateHandler = sessionStateHandler;
+        return this;
+    }
+
+    @Override
+    public MqttClientSession subscriptionStateHandler(Handler<SubscriptionEvent> subscriptionStateHandler) {
+        this.subscriptionStateHandler = subscriptionStateHandler;
+        return this;
+    }
+
+    @Override
+    public MqttClientSession publishCompletionHandler(Handler<Integer> publishCompleteHandler) {
+        this.publishCompleteHandler = publishCompleteHandler;
+        return this;
+    }
+
+    @Override
+    public MqttClientSession publishCompletionExpirationHandler(Handler<Integer> publishCompletionExpirationHandler) {
+        this.publishCompletionExpirationHandler = publishCompletionExpirationHandler;
+        return this;
+    }
+
+    @Override
+    public MqttClientSession publishCompletionUnknownPacketIdHandler(Handler<Integer> publishCompletionUnknownPacketIdHandler) {
+        this.publishCompletionUnknownPacketIdHandler = publishCompletionUnknownPacketIdHandler;
+        return this;
+    }
+
+    @Override
+    public MqttClientSession messageHandler(Handler<MqttPublishMessage> messageHandler) {
+        this.messageHandler = messageHandler;
+        return this;
+    }
+
+    /**
+     * Set the state of the session.
+     *
+     * @param sessionState The new state.
+     * @param cause The optional cause, in case of an error.
+     */
+    private void setState(final SessionState sessionState, final Throwable cause) {
+
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("setState - current: %s, next: %s", this.state, sessionState), cause);
+        }
+
+        // before announcing our state change
+
+        switch (sessionState) {
+            case CONNECTING:
+                break;
+            case CONNECTED:
+                // successful connection, reset delay
+                this.reconnectDelay.reset();
+                break;
+            case DISCONNECTING:
+                break;
+            case DISCONNECTED:
+                this.pendingUnsubscribes.clear();
+                this.pendingSubscribes.clear();
+                for (String topic : this.subscriptions.keySet()) {
+                    notifySubscriptionState(topic, SubscriptionState.UNSUBSCRIBED, null);
+                }
+                break;
+        }
+
+        // announce state change
+
+        if (this.state != sessionState) {
+            this.state = sessionState;
+            Handler<SessionEvent> handler = this.sessionStateHandler;
+            if (handler != null) {
+                handler.handle(new SessionEventImpl(sessionState, cause));
+            }
+        }
+
+        // after announcing out state change
+
+        switch (this.state) {
+            case CONNECTING:
+                // we just wait for the outcome
+                break;
+            case CONNECTED:
+                if (!this.running) {
+                    closeConnection((Throwable) null);
+                }
+                break;
+            case DISCONNECTING:
+                // we just wait for the outcome
+                break;
+            case DISCONNECTED:
+                if (this.running) {
+                    scheduleReconnect();
+                }
+                break;
+        }
+    }
+
+    private void notifySubscriptionState(final String topic, final SubscriptionState state, final Integer grantedQoS) {
+
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("setSubscriptionState - topic: %s, state: %s, grantedQoS: %s", topic, state, grantedQoS));
+        }
+
+        this.subscriptionStates.put(topic, state);
+
+        Handler<SubscriptionEvent> handler = this.subscriptionStateHandler;
+        if (handler != null) {
+            handler.handle(new SubscriptionEventImpl(topic, state, grantedQoS));
+        }
+
+    }
+
+    private void scheduleReconnect() {
+        log.debug("Scheduling reconnect");
+
+        if (this.reconnectTimer == null) {
+
+            final Duration delay = nextDelay();
+            if (log.isDebugEnabled()) {
+                log.debug("Next delay: " + delay);
+            }
+
+            final long timer = vertx.setTimer(delay.toMillis(), x -> createConnection());
+            if (log.isDebugEnabled()) {
+                log.debug("Timer set: " + timer);
+            }
+
+            this.reconnectTimer = timer;
+        }
+    }
+
+    /**
+     * Calculate the next delay before trying to re-connect.
+     *
+     * @return The duration to wait.
+     */
+    private Duration nextDelay() {
+        return this.reconnectDelay.nextDelay();
+    }
+
+    /**
+     * Initiates the connection.
+     */
+    private void createConnection() {
+        log.debug("Creating connection");
+
+        // clear reconnect timer
+        this.reconnectTimer = null;
+
+        // create client
+        this.client = MqttClient.create(this.vertx, this.options);
+        this.client.exceptionHandler(this::exceptionCaught);
+        this.client.closeHandler(x -> connectionClosed());
+        this.client.publishHandler(this::serverPublished);
+        this.client.subscribeCompletionHandler(this::subscribeCompleted);
+        this.client.unsubscribeCompletionHandler(this::unsubscribeCompleted);
+        this.client.publishCompletionHandler(this::publishComplete);
+        this.client.publishCompletionExpirationHandler(this::publishExpired);
+        this.client.publishCompletionUnknownPacketIdHandler(this::publishCompletionUnknown);
+
+        // change state
+        setState(SessionState.CONNECTING, null);
+        // start connection
+        this.client
+            .connect(
+                this.options.getPort(),
+                this.options.getHostname(),
+                this.options.getServerName()
+                    .orElse(this.options.getHostname()),
+                this::connectCompleted);
+    }
+
+    /**
+     * Handle a caught exception.
+     */
+    private void exceptionCaught(Throwable cause) {
+        log.debug("Caught exception", cause);
+        closeConnection(cause);
+        Handler<Throwable> exceptionHandler = this.exceptionHandler;
+        if (exceptionHandler != null) {
+            exceptionHandler.handle(cause);
+        }
+    }
+
+    /**
+     * Initiates the connection shutdown.
+     */
+    private void closeConnection(Throwable cause) {
+        log.debug("Closing connection", cause);
+
+        setState(SessionState.DISCONNECTING, cause);
+        this.client.disconnect().onComplete(this::disconnectCompleted);
+    }
+
+    /**
+     * Gets called when the connect call was processed.
+     *
+     * @param result The outcome of the connect call.
+     */
+    private void connectCompleted(AsyncResult<MqttConnAckMessage> result) {
+
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Connect completed - result: %s, cause: %s", result.result(), result.cause()));
+        }
+
+        if (result.failed() || result.result() == null) {
+            // this will include CONACKs with error codes
+            setState(SessionState.DISCONNECTED, result.cause());
+            return;
+        }
+
+        MqttConnAckMessage ack = result.result();
+
+        setState(SessionState.CONNECTED, null);
+
+        if (!this.subscriptions.isEmpty() && (options.isCleanSession() || !ack.isSessionPresent())) {
+            // re-subscribe if we have requested subscriptions and (either cleanSession=true or no session found on the server)
+            requestSubscribe(new LinkedHashMap<>(this.subscriptions));
+        }
+    }
+
+    /**
+     * Gets called when the disconnect call was processed.
+     *
+     * @param result The outcome of the disconnect call.
+     */
+    private void disconnectCompleted(AsyncResult<?> result) {
+
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Disconnect completed - result: %s, cause: %s", result.result(), result.cause()));
+        }
+
+        connectionClosed(result.cause());
+    }
+
+    /**
+     * Gets called internally when the only reasonable action is to just disconnect.
+     * <p>
+     * If the session is still running, then it will trigger a re-connect.
+     *
+     * @param reason The reason message.
+     */
+    private void closeConnection(final String reason) {
+        closeConnection(new VertxException(reason).fillInStackTrace());
+    }
+
+    /**
+     * Gets called when the connection just dropped.
+     */
+    private void connectionClosed() {
+        if (this.state != SessionState.DISCONNECTING) {
+            // this came unexpected
+            connectionClosed(new VertxException("Connection closed"));
+        }
+    }
+
+    /**
+     * Called to clean up the after a connection was closed.
+     *
+     * @param cause The cause of the connection closure.
+     */
+    private void connectionClosed(final Throwable cause) {
+        if (log.isDebugEnabled()) {
+            log.debug("Connection closed", cause);
+        } else {
+            log.info("Connection closed: " + (cause != null ? cause.getMessage() : "<unknown>"));
+        }
+
+        if (this.client != null) {
+            this.client.exceptionHandler(null);
+            this.client.publishHandler(null);
+            this.client.closeHandler(null);
+            this.client.subscribeCompletionHandler(null);
+            this.client.publishCompletionHandler(null);
+            this.client.publishCompletionExpirationHandler(null);
+            this.client.publishCompletionUnknownPacketIdHandler(null);
+            this.client = null;
+        }
+        setState(SessionState.DISCONNECTED, cause);
+    }
+
+    /**
+     * Gets called when the server published a message for us.
+     *
+     * @param message The published message.
+     */
+    private void serverPublished(MqttPublishMessage message) {
+        if (log.isDebugEnabled()) {
+            log.debug("Server published: " + message);
+        }
+
+        Handler<MqttPublishMessage> publishHandler = this.messageHandler;
+        if (publishHandler != null) {
+            publishHandler.handle(message);
+        }
+    }
+
+    /**
+     * Perform subscribing.
+     *
+     * @param topics The topics to subscribe to.
+     */
+    private void doSubscribe(Map<String, RequestedQoS> topics) {
+        final LinkedHashMap<String, RequestedQoS> subscriptions = new LinkedHashMap<>(topics.size());
+
+        for (Map.Entry<String, RequestedQoS> entry : topics.entrySet()) {
+            this.subscriptions.compute(entry.getKey(), (key, current) -> {
+                if (current != entry.getValue()) {
+                    subscriptions.put(entry.getKey(), entry.getValue());
+                }
+                return entry.getValue();
+            });
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Requesting subscribe: " + subscriptions);
+        }
+        requestSubscribe(subscriptions);
+    }
+
+    /**
+     * Perform unsubscribing.
+     *
+     * @param topics The topics to unsubscribe from.
+     */
+    private void doUnsubscribe(Set<String> topics) {
+        final List<String> topicsToSend = new ArrayList<>(topics.size());
+        for (String topic : topics) {
+            if (this.subscriptions.remove(topic) != null) {
+                topicsToSend.add(topic);
+            }
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Requesting unsubscribe: " + topicsToSend);
+        }
+        requestUnsubscribe(topicsToSend);
+    }
+
+    /**
+     * Request to subscribe from the server.
+     *
+     * @param topics The topics to subscribe to, including the requested QoS.
+     */
+    private void requestSubscribe(LinkedHashMap<String, RequestedQoS> topics) {
+        if (topics.isEmpty() || this.client == null || !this.client.isConnected()) {
+            // nothing to do
+            return;
+        }
+
+        this.client
+            .subscribe(topics.entrySet()
+                .stream().collect(Collectors.toMap(
+                    Map.Entry::getKey,
+                    e -> e.getValue().toInteger())))
+            .onComplete(result -> subscribeSent(result, topics));
+    }
+
+    /**
+     * Request to unsubscribe from the server.
+     *
+     * @param topics The topic to unsubscribe from.
+     */
+    private void requestUnsubscribe(List<String> topics) {
+        if (topics.isEmpty() || this.client == null || !this.client.isConnected()) {
+            // nothing to do
+            return;
+        }
+
+        for (String topic : topics) {
+            // vertx-mqtt currently does not support unsubscribing from multi-topics due to an API limitation
+            this.client
+                .unsubscribe(topic)
+                .onComplete(result -> unsubscribeSent(result, Collections.singletonList(topic)));
+        }
+    }
+
+    /**
+     * Called when the subscribe call was sent.
+     *
+     * @param result The result of sending the request, contains the packet id.
+     */
+    private void subscribeSent(AsyncResult<Integer> result, LinkedHashMap<String, RequestedQoS> topics) {
+        if (result.failed() || result.result() == null) {
+            // failed
+            for (String topic : topics.keySet()) {
+                notifySubscriptionState(topic, SubscriptionState.UNSUBSCRIBED, null);
+            }
+        } else {
+            // record request
+            for (String topic : topics.keySet()) {
+                notifySubscriptionState(topic, SubscriptionState.SUBSCRIBING, null);
+            }
+            this.pendingSubscribes.put(result.result(), topics);
+        }
+    }
+
+    /**
+     * Called when the unsubscribe call was sent.
+     *
+     * @param result The result of sending the request, contains the packet id.
+     */
+    private void unsubscribeSent(AsyncResult<Integer> result, List<String> topics) {
+        if (result.failed() || result.result() == null) {
+            closeConnection(String.format("Failed to send unsubscribe request: %s", result.cause()));
+        } else {
+            this.pendingUnsubscribes.put(result.result(), topics);
+        }
+    }
+
+    /**
+     * Called when the server processed the request to subscribe.
+     *
+     * @param ack The acknowledge message.
+     */
+    private void subscribeCompleted(MqttSubAckMessage ack) {
+        LinkedHashMap<String, RequestedQoS> request = this.pendingSubscribes.remove(ack.messageId());
+        if (request == null) {
+            closeConnection(String.format("Unexpected subscription ack response - messageId: %s", ack.messageId()));
+            return;
+        }
+        if (request.size() != ack.grantedQoSLevels().size()) {
+            closeConnection(String.format("Mismatch of topics on subscription ack - expected: %d, actual: %d", request.size(),
+                ack.grantedQoSLevels().size()));
+            return;
+        }
+
+        int idx = 0;
+        for (String topic : request.keySet()) {
+            Integer grantedQoS = ack.grantedQoSLevels().get(idx);
+            notifySubscriptionState(topic, SubscriptionState.SUBSCRIBED, grantedQoS);
+            idx += 1;
+        }
+    }
+
+    /**
+     * Called when the server processed the request to unsubscribe.
+     *
+     * @param messageId The ID of the message that completed.
+     */
+    private void unsubscribeCompleted(Integer messageId) {
+        List<String> request = this.pendingUnsubscribes.remove(messageId);
+        for (String topic : request) {
+            notifySubscriptionState(topic, SubscriptionState.UNSUBSCRIBED, null);
+        }
+    }
+
+    @Override
+    public Future<Integer> publish(String topic, Buffer payload, MqttQoS qosLevel, boolean isDup, boolean isRetain) {
+        Promise<Integer> future = Promise.promise();
+        this.vertx
+            .runOnContext(x -> doPublish(topic, payload, qosLevel, isDup, isRetain)
+                .onComplete(future));
+        return future.future();
+    }
+
+    private Future<Integer> doPublish(String topic, Buffer payload, MqttQoS qosLevel, boolean isDup, boolean isRetain) {
+        if (this.client != null && this.client.isConnected()) {
+            // not checking for isConnected might throw a NPE from inside the client
+            return this.client.publish(topic, payload, qosLevel, isDup, isRetain);
+        } else {
+            return Future.failedFuture("Session is not connected");
+        }
+    }
+
+    private void publishComplete(Integer messageId) {
+        Handler<Integer> handler = this.publishCompleteHandler;
+        if (handler != null) {
+            handler.handle(messageId);
+        }
+    }
+
+    private void publishExpired(Integer messageId) {
+        Handler<Integer> handler = this.publishCompletionExpirationHandler;
+        if (handler != null) {
+            handler.handle(messageId);
+        }
+    }
+
+    private void publishCompletionUnknown(Integer messageId) {
+        Handler<Integer> handler = this.publishCompletionUnknownPacketIdHandler;
+        if (handler != null) {
+            handler.handle(messageId);
+        }
+    }
+
+}
diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/impl/SessionEventImpl.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/impl/SessionEventImpl.java
new file mode 100644
index 0000000000..41dfc6ed50
--- /dev/null
+++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/impl/SessionEventImpl.java
@@ -0,0 +1,38 @@
+package io.smallrye.reactive.messaging.mqtt.session.impl;
+
+import io.smallrye.reactive.messaging.mqtt.session.SessionEvent;
+import io.smallrye.reactive.messaging.mqtt.session.SessionState;
+
+/**
+ * An event of a session state change.
+ */
+public class SessionEventImpl implements SessionEvent {
+
+    private final SessionState sessionState;
+    private final Throwable cause;
+
+    public SessionEventImpl(final SessionState sessionState, final Throwable reason) {
+        this.sessionState = sessionState;
+        this.cause = reason;
+    }
+
+    /**
+     * The new state of the session.
+     *
+     * @return The state.
+     */
+    @Override
+    public SessionState getSessionState() {
+        return this.sessionState;
+    }
+
+    /**
+     * The (optional) cause of change.
+     *
+     * @return The throwable that causes the state change, or {@code null}, if there was none.
+     */
+    @Override
+    public Throwable getCause() {
+        return this.cause;
+    }
+}
diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/impl/SubscriptionEventImpl.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/impl/SubscriptionEventImpl.java
new file mode 100644
index 0000000000..e3b8ba09d1
--- /dev/null
+++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/session/impl/SubscriptionEventImpl.java
@@ -0,0 +1,77 @@
+package io.smallrye.reactive.messaging.mqtt.session.impl;
+
+import java.util.Objects;
+import java.util.StringJoiner;
+
+import io.smallrye.reactive.messaging.mqtt.session.SubscriptionEvent;
+import io.smallrye.reactive.messaging.mqtt.session.SubscriptionState;
+
+/**
+ * An event of a subscription state change.
+ */
+public class SubscriptionEventImpl implements SubscriptionEvent {
+    private final String topic;
+    private final SubscriptionState subscriptionState;
+    private final Integer qos;
+
+    public SubscriptionEventImpl(final String topic, final SubscriptionState subscriptionState, final Integer qos) {
+        this.topic = topic;
+        this.subscriptionState = subscriptionState;
+        this.qos = qos;
+    }
+
+    /**
+     * The granted QoS level from the server.
+     *
+     * @return When the state changed to {@link SubscriptionState#SUBSCRIBED}, it contains the QoS level granted by
+     *         the server. Otherwise it will be {@code null}.
+     */
+    @Override
+    public Integer getQos() {
+        return this.qos;
+    }
+
+    /**
+     * The new subscription state.
+     *
+     * @return The state.
+     */
+    @Override
+    public SubscriptionState getSubscriptionState() {
+        return this.subscriptionState;
+    }
+
+    /**
+     * The name of the topic this change refers to.
+     *
+     * @return The topic name.
+     */
+    @Override
+    public String getTopic() {
+        return this.topic;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+        SubscriptionEventImpl that = (SubscriptionEventImpl) o;
+        return topic.equals(that.topic) && subscriptionState == that.subscriptionState && Objects.equals(qos, that.qos);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(topic, subscriptionState, qos);
+    }
+
+    @Override
+    public String toString() {
+        return new StringJoiner(", ", SubscriptionEventImpl.class.getSimpleName() + "[", "]")
+                .add("topic='" + topic + "'")
+                .add("subscriptionState=" + subscriptionState)
+                .add("qos=" + qos)
+                .toString();
+    }
+}
diff --git a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MqttSourceTest.java b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MqttSourceTest.java
index 713d72930f..3b40abe71b 100644
--- a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MqttSourceTest.java
+++ b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MqttSourceTest.java
@@ -43,7 +43,7 @@ public void testSource() {
         List<MqttMessage<?>> messages = new ArrayList<>();
         PublisherBuilder<MqttMessage<?>> stream = source.getSource();
         stream.forEach(messages::add).run();
-        await().until(source::isSubscribed);
+        await().until(source::isReady);
         AtomicInteger counter = new AtomicInteger();
         new Thread(() -> usage.produceIntegers(topic, 10, null,
                 counter::getAndIncrement)).start();
@@ -69,7 +69,7 @@ public void testSourceUsingChannelName() {
         List<MqttMessage<?>> messages = new ArrayList<>();
         PublisherBuilder<MqttMessage<?>> stream = source.getSource();
         stream.forEach(messages::add).run();
-        await().until(source::isSubscribed);
+        await().until(source::isReady);
         AtomicInteger counter = new AtomicInteger();
         new Thread(() -> usage.produceIntegers(topic, 10, null,
                 counter::getAndIncrement)).start();
@@ -101,7 +101,7 @@ public void testBroadcast() {
         stream.forEach(messages1::add).run();
         stream.forEach(messages2::add).run();
 
-        await().until(source::isSubscribed);
+        await().until(source::isReady);
 
         AtomicInteger counter = new AtomicInteger();
         new Thread(() -> usage.produceIntegers(topic, 10, null,
@@ -142,7 +142,7 @@ public void testWithVeryLargeMessage() {
         List<MqttMessage<?>> messages = new ArrayList<>();
         PublisherBuilder<MqttMessage<?>> stream = source.getSource();
         stream.forEach(messages::add).run();
-        await().until(source::isSubscribed);
+        await().until(source::isReady);
         new Thread(() -> usage.produce(topic, 10, null,
                 () -> large)).start();
 
diff --git a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MutualTlsMqttSourceTest.java b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MutualTlsMqttSourceTest.java
index f3125f252e..f3774ca6f9 100644
--- a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MutualTlsMqttSourceTest.java
+++ b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MutualTlsMqttSourceTest.java
@@ -52,7 +52,7 @@ public void testMutualTLS() {
         List<MqttMessage<?>> messages = new ArrayList<>();
         PublisherBuilder<MqttMessage<?>> stream = source.getSource();
         stream.forEach(messages::add).run();
-        await().until(source::isSubscribed);
+        await().until(source::isReady);
         pause();
         AtomicInteger counter = new AtomicInteger();
         new Thread(() -> usage.produceIntegers(topic, 10, null,
diff --git a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/SecureMqttSourceTest.java b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/SecureMqttSourceTest.java
index 3af0a1cb17..b9300fc46f 100644
--- a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/SecureMqttSourceTest.java
+++ b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/SecureMqttSourceTest.java
@@ -45,7 +45,7 @@ public void testSecureSource() {
         List<MqttMessage<?>> messages = new ArrayList<>();
         PublisherBuilder<MqttMessage<?>> stream = source.getSource();
         stream.forEach(messages::add).run();
-        await().until(source::isSubscribed);
+        await().until(source::isReady);
         pause();
         AtomicInteger counter = new AtomicInteger();
         new Thread(() -> usage.produceIntegers(topic, 10, null,
diff --git a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/TlsMqttSourceTest.java b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/TlsMqttSourceTest.java
index cbeaf564cd..84a835f578 100644
--- a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/TlsMqttSourceTest.java
+++ b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/TlsMqttSourceTest.java
@@ -48,7 +48,7 @@ public void testTLS() {
         List<MqttMessage<?>> messages = new ArrayList<>();
         PublisherBuilder<MqttMessage<?>> stream = source.getSource();
         stream.forEach(messages::add).run();
-        await().until(source::isSubscribed);
+        await().until(source::isReady);
         pause();
         AtomicInteger counter = new AtomicInteger();
         new Thread(() -> usage.produceIntegers(topic, 10, null,