diff --git a/eventstore.go b/eventstore.go index a5fb01e5..d8ba2384 100644 --- a/eventstore.go +++ b/eventstore.go @@ -32,6 +32,7 @@ type EventStore interface { Load(context.Context, uuid.UUID) ([]Event, error) // LoadFrom loads all events from version for the aggregate id from the store. + // Event store should provide events in version order LoadFrom(ctx context.Context, id uuid.UUID, version int) ([]Event, error) // Close closes the EventStore. diff --git a/eventstore/eventsorter/event_sorter.go b/eventstore/eventsorter/event_sorter.go new file mode 100644 index 00000000..26a0caf3 --- /dev/null +++ b/eventstore/eventsorter/event_sorter.go @@ -0,0 +1,61 @@ +package eventsorter + +import ( + "context" + eh "github.com/looplab/eventhorizon" + "github.com/looplab/eventhorizon/uuid" + "sort" +) + +// EventSorter is an event store wrapper that warrants events are provided in version order. +// Version order is required for event sourcing to work correctly. +// Use it with an event store that does not warrant version order. +type EventSorter struct { + inner eh.EventStore +} + +var _ eh.EventStore = (*EventSorter)(nil) + +// NewEventSorter creates a new EventSorter wrapping the provided event store +func NewEventSorter(inner eh.EventStore) *EventSorter { + return &EventSorter{inner: inner} +} + +func (e EventSorter) Save(ctx context.Context, events []eh.Event, originalVersion int) error { + return e.inner.Save(ctx, events, originalVersion) +} + +func (e EventSorter) Load(ctx context.Context, uuid uuid.UUID) ([]eh.Event, error) { + events, err := e.inner.Load(ctx, uuid) + + if err != nil { + return nil, err + } + + return e.SortEvents(events), nil +} + +func (e EventSorter) LoadFrom(ctx context.Context, id uuid.UUID, version int) ([]eh.Event, error) { + events, err := e.inner.LoadFrom(ctx, id, version) + + if err != nil { + return nil, err + } + + return e.SortEvents(events), nil +} + +func (e EventSorter) Close() error { + return e.inner.Close() +} + +func (e EventSorter) SortEvents(events []eh.Event) []eh.Event { + byVersion := func(i, j int) bool { + return events[i].Version() < events[j].Version() + } + + // It is ok to sort in place, events slice is already the inner store response + sort.Slice(events, byVersion) + + return events +} diff --git a/eventstore/eventsorter/event_sorter_test.go b/eventstore/eventsorter/event_sorter_test.go new file mode 100644 index 00000000..16bb6356 --- /dev/null +++ b/eventstore/eventsorter/event_sorter_test.go @@ -0,0 +1,102 @@ +package eventsorter + +import ( + "context" + eh "github.com/looplab/eventhorizon" + "github.com/looplab/eventhorizon/uuid" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" + "testing" + "time" +) + +type EventSorterTestSuite struct { + suite.Suite + + innerStore *EventStoreMock + eventSorter *EventSorter + + unsortedEventList []eh.Event +} + +// In order for 'go test' to run this suite, we need to create +// a normal test function and pass our suite to suite.Run +func TestEventSorterTestSuite(t *testing.T) { + suite.Run(t, &EventSorterTestSuite{}) +} + +// before each test +func (s *EventSorterTestSuite) SetupTest() { + s.innerStore = &EventStoreMock{} + + s.eventSorter = NewEventSorter(s.innerStore) + + s.unsortedEventList = []eh.Event{ + eh.NewEvent("test", nil, time.Now(), eh.ForAggregate("test", uuid.New(), 3)), + eh.NewEvent("test", nil, time.Now(), eh.ForAggregate("test", uuid.New(), 2)), + eh.NewEvent("test", nil, time.Now(), eh.ForAggregate("test", uuid.New(), 1)), + } +} + +func (s *EventSorterTestSuite) Test_can_sort_empty_event_list_on_Load() { + // Given a event store with no events + s.innerStore.On("Load", mock.Anything, mock.Anything).Return([]eh.Event{}, nil) + + // When we load the events + events, err := s.eventSorter.Load(context.TODO(), uuid.New()) + + // Then no error is returned + s.NoError(err) + + // And empty event list is returned + s.Len(events, 0) +} + +func (s *EventSorterTestSuite) Test_can_sort_empty_event_list_on_LoafFrom() { + // Given a event store with no events + s.innerStore.On("LoadFrom", mock.Anything, mock.Anything, mock.Anything).Return([]eh.Event{}, nil) + + // When we load the events + events, err := s.eventSorter.LoadFrom(context.TODO(), uuid.New(), 8) + + // Then no error is returned + s.NoError(err) + + // And empty event list is returned + s.Len(events, 0) +} + +func (s *EventSorterTestSuite) Test_can_sort_event_list_on_Load() { + // Given a event store with no events + s.innerStore.On("Load", mock.Anything, mock.Anything).Return(s.unsortedEventList, nil) + + // When we load the events + events, err := s.eventSorter.Load(context.TODO(), uuid.New()) + + // Then no error is returned + s.NoError(err) + + // And the events are returned in version order + s.Len(events, 3) + + s.Equal(1, events[0].Version()) + s.Equal(2, events[1].Version()) + s.Equal(3, events[2].Version()) +} + +func (s *EventSorterTestSuite) Test_can_sort_event_list_on_LoadFrom() { + // Given a event store with no events + s.innerStore.On("LoadFrom", mock.Anything, mock.Anything, 2).Return(s.unsortedEventList[0:2], nil) + + // When we load the events + events, err := s.eventSorter.LoadFrom(context.TODO(), uuid.New(), 2) + + // Then no error is returned + s.NoError(err) + + // And the events are returned in version order + s.Len(events, 2) + + s.Equal(2, events[0].Version()) + s.Equal(3, events[1].Version()) +} diff --git a/eventstore/eventsorter/eventstore_stub.go b/eventstore/eventsorter/eventstore_stub.go new file mode 100644 index 00000000..0f4fc811 --- /dev/null +++ b/eventstore/eventsorter/eventstore_stub.go @@ -0,0 +1,40 @@ +package eventsorter + +import ( + "context" + eh "github.com/looplab/eventhorizon" + "github.com/looplab/eventhorizon/uuid" + "github.com/stretchr/testify/mock" +) + +type EventStoreMock struct { + mock.Mock +} + +var _ eh.EventStore = (*EventStoreMock)(nil) + +func (e *EventStoreMock) LoadFrom(ctx context.Context, id uuid.UUID, version int) ([]eh.Event, error) { + args := e.Called(ctx, id, version) + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).([]eh.Event), args.Error(1) +} + +func (e *EventStoreMock) Save(ctx context.Context, events []eh.Event, originalVersion int) error { + args := e.Called(ctx, events, originalVersion) + return args.Error(0) +} + +func (e *EventStoreMock) Load(ctx context.Context, u uuid.UUID) ([]eh.Event, error) { + args := e.Called(ctx, u) + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).([]eh.Event), args.Error(1) +} + +func (e *EventStoreMock) Close() error { + args := e.Called() + return args.Error(0) +} diff --git a/eventstore/mongodb/eventstore.go b/eventstore/mongodb/eventstore.go index a6efd070..715cf5a2 100644 --- a/eventstore/mongodb/eventstore.go +++ b/eventstore/mongodb/eventstore.go @@ -16,6 +16,7 @@ package mongodb import ( "context" + "errors" "fmt" "time" @@ -332,7 +333,7 @@ func (s *EventStore) LoadFrom(ctx context.Context, id uuid.UUID, version int) ([ var aggregate aggregateRecord if err := s.aggregates.FindOne(ctx, bson.M{"_id": id}).Decode(&aggregate); err != nil { // Translate to our own not found error. - if err == mongo.ErrNoDocuments { + if errors.Is(err, mongo.ErrNoDocuments) { err = eh.ErrAggregateNotFound } diff --git a/eventstore/mongodb_v2/eventstore.go b/eventstore/mongodb_v2/eventstore.go index 7329ca47..2071fda3 100644 --- a/eventstore/mongodb_v2/eventstore.go +++ b/eventstore/mongodb_v2/eventstore.go @@ -40,9 +40,12 @@ import ( "github.com/looplab/eventhorizon/uuid" ) +const Ascending = 1 + // EventStore is an eventhorizon.EventStore for MongoDB, using one collection // for all events and another to keep track of all aggregates/streams. It also // keeps track of the global position of events, stored as metadata. +// This implementation warrants event order by Version on Load and LoadFrom methods (configurable, see WithSortEventsOnDB). type EventStore struct { client *mongo.Client clientOwnership clientOwnership @@ -52,6 +55,7 @@ type EventStore struct { eventHandlerAfterSave eh.EventHandler eventHandlerInTX eh.EventHandler skipNonRegisteredEvents bool + sortEventsOnDb bool // if true, events will be sorted on DB side. Default is false for backward compatibility. } type clientOwnership int @@ -223,6 +227,16 @@ func WithSnapshotCollectionName(snapshotColl string) Option { } } +// WithSortEventsOnDB enables sorting events on DB. +// Without this option, events order should be warranted by DB default ordering. This is not the case for MongoDB. +func WithSortEventsOnDB() Option { + return func(s *EventStore) error { + s.sortEventsOnDb = true + + return nil + } +} + // Save implements the Save method of the eventhorizon.EventStore interface. func (s *EventStore) Save(ctx context.Context, events []eh.Event, originalVersion int) error { if len(events) == 0 { @@ -430,7 +444,7 @@ func (s *EventStore) Save(ctx context.Context, events []eh.Event, originalVersio // Load implements the Load method of the eventhorizon.EventStore interface. func (s *EventStore) Load(ctx context.Context, id uuid.UUID) ([]eh.Event, error) { - cursor, err := s.events.Find(ctx, bson.M{"aggregate_id": id}) + cursor, err := s.events.Find(ctx, bson.M{"aggregate_id": id}, s.makeFindOptions()) if err != nil { return nil, &eh.EventStoreError{ Err: fmt.Errorf("could not find event: %w", err), @@ -444,7 +458,7 @@ func (s *EventStore) Load(ctx context.Context, id uuid.UUID) ([]eh.Event, error) // LoadFrom implements LoadFrom method of the eventhorizon.SnapshotStore interface. func (s *EventStore) LoadFrom(ctx context.Context, id uuid.UUID, version int) ([]eh.Event, error) { - cursor, err := s.events.Find(ctx, bson.M{"aggregate_id": id, "version": bson.M{"$gte": version}}) + cursor, err := s.events.Find(ctx, bson.M{"aggregate_id": id, "version": bson.M{"$gte": version}}, s.makeFindOptions()) if err != nil { return nil, &eh.EventStoreError{ Err: fmt.Errorf("could not find event: %w", err), @@ -574,6 +588,13 @@ func (s *EventStore) LoadSnapshot(ctx context.Context, id uuid.UUID) (*eh.Snapsh return snapshot, nil } +func (s *EventStore) makeFindOptions() *mongoOptions.FindOptions { + if s.sortEventsOnDb { + return options.Find().SetSort(bson.M{"version": Ascending}) + } + return options.Find() +} + func (s *EventStore) SaveSnapshot(ctx context.Context, id uuid.UUID, snapshot eh.Snapshot) (err error) { if snapshot.AggregateType == "" { return &eh.EventStoreError{