Skip to content

Commit a8cf433

Browse files
#142 Fixed multiple topic subscription errors
1 parent e71248f commit a8cf433

File tree

1 file changed

+10
-3
lines changed

1 file changed

+10
-3
lines changed

lib/src/main/java/ua/naiksoftware/stomp/StompClient.java

+10-3
Original file line numberDiff line numberDiff line change
@@ -240,11 +240,12 @@ public Flowable<StompMessage> topic(@NonNull String destPath, List<StompHeader>
240240
return Flowable.error(new IllegalArgumentException("Topic path cannot be null"));
241241
else if (!streamMap.containsKey(destPath))
242242
streamMap.put(destPath,
243-
subscribePath(destPath, headerList).andThen(
243+
Completable.defer(() -> subscribePath(destPath, headerList)).andThen(
244244
getMessageStream()
245245
.filter(msg -> pathMatcher.matches(destPath, msg))
246246
.toFlowable(BackpressureStrategy.BUFFER)
247-
.share()).doFinally(() -> unsubscribePath(destPath).subscribe())
247+
.doFinally(() -> unsubscribePath(destPath).subscribe())
248+
.share())
248249
);
249250
return streamMap.get(destPath);
250251
}
@@ -267,14 +268,20 @@ private Completable subscribePath(String destinationPath, @Nullable List<StompHe
267268
headers.add(new StompHeader(StompHeader.ACK, DEFAULT_ACK));
268269
if (headerList != null) headers.addAll(headerList);
269270
return send(new StompMessage(StompCommand.SUBSCRIBE,
270-
headers, null));
271+
headers, null))
272+
.doOnError(throwable -> unsubscribePath(destinationPath).subscribe());
271273
}
272274

273275

274276
private Completable unsubscribePath(String dest) {
275277
streamMap.remove(dest);
276278

277279
String topicId = topics.get(dest);
280+
281+
if (topicId == null) {
282+
return Completable.complete();
283+
}
284+
278285
topics.remove(dest);
279286

280287
Log.d(TAG, "Unsubscribe path: " + dest + " id: " + topicId);

0 commit comments

Comments
 (0)