Skip to content

Commit 46f10a8

Browse files
committed
Add Mqtt5SubscribeOnReconnectIT
1 parent 82cabaa commit 46f10a8

File tree

1 file changed

+94
-0
lines changed

1 file changed

+94
-0
lines changed
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Copyright 2018-present HiveMQ and the HiveMQ Community
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.hivemq.client.lifecycle;
18+
19+
import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
20+
import com.hivemq.client.mqtt.datatypes.MqttQos;
21+
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
22+
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
23+
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
24+
import com.hivemq.testcontainer.junit5.HiveMQTestContainerExtension;
25+
import org.jetbrains.annotations.NotNull;
26+
import org.junit.jupiter.api.AfterEach;
27+
import org.junit.jupiter.api.Test;
28+
import org.junit.jupiter.api.Timeout;
29+
import org.junit.jupiter.api.extension.RegisterExtension;
30+
import org.testcontainers.utility.MountableFile;
31+
32+
import java.util.concurrent.ConcurrentLinkedQueue;
33+
import java.util.concurrent.CountDownLatch;
34+
import java.util.concurrent.atomic.AtomicBoolean;
35+
36+
import static org.awaitility.Awaitility.await;
37+
38+
public class Mqtt5SubscribeOnReconnectIT {
39+
40+
@RegisterExtension
41+
public final @NotNull HiveMQTestContainerExtension hivemq =
42+
new HiveMQTestContainerExtension().withHiveMQConfig(MountableFile.forClasspathResource("/config.xml"));
43+
44+
private final @NotNull AtomicBoolean reconnect = new AtomicBoolean(true);
45+
46+
@AfterEach
47+
void tearDown() {
48+
reconnect.set(false);
49+
}
50+
51+
@Test
52+
@Timeout(60)
53+
void mqtt5_resubscribe_on_reconnect() throws Exception {
54+
final Mqtt5BlockingClient publisher =
55+
Mqtt5Client.builder().identifier("PublishClient").serverPort(hivemq.getMqttPort()).buildBlocking();
56+
publisher.connect();
57+
58+
final ConcurrentLinkedQueue<Mqtt5Publish> publishes = new ConcurrentLinkedQueue<>();
59+
final CountDownLatch latch = new CountDownLatch(1);
60+
final Mqtt5BlockingClient subscriber = Mqtt5Client.builder()
61+
.identifier("SubscribeClient")
62+
.serverPort(hivemq.getMqttPort())
63+
.addConnectedListener(context -> System.out.println("Subscriber connected"))
64+
.addDisconnectedListener(context -> {
65+
System.out.println("Subscriber disconnected");
66+
context.getReconnector().resubscribeIfSessionPresent(true);
67+
if (reconnect.get()) {
68+
System.out.println("Reconnect subscriber");
69+
context.getReconnector().reconnect(true);
70+
latch.countDown();
71+
}
72+
})
73+
.buildBlocking();
74+
75+
subscriber.toAsync().publishes(MqttGlobalPublishFilter.ALL, publishes::add);
76+
subscriber.connectWith().cleanStart(false).sessionExpiryInterval(3600).send();
77+
subscriber.subscribeWith().topicFilter("#").qos(MqttQos.AT_LEAST_ONCE).send();
78+
79+
for (int i = 0; i < 10; i++) {
80+
publisher.toAsync().publishWith().topic("test").qos(MqttQos.AT_LEAST_ONCE).send();
81+
}
82+
await().until(() -> {
83+
System.out.println(publishes.size());
84+
return publishes.size() == 10;
85+
});
86+
87+
subscriber.disconnect();
88+
latch.await();
89+
for (int i = 0; i < 10; i++) {
90+
publisher.toAsync().publishWith().topic("test").qos(MqttQos.AT_LEAST_ONCE).send();
91+
}
92+
await().until(() -> publishes.size() == 20);
93+
}
94+
}

0 commit comments

Comments
 (0)