Skip to content

Commit 555fa6b

Browse files
author
Brian Hammond
committed
pmessage support from Redis 1.3.10 c8d0ea0ef1df7b64a23f992f370db5f70f343891
1 parent 7138e80 commit 555fa6b

File tree

2 files changed

+42
-56
lines changed

2 files changed

+42
-56
lines changed

examples/subscriber.js

+6-2
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@ var
1010
sys.puts("waiting for messages...");
1111

1212
client.subscribeTo("*",
13-
function (channel, message) {
14-
sys.puts("[" + channel + "]: " + message);
13+
function (channel, message, subscriptionPattern) {
14+
var output = "[" + channel;
15+
if (subscriptionPattern)
16+
output += " (from pattern '" + subscriptionPattern + "')";
17+
output += "]: " + message;
18+
sys.puts(output);
1519
});

lib/redis-client.js

+36-54
Original file line numberDiff line numberDiff line change
@@ -72,20 +72,6 @@ function debugFilter(buffer, len) {
7272
return filtered;
7373
}
7474

75-
// fnmatch mirrors (mostly) the functionality of fnmatch(3) at least
76-
// in the same way as Redis.
77-
78-
var qmarkRE = /\?/g;
79-
var starRE = /\*/g;
80-
var dotRE = /\./g;
81-
82-
function fnmatch (pattern, test) {
83-
var newPattern = pattern.replace(dotRE, '(\\.)')
84-
.replace(qmarkRE, '(.)')
85-
.replace(starRE, '(.*?)');
86-
return (new RegExp(newPattern)).test(test);
87-
}
88-
8975
// A fully interruptable, binary-safe Redis reply parser.
9076
// 'callback' is called with each reply parsed in 'feed'.
9177
// 'thisArg' is the "thisArg" for the callback "call".
@@ -409,14 +395,26 @@ Client.prototype.onReply_ = function (reply) {
409395
};
410396

411397
Client.prototype.handlePublishedMessage_ = function (reply) {
412-
// We're looking for a multibulk like:
413-
// ["message", "channelName", messageBuffer]
414-
415-
if (reply.type != MULTIBULK ||
416-
!(reply.value instanceof Array) ||
417-
reply.value.length != 3 ||
418-
reply.value[0].value.length != 7 ||
419-
reply.value[0].value.asciiSlice(0, 7) != 'message')
398+
// We're looking for a multibulk resembling
399+
// ["message", "channelName", messageBuffer]; or
400+
// ["pmessage", "matchingPattern", "channelName", messageBuffer]
401+
// The latter is sent when the client subscribed to a channel by a pattern;
402+
// the former when subscribed to a channel by name.
403+
// If the client subscribes by name -and- by pattern and there's some
404+
// overlap, the client -will- receive multiple p/message notifications.
405+
406+
if (reply.type != MULTIBULK || !(reply.value instanceof Array))
407+
return false;
408+
409+
var isMessage = (reply.value.length == 3 &&
410+
reply.value[0].value.length == 7 &&
411+
reply.value[0].value.asciiSlice(0, 7) == 'message');
412+
413+
var isPMessage = (reply.value.length == 4 &&
414+
reply.value[0].value.length == 8 &&
415+
reply.value[0].value.asciiSlice(0, 8) == 'pmessage');
416+
417+
if (!isMessage && !isPMessage)
420418
return false;
421419

422420
// This is tricky. We are returning true even though there
@@ -430,39 +428,23 @@ Client.prototype.handlePublishedMessage_ = function (reply) {
430428
if (Object.getOwnPropertyNames(this.channelCallbacks).length == 0)
431429
return true;
432430

433-
var channelNameOrPattern = reply.value[1].value;
434-
var channelCallback = this.channelCallbacks[channelNameOrPattern];
435-
if (typeof channelCallback == 'undefined') {
436-
// No 1:1 channel name match.
437-
//
438-
// Perhaps the subscription was for a pattern (PSUBSCRIBE)?
439-
// Redis does not send the pattern that matched from an
440-
// original PSUBSCRIBE request. It sends the (fn)matching
441-
// channel name instead. Thus, let's try to fnmatch the
442-
// channel the message was published to/on to a subscribed
443-
// pattern, and callback the associated function.
444-
//
445-
// A -> Redis PSUBSCRIBE foo.*
446-
// B -> Redis PUBLISH foo.bar hello
447-
// Redis -> A MESSAGE foo.bar hello (no pattern specified)
448-
449-
var channelNamesOrPatterns =
450-
Object.getOwnPropertyNames(this.channelCallbacks);
451-
452-
for (var i=0; i < channelNamesOrPatterns.length; ++i) {
453-
var thisNameOrPattern = channelNamesOrPatterns[i];
454-
if (fnmatch(thisNameOrPattern, channelNameOrPattern)) {
455-
channelCallback = this.channelCallbacks[thisNameOrPattern];
456-
break;
457-
}
458-
}
431+
var channelName, channelPattern, channelCallback, payload;
432+
433+
if (isMessage) {
434+
channelName = reply.value[1].value;
435+
channelCallback = this.channelCallbacks[channelName];
436+
payload = reply.value[2].value;
437+
} else if (isPMessage) {
438+
channelPattern = reply.value[1].value;
439+
channelName = reply.value[2].value;
440+
channelCallback = this.channelCallbacks[channelPattern];
441+
payload = reply.value[3].value;
442+
} else {
443+
return false;
459444
}
460445

461-
if (typeof(channelCallback) === 'function') {
462-
// Good, we found a function to callback.
463-
464-
var payload = reply.value[2].value;
465-
channelCallback(channelNameOrPattern, payload);
446+
if (typeof channelCallback == "function") {
447+
channelCallback(channelName, payload, channelPattern);
466448
return true;
467449
}
468450

@@ -871,7 +853,7 @@ Client.prototype.maybeReconnect = function () {
871853
// issue other commands, use a second client instance.
872854

873855
Client.prototype.subscribeTo = function (nameOrPattern, callback) {
874-
if (typeof this.channelCallbacks[nameOrPattern] === 'function')
856+
if (typeof this.channelCallbacks[nameOrPattern] === 'function')
875857
return;
876858

877859
if (typeof(callback) !== 'function')

0 commit comments

Comments
 (0)