Skip to content

Commit

Permalink
reflex/testmock: Add await consumer test util
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewwormald committed Sep 12, 2024
1 parent 6898c0c commit 80d135c
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 0 deletions.
34 changes: 34 additions & 0 deletions testmock/await.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package testmock

import (
"context"
"strconv"
"testing"
"time"

"github.com/luno/jettison/jtest"

"github.com/luno/reflex"
)

func AwaitConsumer(t *testing.T, cs reflex.CursorStore, consumerName string, eventID int64) {
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
t.Cleanup(cancel)

for ctx.Err() == nil {
val, err := cs.GetCursor(ctx, consumerName)
jtest.RequireNil(t, err)

eID := int64(0)
if val != "" {
eID, err = strconv.ParseInt(val, 10, 64)
jtest.RequireNil(t, err)
}

if eID == eventID {
break
}

time.Sleep(5 * time.Millisecond)
}
}
48 changes: 48 additions & 0 deletions testmock/await_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package testmock_test

import (
"context"
"testing"
"time"

"github.com/luno/jettison/jtest"
"github.com/stretchr/testify/require"

"github.com/luno/reflex"
"github.com/luno/reflex/rpatterns"
"github.com/luno/reflex/testmock"
)

func TestAwait(t *testing.T) {
streamer := testmock.NewTestStreamer(t)
defer streamer.Stop()

streamer.InsertEvent(reflex.Event{
ID: "1",
Type: reflexType(1),
ForeignID: "9",
Timestamp: time.Now(),
})

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

cStore := rpatterns.MemCursorStore()

s := ""
result := &s
consumer := reflex.NewConsumer("my-consumer", func(ctx context.Context, event *reflex.Event) error {
*result = "Hello World"
return nil
})

spec := reflex.NewSpec(streamer.StreamFunc(), cStore, consumer, reflex.WithStreamToHead())
go func() {
err := reflex.Run(ctx, spec)
jtest.Require(t, reflex.ErrHeadReached, err)
}()

testmock.AwaitConsumer(t, cStore, "my-consumer", 1)

require.Equal(t, "Hello World", *result)
}
File renamed without changes.
8 changes: 8 additions & 0 deletions testmock/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,16 @@ func NewTestStreamer(t *testing.T) TestStreamer {
}

type TestStreamer interface {
// InsertEvent allows the insertion of the event into the event stream that can be consumed by any callers of
// StreamFunc and may be called before or after a consumer starts consuming the stream.
//
// NOTE: Ypu must provide an ID as it will not be automatically generated.
InsertEvent(r reflex.Event)

// StreamFunc matches the standard signature that consumers expect for consuming a reflex stream.
StreamFunc() reflex.StreamFunc

// Stop should be called once the stream is no longer needed to ensure there is no goroutime leaks.
Stop()
}

Expand Down

0 comments on commit 80d135c

Please sign in to comment.