Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

x/redis: add redis streams implementation #52

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
---
version: '2'
services:
redis:
image: docker.io/library/redis
ports:
- "6379:6379"

mosquitto:
image: eclipse-mosquitto:latest
container_name: mosquitto
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/aws/aws-sdk-go v1.44.313
github.com/eclipse/paho.mqtt.golang v1.4.3
github.com/pkg/errors v0.9.1
github.com/redis/go-redis/v9 v9.5.3
github.com/runreveal/lib/await v0.0.0-20231125014632-fb732b616d27
github.com/segmentio/ksuid v1.0.4
github.com/stretchr/testify v1.8.4
Expand All @@ -15,7 +16,9 @@ require (
)

require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
Expand Down
10 changes: 10 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
github.com/aws/aws-sdk-go v1.44.313 h1:u6EuNQqgAmi09GEZ5g/XGHLF0XV31WcdU5rnHyIBHBc=
github.com/aws/aws-sdk-go v1.44.313/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik=
github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
Expand All @@ -22,6 +30,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/redis/go-redis/v9 v9.5.3 h1:fOAp1/uJG+ZtcITgZOfYFmTKPE7n4Vclj1wZFgRciUU=
github.com/redis/go-redis/v9 v9.5.3/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/runreveal/lib/await v0.0.0-20231125014632-fb732b616d27 h1:utfCzMxw9cJZqrpbunKCa/0epOGT2eb+UyoCSBQOnh8=
github.com/runreveal/lib/await v0.0.0-20231125014632-fb732b616d27/go.mod h1:Gyj90y+175aa23yMbbml4zvGuIZj32GOe3qx7wMRgoo=
github.com/segmentio/ksuid v1.0.4 h1:sBo2BdShXjmcugAMwjugoGUdUV0pcxY5mW4xKRn3v4c=
Expand Down
21 changes: 20 additions & 1 deletion test/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ import (
"testing"
"time"

goredis "github.com/redis/go-redis/v9"
"github.com/runreveal/kawa"
"github.com/runreveal/kawa/x/memory"
"github.com/runreveal/kawa/x/mqtt"

"github.com/runreveal/kawa/x/printer"
"github.com/runreveal/kawa/x/redis"
"github.com/runreveal/kawa/x/scanner"
"github.com/segmentio/ksuid"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -40,6 +41,24 @@ func BenchmarkMem(b *testing.B) {
}
}

func TestRedis(t *testing.T) {
rc := goredis.NewClient(&goredis.Options{
Addr: "localhost:6379", // Redis server address
Password: "", // No password set
DB: 0, // Use default DB
})
_, err := rc.XGroupCreateMkStream(context.Background(), "kawa/topic", "kawa", "$").Result()
if err != nil {
fmt.Println("err initing", err)
}

time.Sleep(1 * time.Second)

src := redis.NewSource(redis.WithAddr("localhost:6379"), redis.WithTopic("kawa/topic"))
dst := redis.NewDestination(redis.WithAddr("localhost:6379"), redis.WithTopic("kawa/topic"))
SuiteTest(t, src, dst)
}

func TestIO(t *testing.T) {
reader, writer := io.Pipe()
// Delim should be a string of bytes which is unlikely occur randomly
Expand Down
7 changes: 3 additions & 4 deletions test/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,14 @@ func SuiteTest(t *testing.T, src kawa.Source[[]byte], dst kawa.Destination[[]byt
if !errors.Is(err, context.Canceled) {
assert.NoError(t, err)
}
if ack != nil {
ack()
}
kawa.Ack(ack)
// fmt.Println("received:")
// stdoutDumper.Write([]byte(msg.Value))
// fmt.Printf("\n")
mark(t, msg.Value, want, seen)
count++
if count == len(want) {
time.Sleep(100 * time.Millisecond)
break
}
}
Expand Down Expand Up @@ -83,7 +82,7 @@ func SuiteTest(t *testing.T, src kawa.Source[[]byte], dst kawa.Destination[[]byt
assert.NoError(t, err)

for i := range seen {
assert.True(t, seen[i], "we should have seen all messages")
assert.True(t, seen[i], "we should have seen all messages, missing: %d", i)
}
}

Expand Down
163 changes: 163 additions & 0 deletions x/redis/redis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package redis

import (
"context"
"log/slog"

goredis "github.com/redis/go-redis/v9"
"github.com/runreveal/kawa"
"github.com/segmentio/ksuid"
)

type Options struct {
topic string
opts goredis.Options
}

type Destination struct {
opts Options
client *goredis.Client
}

type Option func(*Options)

func WithAddr(addr string) Option {
return func(d *Options) {
d.opts.Addr = addr
}
}

func WithPassword(password string) Option {
return func(d *Options) {
d.opts.Password = password
}
}

func WithUsername(username string) Option {
return func(d *Options) {
d.opts.Username = username
}
}

func WithTopic(topic string) Option {
return func(d *Options) {
d.topic = topic
}
}

func NewDestination(opts ...Option) *Destination {
d := &Destination{}
for _, opt := range opts {
opt(&d.opts)
}
d.client = goredis.NewClient(&d.opts.opts)
return d
}

type binMarshaler []byte

func (b binMarshaler) MarshalBinary() ([]byte, error) {
return b, nil
}

func (d *Destination) Send(ctx context.Context, ack func(), msgs ...kawa.Message[[]byte]) error {
// Send to redis server using XADD
for _, msg := range msgs {
_, err := d.client.XAdd(ctx, &goredis.XAddArgs{
Stream: d.opts.topic,
Values: map[string]any{"msg": binMarshaler(msg.Value)},
}).Result()
if err != nil {
return err
}
}
kawa.Ack(ack)
return nil
}

type Source struct {
opts Options
client *goredis.Client
msgC chan kawa.MsgAck[[]byte]
}

func NewSource(opts ...Option) *Source {
d := &Source{
msgC: make(chan kawa.MsgAck[[]byte]),
}
for _, opt := range opts {
opt(&d.opts)
}
d.client = goredis.NewClient(&d.opts.opts)
return d
}

func (s *Source) Run(ctx context.Context) error {
return s.recvLoop(ctx)
}

func (s *Source) recvLoop(ctx context.Context) error {
var err error

gid := ksuid.New().String()

outer:
for {
msgs, err := s.client.XReadGroup(ctx, &goredis.XReadGroupArgs{
Group: "kawa",
Consumer: gid,
Streams: []string{s.opts.topic, ">"},
Count: 100,
Block: 0,
}).Result()
if err != nil {
return err
}

// read some messages from the topic
for _, msg := range msgs {
topic := msg.Stream
for _, m := range msg.Messages {
var bts []byte
if v, ok := m.Values["msg"].(string); ok {
bts = []byte(v)
}

mid := m.ID
msgAck := kawa.MsgAck[[]byte]{
Msg: kawa.Message[[]byte]{
Key: m.ID,
Topic: topic,
Value: bts,
},
Ack: func() {
_, err := s.client.XAck(ctx, topic, "kawa", mid).Result()
if err != nil {
slog.Warn("failed to ack redis message", "err", err)
}
},
}

select {
case <-ctx.Done():
err = ctx.Err()
break outer
case s.msgC <- msgAck:
}

}

}
}

return err
}

func (s *Source) Recv(ctx context.Context) (kawa.Message[[]byte], func(), error) {
select {
case <-ctx.Done():
return kawa.Message[[]byte]{}, nil, ctx.Err()
case msgAck := <-s.msgC:
return msgAck.Msg, msgAck.Ack, nil
}
}
53 changes: 53 additions & 0 deletions x/redis/redis_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package redis

import (
"context"
"fmt"
"log"
"testing"
"time"

"github.com/redis/go-redis/v9"
)

func TestRedis(t *testing.T) {
// Initialize the Redis client
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379", // Redis server address
Password: "", // No password set
DB: 0, // Use default DB
})

x := []byte(`{"name":"John Doe","age":30}`)

ctx := context.Background()

// Push a message onto the queue
err := rdb.LPush(ctx, "myqueue", x).Err()
if err != nil {
log.Fatalf("Could not push message to queue: %v", err)
}

// Simulate a delay
time.Sleep(200 * time.Millisecond)

// Pop a message from the queue
msg, err := rdb.BRPopLPush(ctx, "myqueue", "processing", 0).Bytes()
defer func() {
_, err := rdb.LRem(ctx, "processing", 1, msg).Result()
if err != nil {
log.Fatalf("Could not remove message from processing Queue: %v", err)
} else {
fmt.Printf("Removed message from processing Queue: %s\n", string(msg))
}
}()

if err == redis.Nil {
fmt.Println("Queue is empty")
} else if err != nil {
log.Fatalf("Could not pop message from queue: %v", err)
} else {
fmt.Printf("Popped message: %s\n", string(msg))
}
// Test code here
}
Loading