Skip to content

Commit bd14efa

Browse files
committedJul 9, 2024
fix redis test
1 parent 9deee65 commit bd14efa

File tree

2 files changed

+5
-3
lines changed

2 files changed

+5
-3
lines changed
 

‎test/stream_test.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,7 @@ func TestRedis(t *testing.T) {
3232
Password: "", // No password set
3333
DB: 0, // Use default DB
3434
})
35-
gid := ksuid.New().String()
36-
_, err := rc.XGroupCreateMkStream(context.Background(), "kawa/topic", gid, "$").Result()
35+
_, err := rc.XGroupCreateMkStream(context.Background(), "kawa/topic", "kawa", "$").Result()
3736
if err != nil {
3837
fmt.Println("err initing", err)
3938
}

‎x/redis/redis.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66

77
goredis "github.com/redis/go-redis/v9"
88
"github.com/runreveal/kawa"
9+
"github.com/segmentio/ksuid"
910
)
1011

1112
type Options struct {
@@ -98,11 +99,13 @@ func (s *Source) Run(ctx context.Context) error {
9899
func (s *Source) recvLoop(ctx context.Context) error {
99100
var err error
100101

102+
gid := ksuid.New().String()
103+
101104
outer:
102105
for {
103106
msgs, err := s.client.XReadGroup(ctx, &goredis.XReadGroupArgs{
104107
Group: "kawa",
105-
Consumer: "1",
108+
Consumer: gid,
106109
Streams: []string{s.opts.topic, ">"},
107110
Count: 100,
108111
Block: 0,

0 commit comments

Comments
 (0)