diff --git a/docker-compose.yml b/docker-compose.yml index 75a6559..ad249f0 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,6 +1,9 @@ ---- -version: '2' services: + redis: + image: docker.io/library/redis + ports: + - "6379:6379" + mosquitto: image: eclipse-mosquitto:latest container_name: mosquitto diff --git a/go.mod b/go.mod index 8e64648..31a0c58 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/aws/aws-sdk-go v1.44.313 github.com/eclipse/paho.mqtt.golang v1.4.3 github.com/pkg/errors v0.9.1 + github.com/redis/go-redis/v9 v9.5.3 github.com/runreveal/lib/await v0.0.0-20231125014632-fb732b616d27 github.com/segmentio/ksuid v1.0.4 github.com/stretchr/testify v1.8.4 @@ -15,7 +16,9 @@ require ( ) require ( + github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/go-logr/logr v1.2.4 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/gorilla/websocket v1.5.0 // indirect diff --git a/go.sum b/go.sum index a1803cc..de74a40 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,16 @@ github.com/aws/aws-sdk-go v1.44.313 h1:u6EuNQqgAmi09GEZ5g/XGHLF0XV31WcdU5rnHyIBHBc= github.com/aws/aws-sdk-go v1.44.313/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI= +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik= github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= @@ -22,6 +30,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/redis/go-redis/v9 v9.5.3 h1:fOAp1/uJG+ZtcITgZOfYFmTKPE7n4Vclj1wZFgRciUU= +github.com/redis/go-redis/v9 v9.5.3/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= github.com/runreveal/lib/await v0.0.0-20231125014632-fb732b616d27 h1:utfCzMxw9cJZqrpbunKCa/0epOGT2eb+UyoCSBQOnh8= github.com/runreveal/lib/await v0.0.0-20231125014632-fb732b616d27/go.mod h1:Gyj90y+175aa23yMbbml4zvGuIZj32GOe3qx7wMRgoo= github.com/segmentio/ksuid v1.0.4 h1:sBo2BdShXjmcugAMwjugoGUdUV0pcxY5mW4xKRn3v4c= diff --git a/test/stream_test.go b/test/stream_test.go index f047ff1..aac01e0 100644 --- a/test/stream_test.go +++ b/test/stream_test.go @@ -8,11 +8,12 @@ import ( "testing" "time" + goredis "github.com/redis/go-redis/v9" "github.com/runreveal/kawa" "github.com/runreveal/kawa/x/memory" "github.com/runreveal/kawa/x/mqtt" - "github.com/runreveal/kawa/x/printer" + "github.com/runreveal/kawa/x/redis" "github.com/runreveal/kawa/x/scanner" "github.com/segmentio/ksuid" "github.com/stretchr/testify/assert" @@ -40,6 +41,24 @@ func BenchmarkMem(b *testing.B) { } } +func TestRedis(t *testing.T) { + rc := goredis.NewClient(&goredis.Options{ + Addr: "localhost:6379", // Redis server address + Password: "", // No password set + DB: 0, // Use default DB + }) + _, err := rc.XGroupCreateMkStream(context.Background(), "kawa/topic", "kawa", "$").Result() + if err != nil { + fmt.Println("err initing", err) + } + + time.Sleep(1 * time.Second) + + src := redis.NewSource(redis.WithAddr("localhost:6379"), redis.WithTopic("kawa/topic")) + dst := redis.NewDestination(redis.WithAddr("localhost:6379"), redis.WithTopic("kawa/topic")) + SuiteTest(t, src, dst) +} + func TestIO(t *testing.T) { reader, writer := io.Pipe() // Delim should be a string of bytes which is unlikely occur randomly diff --git a/test/suite_test.go b/test/suite_test.go index 0d6a737..e122419 100644 --- a/test/suite_test.go +++ b/test/suite_test.go @@ -44,15 +44,14 @@ func SuiteTest(t *testing.T, src kawa.Source[[]byte], dst kawa.Destination[[]byt if !errors.Is(err, context.Canceled) { assert.NoError(t, err) } - if ack != nil { - ack() - } + kawa.Ack(ack) // fmt.Println("received:") // stdoutDumper.Write([]byte(msg.Value)) // fmt.Printf("\n") mark(t, msg.Value, want, seen) count++ if count == len(want) { + time.Sleep(100 * time.Millisecond) break } } @@ -83,7 +82,7 @@ func SuiteTest(t *testing.T, src kawa.Source[[]byte], dst kawa.Destination[[]byt assert.NoError(t, err) for i := range seen { - assert.True(t, seen[i], "we should have seen all messages") + assert.True(t, seen[i], "we should have seen all messages, missing: %d", i) } } diff --git a/x/redis/redis.go b/x/redis/redis.go new file mode 100644 index 0000000..40824e3 --- /dev/null +++ b/x/redis/redis.go @@ -0,0 +1,163 @@ +package redis + +import ( + "context" + "log/slog" + + goredis "github.com/redis/go-redis/v9" + "github.com/runreveal/kawa" + "github.com/segmentio/ksuid" +) + +type Options struct { + topic string + opts goredis.Options +} + +type Destination struct { + opts Options + client *goredis.Client +} + +type Option func(*Options) + +func WithAddr(addr string) Option { + return func(d *Options) { + d.opts.Addr = addr + } +} + +func WithPassword(password string) Option { + return func(d *Options) { + d.opts.Password = password + } +} + +func WithUsername(username string) Option { + return func(d *Options) { + d.opts.Username = username + } +} + +func WithTopic(topic string) Option { + return func(d *Options) { + d.topic = topic + } +} + +func NewDestination(opts ...Option) *Destination { + d := &Destination{} + for _, opt := range opts { + opt(&d.opts) + } + d.client = goredis.NewClient(&d.opts.opts) + return d +} + +type binMarshaler []byte + +func (b binMarshaler) MarshalBinary() ([]byte, error) { + return b, nil +} + +func (d *Destination) Send(ctx context.Context, ack func(), msgs ...kawa.Message[[]byte]) error { + // Send to redis server using XADD + for _, msg := range msgs { + _, err := d.client.XAdd(ctx, &goredis.XAddArgs{ + Stream: d.opts.topic, + Values: map[string]any{"msg": binMarshaler(msg.Value)}, + }).Result() + if err != nil { + return err + } + } + kawa.Ack(ack) + return nil +} + +type Source struct { + opts Options + client *goredis.Client + msgC chan kawa.MsgAck[[]byte] +} + +func NewSource(opts ...Option) *Source { + d := &Source{ + msgC: make(chan kawa.MsgAck[[]byte]), + } + for _, opt := range opts { + opt(&d.opts) + } + d.client = goredis.NewClient(&d.opts.opts) + return d +} + +func (s *Source) Run(ctx context.Context) error { + return s.recvLoop(ctx) +} + +func (s *Source) recvLoop(ctx context.Context) error { + var err error + + gid := ksuid.New().String() + +outer: + for { + msgs, err := s.client.XReadGroup(ctx, &goredis.XReadGroupArgs{ + Group: "kawa", + Consumer: gid, + Streams: []string{s.opts.topic, ">"}, + Count: 100, + Block: 0, + }).Result() + if err != nil { + return err + } + + // read some messages from the topic + for _, msg := range msgs { + topic := msg.Stream + for _, m := range msg.Messages { + var bts []byte + if v, ok := m.Values["msg"].(string); ok { + bts = []byte(v) + } + + mid := m.ID + msgAck := kawa.MsgAck[[]byte]{ + Msg: kawa.Message[[]byte]{ + Key: m.ID, + Topic: topic, + Value: bts, + }, + Ack: func() { + _, err := s.client.XAck(ctx, topic, "kawa", mid).Result() + if err != nil { + slog.Warn("failed to ack redis message", "err", err) + } + }, + } + + select { + case <-ctx.Done(): + err = ctx.Err() + break outer + case s.msgC <- msgAck: + } + + } + + } + } + + return err +} + +func (s *Source) Recv(ctx context.Context) (kawa.Message[[]byte], func(), error) { + select { + case <-ctx.Done(): + return kawa.Message[[]byte]{}, nil, ctx.Err() + case msgAck := <-s.msgC: + return msgAck.Msg, msgAck.Ack, nil + } +} diff --git a/x/redis/redis_test.go b/x/redis/redis_test.go new file mode 100644 index 0000000..5af66ec --- /dev/null +++ b/x/redis/redis_test.go @@ -0,0 +1,53 @@ +package redis + +import ( + "context" + "fmt" + "log" + "testing" + "time" + + "github.com/redis/go-redis/v9" +) + +func TestRedis(t *testing.T) { + // Initialize the Redis client + rdb := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", // Redis server address + Password: "", // No password set + DB: 0, // Use default DB + }) + + x := []byte(`{"name":"John Doe","age":30}`) + + ctx := context.Background() + + // Push a message onto the queue + err := rdb.LPush(ctx, "myqueue", x).Err() + if err != nil { + log.Fatalf("Could not push message to queue: %v", err) + } + + // Simulate a delay + time.Sleep(200 * time.Millisecond) + + // Pop a message from the queue + msg, err := rdb.BRPopLPush(ctx, "myqueue", "processing", 0).Bytes() + defer func() { + _, err := rdb.LRem(ctx, "processing", 1, msg).Result() + if err != nil { + log.Fatalf("Could not remove message from processing Queue: %v", err) + } else { + fmt.Printf("Removed message from processing Queue: %s\n", string(msg)) + } + }() + + if err == redis.Nil { + fmt.Println("Queue is empty") + } else if err != nil { + log.Fatalf("Could not pop message from queue: %v", err) + } else { + fmt.Printf("Popped message: %s\n", string(msg)) + } + // Test code here +}