Skip to content

Commit 76324a5

Browse files
committed
Encode rmap string content in publish
So that we can ensure decoding all the parts in Go. This replaces the previous algo that was using : to separate key and values which was not working if a key or a value also contained :.
1 parent 23ebcc6 commit 76324a5

File tree

5 files changed

+68
-31
lines changed

5 files changed

+68
-31
lines changed

pool/scheduler_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ func TestSchedule(t *testing.T) {
9090
err = node.Schedule(ctx, producer, d)
9191
require.NoError(t, err)
9292

93-
assert.Eventually(t, func() bool { return it() == 7 }, max, delay, "schedule should have stopped")
93+
assert.Eventually(t, func() bool { return it() == 7 }, max, delay, "schedule should have stopped, got %d", it())
9494
select {
9595
case <-done:
9696
reset = true

pool/ticker.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ func (node *Node) NewTicker(ctx context.Context, name string, d time.Duration, o
6060
}
6161
if t.next == "" {
6262
next := serialize(time.Now().Add(d), d)
63-
if _, err := t.tickerMap.Set(ctx, t.name, next); err != nil {
63+
if _, err := t.tickerMap.SetAndWait(ctx, t.name, next); err != nil {
6464
return nil, fmt.Errorf("failed to store tick and duration: %s", err)
6565
}
6666
t.next = next

pool/ticker_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,15 @@ import (
1515

1616
func TestNewTicker(t *testing.T) {
1717
rdb := ptesting.NewRedisClient(t)
18+
defer ptesting.CleanupRedis(t, rdb, false, t.Name())
1819
ctx := log.Context(ptesting.NewTestContext(t), log.WithOutput(io.Discard))
1920
testName := strings.Replace(t.Name(), "/", "_", -1)
2021
node := newTestNode(t, ctx, rdb, testName)
2122
tickDuration := 10 * time.Millisecond
2223

2324
// Create and test new ticker
2425
startTime := time.Now()
25-
ticker, err := node.NewTicker(ctx, testName, tickDuration)
26+
ticker, err := node.NewTicker(ctx, "ticker1", tickDuration)
2627
require.NoError(t, err, "Failed to create new ticker")
2728
require.NotNil(t, ticker, "Ticker should not be nil")
2829

@@ -52,6 +53,7 @@ func TestNewTicker(t *testing.T) {
5253

5354
func TestReplaceTickerTimer(t *testing.T) {
5455
rdb := ptesting.NewRedisClient(t)
56+
defer ptesting.CleanupRedis(t, rdb, true, t.Name())
5557
ctx := log.Context(ptesting.NewTestContext(t), log.WithOutput(io.Discard))
5658
testName := strings.Replace(t.Name(), "/", "_", -1)
5759
node := newTestNode(t, ctx, rdb, testName)

rmap/map.go

Lines changed: 43 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package rmap
22

33
import (
44
"context"
5+
"encoding/binary"
56
"fmt"
67
"math/rand"
78
"regexp"
@@ -25,7 +26,7 @@ type (
2526
hashkey string // Redis hash key
2627
msgch <-chan *redis.Message // channel to receive map updates
2728
chans []chan EventKind // channels to send notifications
28-
ichan chan struct{} // internal channel to send notifications
29+
ichan chan string // internal channel to send set notifications
2930
done chan struct{} // channel to signal shutdown
3031
wait sync.WaitGroup // wait for read goroutine to exit
3132
logger pulse.Logger // logger
@@ -85,7 +86,7 @@ func Join(ctx context.Context, name string, rdb *redis.Client, opts ...MapOption
8586
Name: name,
8687
chankey: fmt.Sprintf("map:%s:updates", name),
8788
hashkey: fmt.Sprintf("map:%s:content", name),
88-
ichan: make(chan struct{}, 1),
89+
ichan: make(chan string, 1),
8990
done: make(chan struct{}),
9091
logger: o.Logger.WithPrefix("map", name),
9192
rdb: rdb,
@@ -230,14 +231,11 @@ func (sm *Map) SetAndWait(ctx context.Context, key, value string) (string, error
230231
select {
231232
case <-ctx.Done():
232233
return "", ctx.Err()
233-
case _, ok := <-sm.ichan:
234+
case val, ok := <-sm.ichan:
234235
if !ok {
235236
return "", fmt.Errorf("pulse map: %s is stopped", sm.Name)
236237
}
237-
sm.lock.RLock()
238-
v, ok := sm.content[key]
239-
sm.lock.RUnlock()
240-
if ok && v == value {
238+
if val == value {
241239
return prev, nil
242240
}
243241
}
@@ -527,12 +525,12 @@ func (sm *Map) run() {
527525
sm.reconnect()
528526
continue
529527
}
530-
parts := strings.SplitN(msg.Payload, ":", 3)
531-
if len(parts) < 2 {
528+
parts := strings.SplitN(msg.Payload, ":", 2)
529+
if len(parts) != 2 {
532530
sm.logger.Error(fmt.Errorf("invalid payload"), "payload", msg.Payload)
533531
continue
534532
}
535-
op, key := parts[0], parts[1]
533+
op, data := parts[0], []byte(parts[1])
536534
sm.lock.Lock()
537535
kind := EventChange
538536
switch op {
@@ -541,21 +539,34 @@ func (sm *Map) run() {
541539
sm.logger.Debug("reset")
542540
kind = EventReset
543541
case "del":
542+
key, _, err := unpackString(data)
543+
if err != nil {
544+
sm.logger.Error(fmt.Errorf("invalid del payload"), "payload", msg.Payload, "error", err)
545+
sm.lock.Unlock()
546+
continue
547+
}
544548
delete(sm.content, key)
545549
sm.logger.Debug("deleted", "key", key)
546550
kind = EventDelete
547551
case "set":
548-
if len(parts) != 3 {
549-
sm.logger.Error(fmt.Errorf("invalid set payload"), "payload", msg.Payload)
552+
key, rest, err := unpackString(data)
553+
if err != nil {
554+
sm.logger.Error(fmt.Errorf("invalid set key"), "payload", msg.Payload, "error", err)
550555
sm.lock.Unlock()
551556
continue
552557
}
553-
sm.content[key] = parts[2]
554-
sm.logger.Debug("set", "key", key, "val", parts[2])
555-
}
556-
select {
557-
case sm.ichan <- struct{}{}:
558-
default:
558+
val, _, err := unpackString(rest)
559+
if err != nil {
560+
sm.logger.Error(fmt.Errorf("invalid set value"), "payload", msg.Payload, "error", err)
561+
sm.lock.Unlock()
562+
continue
563+
}
564+
sm.content[key] = val
565+
select {
566+
case sm.ichan <- val:
567+
default:
568+
}
569+
sm.logger.Debug("set", "key", key, "val", val)
559570
}
560571
for _, c := range sm.chans {
561572
select {
@@ -639,3 +650,17 @@ var redisKeyRegex = regexp.MustCompile(`^[^ \0\*\?\[\]]{1,512}$`)
639650
func isValidRedisKeyName(key string) bool {
640651
return redisKeyRegex.MatchString(key)
641652
}
653+
654+
// unpackString reads a length-prefixed string from a buffer using struct.pack
655+
// format "ic0"
656+
func unpackString(data []byte) (string, []byte, error) {
657+
if len(data) < 4 {
658+
return "", nil, fmt.Errorf("buffer too short for length")
659+
}
660+
length := int(binary.LittleEndian.Uint32(data))
661+
data = data[4:]
662+
if len(data) < length {
663+
return "", nil, fmt.Errorf("buffer too short for string")
664+
}
665+
return string(data[:length]), data[length:], nil
666+
}

rmap/scripts.go

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ var (
1313
1414
-- Set the updated value in the hash and publish the change
1515
redis.call("HSET", KEYS[1], ARGV[1], v)
16-
redis.call("PUBLISH", KEYS[2], "set:" .. ARGV[1] .. ":" .. v)
16+
local msg = struct.pack("ic0ic0", string.len(ARGV[1]), ARGV[1], string.len(v), v)
17+
redis.call("PUBLISH", KEYS[2], "set:" .. msg)
1718
1819
return v
1920
`)
@@ -53,7 +54,8 @@ var (
5354
-- If changes were made, update the hash and publish the event
5455
if changed then
5556
redis.call("HSET", KEYS[1], ARGV[1], v)
56-
redis.call("PUBLISH", KEYS[2], "set:" .. ARGV[1] .. ":" .. v)
57+
local msg = struct.pack("ic0ic0", string.len(ARGV[1]), ARGV[1], string.len(v), v)
58+
redis.call("PUBLISH", KEYS[2], "set:" .. msg)
5759
end
5860
5961
return v
@@ -64,15 +66,17 @@ var (
6466
luaDelete = redis.NewScript(`
6567
local v = redis.call("HGET", KEYS[1], ARGV[1])
6668
redis.call("HDEL", KEYS[1], ARGV[1])
67-
redis.call("PUBLISH", KEYS[2], "del:" .. ARGV[1])
69+
local msg = struct.pack("ic0", string.len(ARGV[1]), ARGV[1])
70+
redis.call("PUBLISH", KEYS[2], "del:" .. msg)
6871
return v
6972
`)
7073

7174
// luaIncr is the Lua script used to increment a key and return the new value.
7275
luaIncr = redis.NewScript(`
7376
redis.call("HINCRBY", KEYS[1], ARGV[1], ARGV[2])
7477
local v = redis.call("HGET", KEYS[1], ARGV[1])
75-
redis.call("PUBLISH", KEYS[2], "set:" .. ARGV[1] .. ":" .. v)
78+
local msg = struct.pack("ic0ic0", string.len(ARGV[1]), ARGV[1], string.len(v), v)
79+
redis.call("PUBLISH", KEYS[2], "set:" .. msg)
7680
return v
7781
`)
7882

@@ -106,12 +110,14 @@ var (
106110
-- Update the hash or delete the key if empty
107111
if #newValues == 0 then
108112
redis.call("HDEL", KEYS[1], ARGV[1])
109-
redis.call("PUBLISH", KEYS[2], "del:" .. ARGV[1])
113+
local msg = struct.pack("ic0", string.len(ARGV[1]), ARGV[1])
114+
redis.call("PUBLISH", KEYS[2], "del:" .. msg)
110115
v = ""
111116
else
112117
v = table.concat(newValues, ",")
113118
redis.call("HSET", KEYS[1], ARGV[1], v)
114-
redis.call("PUBLISH", KEYS[2], "set:" .. ARGV[1] .. ":" .. v)
119+
local msg = struct.pack("ic0ic0", string.len(ARGV[1]), ARGV[1], string.len(v), v)
120+
redis.call("PUBLISH", KEYS[2], "set:" .. msg)
115121
end
116122
end
117123
@@ -130,7 +136,8 @@ var (
130136
luaSet = redis.NewScript(`
131137
local v = redis.call("HGET", KEYS[1], ARGV[1])
132138
redis.call("HSET", KEYS[1], ARGV[1], ARGV[2])
133-
redis.call("PUBLISH", KEYS[2], "set:" .. ARGV[1] .. ":" .. ARGV[2])
139+
local msg = struct.pack("ic0ic0", string.len(ARGV[1]), ARGV[1], string.len(ARGV[2]), ARGV[2])
140+
redis.call("PUBLISH", KEYS[2], "set:" .. msg)
134141
return v
135142
`)
136143

@@ -139,7 +146,8 @@ var (
139146
local v = redis.call("HGET", KEYS[1], ARGV[1])
140147
if v == ARGV[2] then
141148
redis.call("HDEL", KEYS[1], ARGV[1])
142-
redis.call("PUBLISH", KEYS[2], "del:" .. ARGV[1])
149+
local msg = struct.pack("ic0", string.len(ARGV[1]), ARGV[1])
150+
redis.call("PUBLISH", KEYS[2], "del:" .. msg)
143151
end
144152
return v
145153
`)
@@ -166,7 +174,8 @@ var (
166174
local v = redis.call("HGET", KEYS[1], ARGV[1])
167175
if v == ARGV[2] then
168176
redis.call("HSET", KEYS[1], ARGV[1], ARGV[3])
169-
redis.call("PUBLISH", KEYS[2], "set:" .. ARGV[1] .. ":" .. ARGV[3])
177+
local msg = struct.pack("ic0ic0", string.len(ARGV[1]), ARGV[1], string.len(ARGV[3]), ARGV[3])
178+
redis.call("PUBLISH", KEYS[2], "set:" .. msg)
170179
end
171180
return v
172181
`)
@@ -176,7 +185,8 @@ var (
176185
local v = redis.call("HGET", KEYS[1], ARGV[1])
177186
if not v then
178187
redis.call("HSET", KEYS[1], ARGV[1], ARGV[2])
179-
redis.call("PUBLISH", KEYS[2], "set:" .. ARGV[1] .. ":" .. ARGV[2])
188+
local msg = struct.pack("ic0ic0", string.len(ARGV[1]), ARGV[1], string.len(ARGV[2]), ARGV[2])
189+
redis.call("PUBLISH", KEYS[2], "set:" .. msg)
180190
return 1 -- Successfully set the value
181191
end
182192
return 0 -- Value already existed

0 commit comments

Comments
 (0)