Skip to content

Commit dca716e

Browse files
authored
Fix pubsub when cache enabled (redis#4086)
* fix pubsub when cache enabled * polish
1 parent 06ca054 commit dca716e

File tree

2 files changed

+97
-10
lines changed

2 files changed

+97
-10
lines changed

src/main/java/redis/clients/jedis/Protocol.java

+15-10
Original file line numberDiff line numberDiff line change
@@ -223,34 +223,39 @@ public static Object read(final RedisInputStream is) {
223223

224224
@Experimental
225225
public static Object read(final RedisInputStream is, final Cache cache) {
226-
readPushes(is, cache, false);
227-
return process(is);
226+
Object unhandledPush = readPushes(is, cache, false);
227+
return unhandledPush == null ? process(is) : unhandledPush;
228228
}
229229

230230
@Experimental
231-
public static void readPushes(final RedisInputStream is, final Cache cache, boolean onlyPendingBuffer) {
231+
public static Object readPushes(final RedisInputStream is, final Cache cache,
232+
boolean onlyPendingBuffer) {
233+
Object unhandledPush = null;
232234
if (onlyPendingBuffer) {
233235
try {
234-
while (is.available() > 0 && is.peek(GREATER_THAN_BYTE)) {
235-
is.readByte();
236-
processPush(is, cache);
236+
while (unhandledPush == null && is.available() > 0 && is.peek(GREATER_THAN_BYTE)) {
237+
unhandledPush = processPush(is, cache);
237238
}
238239
} catch (IOException e) {
239240
throw new JedisConnectionException("Failed to read pending buffer for push messages!", e);
240241
}
241242
} else {
242-
while (is.peek(GREATER_THAN_BYTE)) {
243-
is.readByte();
244-
processPush(is, cache);
243+
while (unhandledPush == null && is.peek(GREATER_THAN_BYTE)) {
244+
unhandledPush = processPush(is, cache);
245245
}
246246
}
247+
return unhandledPush;
247248
}
248249

249-
private static void processPush(final RedisInputStream is, Cache cache) {
250+
private static Object processPush(final RedisInputStream is, Cache cache) {
251+
is.readByte();
250252
List<Object> list = processMultiBulkReply(is);
251253
if (list.size() == 2 && list.get(0) instanceof byte[]
252254
&& Arrays.equals(INVALIDATE_BYTES, (byte[]) list.get(0))) {
253255
cache.deleteByRedisKeys((List) list.get(1));
256+
return null;
257+
} else {
258+
return list;
254259
}
255260
}
256261

src/test/java/redis/clients/jedis/csc/UnifiedJedisClientSideCacheTestBase.java

+82
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,21 @@
33
import static org.junit.Assert.assertEquals;
44
import static org.junit.Assert.assertNotSame;
55
import static org.junit.Assert.assertNull;
6+
import static org.junit.Assert.assertTrue;
67
import static org.awaitility.Awaitility.await;
78

9+
import java.util.ArrayList;
810
import java.util.Arrays;
911
import java.util.List;
12+
import java.util.concurrent.Executors;
13+
import java.util.concurrent.ScheduledExecutorService;
1014
import java.util.concurrent.TimeUnit;
1115

1216
import org.junit.After;
1317
import org.junit.Before;
1418
import org.junit.Test;
1519

20+
import redis.clients.jedis.JedisPubSub;
1621
import redis.clients.jedis.UnifiedJedis;
1722

1823
public abstract class UnifiedJedisClientSideCacheTestBase {
@@ -222,4 +227,81 @@ public void invalidationOnCacheHitTest() {
222227
}
223228
}
224229

230+
@Test
231+
public void simplePubsubWithClientCache() {
232+
String test_channel = "test_channel";
233+
String test_message = "test message";
234+
235+
UnifiedJedis publisher = createCachedJedis(CacheConfig.builder().build());
236+
Runnable command = () -> publisher.publish(test_channel, test_message + System.currentTimeMillis());
237+
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
238+
executor.scheduleAtFixedRate(command, 0, 100, java.util.concurrent.TimeUnit.MILLISECONDS);
239+
240+
List<String> receivedMessages = new ArrayList<>();
241+
try (UnifiedJedis subscriber = createCachedJedis(CacheConfig.builder().build())) {
242+
JedisPubSub jedisPubSub = new JedisPubSub() {
243+
private int count = 0;
244+
245+
@Override
246+
public void onMessage(String channel, String message) {
247+
receivedMessages.add(message);
248+
if (message.startsWith(test_message) && count++ > 1) {
249+
this.unsubscribe(test_channel);
250+
}
251+
}
252+
};
253+
subscriber.subscribe(jedisPubSub, test_channel);
254+
}
255+
256+
executor.shutdown();
257+
publisher.close();
258+
259+
assertTrue(receivedMessages.size() > 1);
260+
receivedMessages.forEach(message -> assertTrue(message.startsWith(test_message)));
261+
}
262+
263+
@Test
264+
public void advancedPubsubWithClientCache() {
265+
String test_channel = "test_channel";
266+
String test_message = "test message";
267+
String test_key = "test_key";
268+
String test_value = "test_value";
269+
270+
UnifiedJedis publisher = createCachedJedis(CacheConfig.builder().build());
271+
Runnable command = () -> publisher.publish(test_channel, test_message + System.currentTimeMillis());
272+
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
273+
executor.scheduleAtFixedRate(command, 0, 50, java.util.concurrent.TimeUnit.MILLISECONDS);
274+
275+
int iteration = 0;
276+
int totalIteration = 10;
277+
while (iteration++ < totalIteration) {
278+
279+
List<String> receivedMessages = new ArrayList<>();
280+
try (UnifiedJedis subscriber = createCachedJedis(CacheConfig.builder().build())) {
281+
282+
subscriber.set(test_key, test_value);
283+
assertEquals(test_value, subscriber.get(test_key));
284+
JedisPubSub jedisPubSub = new JedisPubSub() {
285+
private int count = 0;
286+
287+
@Override
288+
public void onMessage(String channel, String message) {
289+
receivedMessages.add(message);
290+
if (message.startsWith(test_message) && count++ > 1) {
291+
this.unsubscribe(test_channel);
292+
}
293+
}
294+
};
295+
subscriber.subscribe(jedisPubSub, test_channel);
296+
subscriber.set(test_key, test_value);
297+
assertEquals(test_value, subscriber.get(test_key));
298+
}
299+
300+
assertTrue(receivedMessages.size() > 1);
301+
receivedMessages.forEach(message -> assertTrue(message.startsWith(test_message)));
302+
}
303+
304+
executor.shutdown();
305+
publisher.close();
306+
}
225307
}

0 commit comments

Comments
 (0)