Skip to content

Commit

Permalink
rsql: Stop event gap filler (#20)
Browse files Browse the repository at this point in the history
* rsql: Stop event gap filler

* expose function to call stop gaps
  • Loading branch information
adamhicks authored Oct 30, 2024
1 parent 9452d3b commit 8c06636
Show file tree
Hide file tree
Showing 3 changed files with 221 additions and 19 deletions.
53 changes: 35 additions & 18 deletions rsql/eventstable.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ func NewEventsTable(name string, opts ...EventsOption) *EventsTable {
}

table.gapCh = make(chan Gap)
table.gapListeners = make(chan GapListenFunc)
table.gapListenDone = make(chan struct{})
table.currentLoader = buildLoader(table.baseLoader, table.gapCh, table.disableCache, table.schema, table.includeNoopEvents)

return table
Expand Down Expand Up @@ -191,8 +193,9 @@ type EventsTable struct {
// Stateful fields not cloned
currentLoader filterLoader
gapCh chan Gap
gapFns []func(Gap)
gapMu sync.Mutex
gapListeners chan GapListenFunc
gapListenDone chan struct{}
gapListening sync.Once
}

// Insert inserts an event into the EventsTable and returns a function that
Expand Down Expand Up @@ -269,24 +272,38 @@ func (t *EventsTable) ToStream(dbc *sql.DB, opts1 ...reflex.StreamOption) reflex
}

// ListenGaps adds f to a slice of functions that are called when a gap is detected.
// One first call, it starts a goroutine that serves these functions.
func (t *EventsTable) ListenGaps(f func(Gap)) {
t.gapMu.Lock()
defer t.gapMu.Unlock()
if len(t.gapFns) == 0 {
// Start serving gaps.
eventsGapListenGauge.WithLabelValues(t.schema.name).Set(1)
go func() {
for gap := range t.gapCh {
t.gapMu.Lock()
for _, f := range t.gapFns {
f(gap)
}
t.gapMu.Unlock()
// On first call, it starts a goroutine that serves these functions.
func (t *EventsTable) ListenGaps(f GapListenFunc) {
t.gapListening.Do(func() { go t.serveGaps() })
t.gapListeners <- f
}

func (t *EventsTable) StopGapListener(ctx context.Context) {
close(t.gapListeners)
select {
case <-ctx.Done():
case <-t.gapListenDone:
}
}

type GapListenFunc func(Gap)

func (t *EventsTable) serveGaps() {
defer close(t.gapListenDone)
var fns []GapListenFunc
for {
select {
case gapFn, more := <-t.gapListeners:
if !more {
return
}
}()
fns = append(fns, gapFn)
case gap := <-t.gapCh:
for _, fn := range fns {
fn(gap)
}
}
}
t.gapFns = append(t.gapFns, f)
}

// getSchema returns the table schema and implements the gapTable interface for FillGaps.
Expand Down
178 changes: 178 additions & 0 deletions rsql/eventstable_internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package rsql

import (
"context"
"database/sql"
"slices"
"strconv"
"sync"
"testing"
"time"

"github.com/luno/jettison/jtest"

"github.com/luno/reflex"
)

type EventTable struct {
mu sync.Mutex
Events []*reflex.Event
}

func InitTable(inEvents []*reflex.Event) *EventTable {
start := inEvents[0].IDInt()
end := inEvents[len(inEvents)-1].IDInt()
ev := make([]*reflex.Event, end-start+1)
var inIdx int
for i := range ev {
if inEvents[inIdx].IDInt() != start+int64(i) {
continue
}
ev[i] = inEvents[inIdx]
inIdx++
}
return &EventTable{Events: ev}
}

func (t *EventTable) FillGap(g Gap) {
t.mu.Lock()
defer t.mu.Unlock()
from := slices.IndexFunc(t.Events, func(event *reflex.Event) bool {
return event != nil && event.ID == strconv.FormatInt(g.Prev, 10)
})
to := slices.IndexFunc(t.Events, func(event *reflex.Event) bool {
return event != nil && event.ID == strconv.FormatInt(g.Next, 10)
})
if from == -1 || to == -1 {
panic("cant identify gap to fill")
}
id := g.Prev + 1
for i := from + 1; i < to; i++ {
t.Events[i] = &reflex.Event{
ID: strconv.FormatInt(id, 10),
Type: eventType(1),
ForeignID: "123",
Timestamp: time.Unix(int64(i), 0),
}
id++
}
}

func (t *EventTable) GetEvents(after int64) []*reflex.Event {
t.mu.Lock()
defer t.mu.Unlock()
cpy := make([]*reflex.Event, len(t.Events))
copy(cpy, t.Events)
return slices.DeleteFunc(cpy, func(event *reflex.Event) bool {
return event == nil || event.IDInt() <= after
})
}

func (t *EventTable) Len() int {
t.mu.Lock()
defer t.mu.Unlock()
return len(t.Events)
}

func TestGapFiller(t *testing.T) {
evTable := InitTable([]*reflex.Event{
{
ID: "100", Type: eventType(1),
ForeignID: "123", Timestamp: time.Unix(1, 0),
},
{
ID: "101", Type: eventType(1),
ForeignID: "123", Timestamp: time.Unix(2, 0),
},
{
ID: "103", Type: eventType(1),
ForeignID: "123", Timestamp: time.Unix(3, 0),
},
{
ID: "104", Type: eventType(1),
ForeignID: "123", Timestamp: time.Unix(5, 0),
},
})

loader := func(ctx context.Context, dbc *sql.DB, prevCursor int64, lag time.Duration) ([]*reflex.Event, error) {
return evTable.GetEvents(prevCursor), nil
}

table := NewEventsTable("test", WithEventsLoader(loader), WithEventsBackoff(time.Millisecond))
table.ListenGaps(func(gap Gap) {
evTable.FillGap(gap)
})

toStream := table.ToStream(nil)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
t.Cleanup(cancel)
cli, err := toStream(ctx, "")
jtest.RequireNil(t, err)

for i := 0; i < evTable.Len(); i++ {
_, err := cli.Recv()
jtest.RequireNil(t, err, "event", i)
}

_, err = cli.Recv()
jtest.Assert(t, context.DeadlineExceeded, err)

table.StopGapListener(context.Background())
}

func TestGapFillerStopped(t *testing.T) {
evTable := InitTable([]*reflex.Event{
{
ID: "100", Type: eventType(1),
ForeignID: "123", Timestamp: time.Unix(1, 0),
},
{
ID: "101", Type: eventType(1),
ForeignID: "123", Timestamp: time.Unix(2, 0),
},
{
ID: "103", Type: eventType(1),
ForeignID: "123", Timestamp: time.Unix(3, 0),
},
{
ID: "104", Type: eventType(1),
ForeignID: "123", Timestamp: time.Unix(5, 0),
},
})

loader := func(ctx context.Context, dbc *sql.DB, prevCursor int64, lag time.Duration) ([]*reflex.Event, error) {
return evTable.GetEvents(prevCursor), nil
}

table := NewEventsTable("test", WithEventsLoader(loader), WithEventsBackoff(time.Millisecond))
table.ListenGaps(func(gap Gap) {
evTable.FillGap(gap)
})

toStream := table.ToStream(nil)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
t.Cleanup(cancel)
cli, err := toStream(ctx, "")
jtest.RequireNil(t, err)

table.StopGapListener(context.Background())

_, err = cli.Recv()
jtest.RequireNil(t, err)
_, err = cli.Recv()
jtest.RequireNil(t, err)

// Gap never filled, time out
_, err = cli.Recv()
jtest.Assert(t, context.DeadlineExceeded, err)
}

func TestStopGapFillerCanBeCancelled(t *testing.T) {
table := NewEventsTable("test")

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
t.Cleanup(cancel)

// This will deadlock without some other await
table.StopGapListener(ctx)
}
9 changes: 8 additions & 1 deletion rsql/gapfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,17 @@ func FillGaps(dbc *sql.DB, gapTable gapTable) {
gapTable.ListenGaps(makeFill(dbc, gapTable.getSchema()))
}

// StopFillingGaps stops any goroutine started from FillGaps
// If FillGaps has not been called, then this will block until ctx is cancelled or deadline is reached
func StopFillingGaps(ctx context.Context, gapTable gapTable) {
gapTable.StopGapListener(ctx)
}

// gapTable is a common interface between EventsTable and EventsTableInt
// defining the subset of methods required for gap filling.
type gapTable interface {
ListenGaps(f func(Gap))
ListenGaps(listenFunc GapListenFunc)
StopGapListener(ctx context.Context)
getSchema() eTableSchema
}

Expand Down

0 comments on commit 8c06636

Please sign in to comment.