Skip to content

Commit

Permalink
rsql: Add manyInserter and InsertMany API
Browse files Browse the repository at this point in the history
  • Loading branch information
NeilLuno committed Nov 12, 2024
1 parent 361bf8e commit 7f690d3
Show file tree
Hide file tree
Showing 12 changed files with 510 additions and 5 deletions.
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
module github.com/luno/reflex

go 1.21
go 1.22

require (
github.com/aws/aws-sdk-go-v2/service/s3 v1.35.0
github.com/go-sql-driver/mysql v1.7.1
github.com/luno/jettison v0.0.0-20230912135954-09d6084f5df9
github.com/prometheus/client_golang v1.15.0
github.com/prometheus/client_model v0.3.0
github.com/sebdah/goldie/v2 v2.5.3
github.com/stretchr/testify v1.8.3
go.opentelemetry.io/otel v1.14.0
go.opentelemetry.io/otel/sdk v1.14.0
Expand Down Expand Up @@ -52,6 +53,7 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/procfs v0.9.0 // indirect
github.com/sergi/go-diff v1.2.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
go.opencensus.io v0.24.0 // indirect
golang.org/x/net v0.11.0 // indirect
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1962,6 +1962,7 @@ github.com/sebdah/goldie/v2 v2.5.3 h1:9ES/mNN+HNUbNWpVAlrzuZ7jE+Nrczbj8uFRjM7624
github.com/sebdah/goldie/v2 v2.5.3/go.mod h1:oZ9fp0+se1eapSRjfYbsV/0Hqhbuu3bJVvKI/NNtssI=
github.com/seccomp/libseccomp-golang v0.9.1/go.mod h1:GbW5+tmTXfcxTToHLXlScSlAvWlF4P2Ca7zGrPiEpWo=
github.com/seccomp/libseccomp-golang v0.9.2-0.20210429002308-3879420cc921/go.mod h1:JA8cRccbGaA1s33RQf7Y1+q9gHmZX1yB/z9WDN1C6fg=
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/sergi/go-diff v1.2.0 h1:XU+rvMAioB0UC3q1MFrIQy4Vo5/4VsRDQQXHsEya6xQ=
github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
github.com/shoenig/test v0.6.3/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k=
Expand Down
83 changes: 81 additions & 2 deletions rsql/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"database/sql"
"fmt"
"strconv"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -75,6 +76,68 @@ func makeDefaultInserter(schema eTableSchema) inserter {
}
}

// makeDefaultManyInserter returns the default sql manyInserter configured via WithEventsXField options.
func makeDefaultManyInserter(schema eTableSchema) manyInserter {
return func(ctx context.Context, tx *sql.Tx, events ...EventToInsert) error {
if len(events) == 0 {
return nil
}
q, args, err := makeInsertManyQuery(ctx, schema, events)
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, q, args...)
return errors.Wrap(err, "insert error")
}
}

func makeInsertManyQuery(
ctx context.Context,
schema eTableSchema,
events []EventToInsert,
) (query string, args []any, err error) {
spanCtx, hasTrace := tracing.Extract(ctx)
var traceData []byte
if schema.traceField != "" && hasTrace {
d, err := tracing.Marshal(spanCtx)
if err != nil {
return "", nil, err
}
traceData = d
}

cols := []string{schema.foreignIDField, schema.typeField, schema.timeField}
if schema.metadataField != "" {
cols = append(cols, schema.metadataField)
}
if traceData != nil {
cols = append(cols, schema.traceField)
}

q := "insert into " + schema.name + " (" + strings.Join(cols, ", ") + ") values"

for i, e := range events {
vals := []string{"?", "?", "now(6)"}
args = append(args, e.ForeignID, e.Type.ReflexType())
if schema.metadataField != "" {
vals = append(vals, "?")
args = append(args, e.Metadata)
} else if e.Metadata != nil {
return "", nil, errors.New("metadata not enabled")
}
if traceData != nil {
vals = append(vals, "?")
args = append(args, traceData)
}
if i > 0 {
q += ","
}
q += " (" + strings.Join(vals, ", ") + ")"
}

return q, args, nil
}

type row interface {
Scan(dest ...interface{}) error
}
Expand Down Expand Up @@ -158,7 +221,14 @@ func getNextEvents(ctx context.Context, dbc *sql.DB, schema eTableSchema,
}

// GetNextEventsForTesting fetches a bunch of events from the event table
func GetNextEventsForTesting(ctx context.Context, _ *testing.T, dbc *sql.DB, table *EventsTable, after int64, lag time.Duration) ([]*reflex.Event, error) {
func GetNextEventsForTesting(
ctx context.Context,
_ *testing.T,
dbc *sql.DB,
table *EventsTable,
after int64,
lag time.Duration,
) ([]*reflex.Event, error) {
return getNextEvents(ctx, dbc, table.schema, after, lag)
}

Expand Down Expand Up @@ -287,7 +357,16 @@ func makeDefaultErrorInserter(schema errTableSchema) ErrorInserter {
// NB: See the documentation is the following link on the behaviour of "on last_insert_id(<expr>)" https://dev.mysql.com/doc/refman/5.7/en/information-functions.html#function_last-insert-id
q := fmt.Sprintf(
"insert into %s set %s=?, %s=?, %s=?, %s=now(6), %s=now(6), %s=? on duplicate key update %s=last_insert_id(%s)",
schema.name, schema.eventConsumerField, schema.eventIDField, schema.errorMsgField, schema.errorCreatedAtField, schema.errorUpdatedAtField, schema.errorStatusField, schema.idField, schema.idField)
schema.name,
schema.eventConsumerField,
schema.eventIDField,
schema.errorMsgField,
schema.errorCreatedAtField,
schema.errorUpdatedAtField,
schema.errorStatusField,
schema.idField,
schema.idField,
)
return func(ctx context.Context, tx *sql.Tx, consumer string, eventID string, errMsg string, errStatus reflex.ErrorStatus) (string, error) {
r, err := tx.ExecContext(ctx, q, consumer, eventID, errMsg, errStatus)
// If the error has already been written then we can ignore the error
Expand Down
106 changes: 106 additions & 0 deletions rsql/db_internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package rsql

import (
"context"
"testing"

"github.com/luno/jettison/jtest"
"github.com/sebdah/goldie/v2"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/trace"

"github.com/luno/reflex/internal/tracing"
)

//go:generate go test . -run Test_makeInsertManyQuery -update -clean

func Test_makeInsertManyQuery(t *testing.T) {
ctx := context.Background()

type res struct {
Q string
Args []any
}

defaultSchema := eTableSchema{
name: "events",
idField: "id",
timeField: "timestamp",
typeField: "type",
foreignIDField: "foreign_id",
}

t.Run("empty", func(t *testing.T) {
q, args, err := makeInsertManyQuery(ctx, defaultSchema, nil)
jtest.RequireNil(t, err)
goldie.New(t).AssertJson(t, t.Name(), res{q, args})
})

t.Run("one", func(t *testing.T) {
q, args, err := makeInsertManyQuery(ctx, defaultSchema, []EventToInsert{
{"fid1", testEventType(1), nil},
})
jtest.RequireNil(t, err)
goldie.New(t).AssertJson(t, t.Name(), res{q, args})
})

t.Run("two", func(t *testing.T) {
q, args, err := makeInsertManyQuery(ctx, defaultSchema, []EventToInsert{
{"fid1", testEventType(1), nil},
{"fid2", testEventType(2), nil},
})
jtest.RequireNil(t, err)
goldie.New(t).AssertJson(t, t.Name(), res{q, args})
})

t.Run("more", func(t *testing.T) {
var events []EventToInsert
for i := range 100 {
events = append(events, EventToInsert{"fid", testEventType(i), nil})
}
q, args, err := makeInsertManyQuery(ctx, defaultSchema, events)
jtest.RequireNil(t, err)
goldie.New(t).AssertJson(t, t.Name(), res{q, args})
})

t.Run("metadata_error", func(t *testing.T) {
_, _, err := makeInsertManyQuery(ctx, defaultSchema, []EventToInsert{
{"fid1", testEventType(1), []byte("metadata")},
})
require.ErrorContains(t, err, "metadata not enable")
})

t.Run("with_metadata", func(t *testing.T) {
schemaWithMetadata := defaultSchema
schemaWithMetadata.metadataField = "metadata"
q, args, err := makeInsertManyQuery(ctx, schemaWithMetadata, []EventToInsert{
{"fid1", testEventType(1), []byte("metadata")},
})
jtest.RequireNil(t, err)
goldie.New(t).AssertJson(t, t.Name(), res{q, args})
})

t.Run("with_trace", func(t *testing.T) {
schemaWithTrace := defaultSchema
schemaWithTrace.traceField = "trace"
traceID, err := trace.TraceIDFromHex("00000000000000000000000000000009")
jtest.RequireNil(t, err)
spanID, err := trace.SpanIDFromHex("0000000000000002")
jtest.RequireNil(t, err)
data, err := tracing.Marshal(trace.NewSpanContext(trace.SpanContextConfig{
TraceID: traceID,
SpanID: spanID,
}))
jtest.RequireNil(t, err)
ctx := tracing.Inject(ctx, data)
q, args, err := makeInsertManyQuery(ctx, schemaWithTrace, []EventToInsert{
{"fid1", testEventType(1), nil},
})
jtest.RequireNil(t, err)
goldie.New(t).AssertJson(t, t.Name(), res{q, args})
})
}

type testEventType int

func (t testEventType) ReflexType() int { return int(t) }
58 changes: 56 additions & 2 deletions rsql/eventstable.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,20 @@ func NewEventsTable(name string, opts ...EventsOption) *EventsTable {
table.inserter = makeDefaultInserter(table.schema)
}

if table.manyInserter == nil {
table.manyInserter = makeDefaultManyInserter(table.schema)
}

table.gapCh = make(chan Gap)
table.gapListeners = make(chan GapListenFunc)
table.gapListenDone = make(chan struct{})
table.currentLoader = buildLoader(table.baseLoader, table.gapCh, table.disableCache, table.schema, table.includeNoopEvents)
table.currentLoader = buildLoader(
table.baseLoader,
table.gapCh,
table.disableCache,
table.schema,
table.includeNoopEvents,
)

return table
}
Expand Down Expand Up @@ -175,10 +185,28 @@ func WithEventsInserter(inserter inserter) EventsOption {
}
}

// WithEventsManyInserter provides an option to set the event inserter
// which inserts many events into a sql table. The default inserter is
// configured with the WithEventsXField options.
func WithEventsManyInserter(manyInserter manyInserter) EventsOption {
return func(table *EventsTable) {
table.manyInserter = manyInserter
}
}

// inserter abstracts the insertion of an event into a sql table.
type inserter func(ctx context.Context, tx *sql.Tx,
foreignID string, typ reflex.EventType, metadata []byte) error

type EventToInsert struct {
ForeignID string
Type reflex.EventType
Metadata []byte
}

// manyInserter abstracts the insertion of many events into a sql table.
type manyInserter func(ctx context.Context, tx *sql.Tx, events ...EventToInsert) error

// EventsTable provides reflex event insertion and streaming
// for a sql db table.
type EventsTable struct {
Expand All @@ -189,6 +217,7 @@ type EventsTable struct {
includeNoopEvents bool
baseLoader loader
inserter inserter
manyInserter manyInserter

// Stateful fields not cloned
currentLoader filterLoader
Expand Down Expand Up @@ -230,6 +259,25 @@ func (t *EventsTable) InsertWithMetadata(ctx context.Context, tx *sql.Tx, foreig
return t.notifier.Notify, nil
}

// InsertMany inserts a many events into the EventsTable.
func (t *EventsTable) InsertMany(
ctx context.Context,
tx *sql.Tx,
events []EventToInsert,
) (NotifyFunc, error) {
for _, e := range events {
if isNoop(e.ForeignID, e.Type) {
return nil, errors.New("inserting invalid noop event")
}
}
err := t.manyInserter(ctx, tx, events...)
if err != nil {
return noopFunc, err
}

return t.notifier.Notify, nil
}

// Clone returns a new events table generated from the config of t with the new options applied.
// Note that non-config fields are not copied, so things like the cache and inmemnotifier
// are not shared.
Expand Down Expand Up @@ -312,7 +360,13 @@ func (t *EventsTable) getSchema() eTableSchema {
}

// buildLoader returns a new layered event loader.
func buildLoader(baseLoader loader, ch chan<- Gap, disableCache bool, schema eTableSchema, withNoopEvents bool) filterLoader {
func buildLoader(
baseLoader loader,
ch chan<- Gap,
disableCache bool,
schema eTableSchema,
withNoopEvents bool,
) filterLoader {
if baseLoader == nil {
baseLoader = makeBaseLoader(schema)
}
Expand Down
22 changes: 22 additions & 0 deletions rsql/eventstable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,3 +542,25 @@ func TestCloneInserter(t *testing.T) {
require.Equal(t, 2, i1)
require.Equal(t, 1, i2)
}

func TestInsertMany(t *testing.T) {
ctx := context.Background()
dbc := ConnectTestDB(t, DefaultEventTable(), DefaultCursorTable(), DefaultErrorTable(), DefaultErrorEventTable())

table1 := rsql.NewEventsTable(eventsTable)

tx, _ := dbc.Begin()
_, err := table1.InsertMany(ctx, tx, []rsql.EventToInsert{
{"fid1", testEventType(1), nil},
{"fid2", testEventType(2), nil},
{"fid3", testEventType(3), nil},
})
jtest.RequireNil(t, err)
err = tx.Commit()
jtest.RequireNil(t, err)
el, err := rsql.GetNextEventsForTesting(context.Background(), t, dbc, table1, 0, 0)
jtest.RequireNil(t, err)
assert.Equal(t, "fid1", el[0].ForeignID)
assert.Equal(t, "fid2", el[1].ForeignID)
assert.Equal(t, "fid3", el[2].ForeignID)
}
4 changes: 4 additions & 0 deletions rsql/testdata/Test_makeInsertManyQuery/empty.golden
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"Q": "insert into events (foreign_id, type, timestamp) values",
"Args": null
}
Loading

0 comments on commit 7f690d3

Please sign in to comment.