Skip to content

Commit 7051895

Browse files
committed
Add Mqtt5SubscribeOnReconnectIT
1 parent 7569bea commit 7051895

File tree

1 file changed

+96
-0
lines changed

1 file changed

+96
-0
lines changed
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Copyright 2018-present HiveMQ and the HiveMQ Community
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.hivemq.client.lifecycle;
18+
19+
import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
20+
import com.hivemq.client.mqtt.datatypes.MqttQos;
21+
import com.hivemq.client.mqtt.lifecycle.MqttClientReconnector;
22+
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
23+
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
24+
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
25+
import com.hivemq.testcontainer.junit5.HiveMQTestContainerExtension;
26+
import org.jetbrains.annotations.NotNull;
27+
import org.junit.jupiter.api.AfterEach;
28+
import org.junit.jupiter.api.Test;
29+
import org.junit.jupiter.api.Timeout;
30+
import org.junit.jupiter.api.extension.RegisterExtension;
31+
import org.testcontainers.utility.MountableFile;
32+
33+
import java.util.concurrent.ConcurrentLinkedQueue;
34+
import java.util.concurrent.CountDownLatch;
35+
import java.util.concurrent.atomic.AtomicBoolean;
36+
37+
import static org.awaitility.Awaitility.await;
38+
39+
public class Mqtt5SubscribeOnReconnectIT {
40+
41+
@RegisterExtension
42+
public final @NotNull HiveMQTestContainerExtension hivemq =
43+
new HiveMQTestContainerExtension().withHiveMQConfig(MountableFile.forClasspathResource("/config.xml"));
44+
45+
private final @NotNull AtomicBoolean reconnect = new AtomicBoolean(true);
46+
47+
@AfterEach
48+
void tearDown() {
49+
reconnect.set(false);
50+
}
51+
52+
@Test
53+
@Timeout(60)
54+
void mqtt5_resubscribe_on_reconnect() throws Exception {
55+
final Mqtt5BlockingClient publisher =
56+
Mqtt5Client.builder().identifier("PublishClient").serverPort(hivemq.getMqttPort()).buildBlocking();
57+
publisher.connect();
58+
59+
final ConcurrentLinkedQueue<Mqtt5Publish> publishes = new ConcurrentLinkedQueue<>();
60+
final CountDownLatch latch = new CountDownLatch(1);
61+
final Mqtt5BlockingClient subscriber = Mqtt5Client.builder()
62+
.identifier("SubscribeClient")
63+
.serverPort(hivemq.getMqttPort())
64+
.addConnectedListener(context -> System.out.println("Subscriber connected"))
65+
.addDisconnectedListener(context -> {
66+
System.out.println("Subscriber disconnected");
67+
final MqttClientReconnector reconnector = context.getReconnector();
68+
reconnector.resubscribeIfSessionPresent(true);
69+
if (reconnect.get()) {
70+
System.out.println("Reconnect subscriber");
71+
reconnector.reconnect(true);
72+
latch.countDown();
73+
}
74+
})
75+
.buildBlocking();
76+
77+
subscriber.toAsync().publishes(MqttGlobalPublishFilter.ALL, publishes::add);
78+
subscriber.connectWith().cleanStart(false).sessionExpiryInterval(3600).send();
79+
subscriber.subscribeWith().topicFilter("#").qos(MqttQos.AT_LEAST_ONCE).send();
80+
81+
for (int i = 0; i < 10; i++) {
82+
publisher.toAsync().publishWith().topic("test").qos(MqttQos.AT_LEAST_ONCE).send();
83+
}
84+
await().until(() -> {
85+
System.out.println(publishes.size());
86+
return publishes.size() == 10;
87+
});
88+
89+
subscriber.disconnect();
90+
latch.await();
91+
for (int i = 0; i < 10; i++) {
92+
publisher.toAsync().publishWith().topic("test").qos(MqttQos.AT_LEAST_ONCE).send();
93+
}
94+
await().until(() -> publishes.size() == 20);
95+
}
96+
}

0 commit comments

Comments
 (0)