Skip to content

Commit

Permalink
Retryable event handling
Browse files Browse the repository at this point in the history
  • Loading branch information
maxekman committed Feb 4, 2021
1 parent 0be614d commit 8568318
Show file tree
Hide file tree
Showing 15 changed files with 193 additions and 76 deletions.
27 changes: 26 additions & 1 deletion eventbus/acceptance_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func AcceptanceTest(t *testing.T, bus1, bus2 eh.EventBus, timeout time.Duration)

// Test async errors from handlers.
errorHandler := mocks.NewEventHandler("error_handler")
errorHandler.Err = errors.New("handler error")
errorHandler.ErrOnce = errors.New("handler error")
bus1.AddHandler(ctx, eh.MatchAll{}, errorHandler)

time.Sleep(timeout) // Need to wait here for handlers to be added.
Expand All @@ -220,6 +220,31 @@ func AcceptanceTest(t *testing.T, bus1, bus2 eh.EventBus, timeout time.Duration)
}
}

// Retryable events.
retryHandler := mocks.NewEventHandler("retry_handler")
retryHandler.ErrOnce = eh.RetryableEventError{Err: errors.New("retryable error")}
bus1.AddHandler(ctx, eh.MatchAll{}, retryHandler)

time.Sleep(timeout) // Need to wait here for handlers to be added.

event4 := eh.NewEvent(mocks.EventType, &mocks.EventData{Content: "event4"}, timestamp,
eh.ForAggregate(mocks.AggregateType, id, 4),
eh.WithMetadata(map[string]interface{}{"meta": "data", "num": int32(42)}),
)
if err := bus1.HandleEvent(ctx, event4); err != nil {
t.Error("there should be no error:", err)
}
select {
case <-time.After(timeout):
t.Error("there should be a retried event in time")
case <-retryHandler.Recv:
}
retryHandler.Lock()
if retryHandler.NumHandleEvent != 2 {
t.Error("the handler should have been called twice")
}
retryHandler.Unlock()

// Cancel all handlers and wait.
cancel()
bus1.Wait()
Expand Down
9 changes: 7 additions & 2 deletions eventbus/gcp/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,14 +286,19 @@ func (b *EventBus) handler(m eh.EventMatcher, h eh.EventHandler) func(ctx contex

// Handle the event if it did match.
if err := h.HandleEvent(ctx, event); err != nil {
// Retryable errors are not logged and will be retried.
if _, ok := err.(eh.RetryableEventError); ok {
msg.Nack()
return
}

// Log unhandled events, they will NOT be retried.
err = fmt.Errorf("could not handle event (%s): %w", h.HandlerType(), err)
select {
case b.errCh <- eh.EventBusError{Err: err, Ctx: ctx, Event: event}:
default:
log.Printf("eventhorizon: missed error in GCP event bus: %s", err)
}
msg.Nack()
return
}

msg.Ack()
Expand Down
7 changes: 6 additions & 1 deletion eventbus/kafka/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,13 +281,18 @@ func (b *EventBus) handler(m eh.EventMatcher, h eh.EventHandler, r *kafka.Reader

// Handle the event if it did match.
if err := h.HandleEvent(ctx, event); err != nil {
// Retryable errors are not logged and will be retried.
if _, ok := err.(eh.RetryableEventError); ok {
return
}

// Log unhandled events, they will NOT be retried.
err = fmt.Errorf("could not handle event (%s): %w", h.HandlerType(), err)
select {
case b.errCh <- eh.EventBusError{Err: err, Ctx: ctx, Event: event}:
default:
log.Printf("eventhorizon: missed error in Kafka event bus: %s", err)
}
return
}

r.CommitMessages(ctx, msg)
Expand Down
17 changes: 15 additions & 2 deletions eventbus/local/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,17 +105,30 @@ type evt struct {
}

// Handles all events coming in on the channel.
func (b *EventBus) handle(ctx context.Context, m eh.EventMatcher, h eh.EventHandler, ch <-chan evt) {
func (b *EventBus) handle(ctx context.Context, m eh.EventMatcher, h eh.EventHandler, ch chan evt) {
defer b.wg.Done()

for {
select {
case e := <-ch:
ctx := eh.UnmarshalContext(ctx, e.ctxVals)
// Ignore non-matching events.
if !m.Match(e.event) {
continue
}

if err := h.HandleEvent(ctx, e.event); err != nil {
// Retryable errors are not logged and will be retried.
if _, ok := err.(eh.RetryableEventError); ok {
// Retry event by putting it back on the bus.
select {
case ch <- e:
default:
// TODO: Maybe log here because queue is full.
}
continue
}

select {
case b.errCh <- eh.EventBusError{Err: fmt.Errorf("could not handle event (%s): %s", h.HandlerType(), err.Error()), Ctx: ctx, Event: e.event}:
default:
Expand All @@ -140,7 +153,7 @@ func NewGroup() *Group {
}
}

func (g *Group) channel(id string) <-chan evt {
func (g *Group) channel(id string) chan evt {
g.busMu.Lock()
defer g.busMu.Unlock()

Expand Down
19 changes: 19 additions & 0 deletions eventhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package eventhorizon

import (
"context"
"fmt"
"reflect"
"runtime"
"strings"
Expand All @@ -40,6 +41,24 @@ type EventHandler interface {
HandleEvent(context.Context, Event) error
}

// RetryableEventError is a "soft" error that handlers should return if they want the
// handler to be retried. This will often be the case when handling events (for
// example in a saga) where related read models have not yet been projected.
// NOTE: The retry behavior is dependent on the eventbus implementation used.
type RetryableEventError struct {
Err error
}

// Error implements the Error method of the error interface.
func (e RetryableEventError) Error() string {
return fmt.Sprintf("retryable: %s", e.Err)
}

// Cause returns the cause of this error.
func (e RetryableEventError) Cause() error {
return e.Err
}

// EventHandlerFunc is a function that can be used as a event handler.
type EventHandlerFunc func(context.Context, Event) error

Expand Down
77 changes: 51 additions & 26 deletions eventhandler/projector/eventhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ func (e Error) Cause() error {
// ErrModelNotSet is when a model factory is not set on the EventHandler.
var ErrModelNotSet = errors.New("model not set")

// ErrIncorrectEntityVersion is when an entity has an incorrect version.
var ErrIncorrectEntityVersion = errors.New("incorrect entity version")

// ErrIncorrectProjectedEntityVersion is when an entity has an incorrect version after projection.
var ErrIncorrectProjectedEntityVersion = errors.New("incorrect projected entity version")

// NewEventHandler creates a new EventHandler.
func NewEventHandler(projector Projector, repo eh.ReadWriteRepo, options ...Option) *EventHandler {
h := &EventHandler{
Expand Down Expand Up @@ -124,23 +130,38 @@ func (h *EventHandler) HandleEvent(ctx context.Context, event eh.Event) error {
defer cancel()
}
entity, err := h.repo.Find(findCtx, event.AggregateID())
if rrErr, ok := err.(eh.RepoError); ok && rrErr.Err == eh.ErrEntityNotFound {
if h.factoryFn == nil {
if err != nil {
if errors.Is(err, eh.ErrEntityNotFound) {
// Create the model if there was no previous.
// TODO: Consider that the event can still have been projected elsewhere
// but not yet available in this find. Handle this before/when saving!
if h.factoryFn == nil {
return Error{
Err: ErrModelNotSet,
Projector: h.projector.ProjectorType().String(),
Namespace: eh.NamespaceFromContext(ctx),
EventVersion: event.Version(),
}
}
entity = h.factoryFn()
} else if errors.Is(err, version.ErrIncorrectLoadedEntityVersion) {
// Retry handling the event if model had the incorrect version.
return eh.RetryableEventError{
Err: Error{
Err: err,
Projector: h.projector.ProjectorType().String(),
Namespace: eh.NamespaceFromContext(ctx),
EventVersion: event.Version(),
},
}
} else {
return Error{
Err: ErrModelNotSet,
Err: err,
Projector: h.projector.ProjectorType().String(),
Namespace: eh.NamespaceFromContext(ctx),
EventVersion: event.Version(),
}
}
entity = h.factoryFn()
} else if err != nil {
return Error{
Err: err,
Projector: h.projector.ProjectorType().String(),
Namespace: eh.NamespaceFromContext(ctx),
EventVersion: event.Version(),
}
}

// The entity should be one version behind the event.
Expand All @@ -149,17 +170,19 @@ func (h *EventHandler) HandleEvent(ctx context.Context, event eh.Event) error {
entityVersion = entity.AggregateVersion()

// Ignore old/duplicate events.
if entity.AggregateVersion() >= event.Version() {
if event.Version() <= entity.AggregateVersion() {
return nil
}

if entity.AggregateVersion()+1 != event.Version() {
return Error{
Err: eh.ErrIncorrectEntityVersion,
Projector: h.projector.ProjectorType().String(),
Namespace: eh.NamespaceFromContext(ctx),
EventVersion: event.Version(),
EntityVersion: entityVersion,
if event.Version() != entity.AggregateVersion()+1 {
return eh.RetryableEventError{
Err: Error{
Err: ErrIncorrectEntityVersion,
Projector: h.projector.ProjectorType().String(),
Namespace: eh.NamespaceFromContext(ctx),
EventVersion: event.Version(),
EntityVersion: entityVersion,
},
}
}
}
Expand All @@ -181,7 +204,7 @@ func (h *EventHandler) HandleEvent(ctx context.Context, event eh.Event) error {
entityVersion = newEntity.AggregateVersion()
if newEntity.AggregateVersion() != event.Version() {
return Error{
Err: eh.ErrIncorrectEntityVersion,
Err: ErrIncorrectProjectedEntityVersion,
Projector: h.projector.ProjectorType().String(),
Namespace: eh.NamespaceFromContext(ctx),
EventVersion: event.Version(),
Expand All @@ -203,12 +226,14 @@ func (h *EventHandler) HandleEvent(ctx context.Context, event eh.Event) error {
}
} else {
if err := h.repo.Remove(ctx, event.AggregateID()); err != nil {
return Error{
Err: err,
Projector: h.projector.ProjectorType().String(),
Namespace: eh.NamespaceFromContext(ctx),
EventVersion: event.Version(),
EntityVersion: entityVersion,
return eh.RetryableEventError{
Err: Error{
Err: err,
Projector: h.projector.ProjectorType().String(),
Namespace: eh.NamespaceFromContext(ctx),
EventVersion: event.Version(),
EntityVersion: entityVersion,
},
}
}
}
Expand Down
7 changes: 7 additions & 0 deletions eventhandler/projector/eventhandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,13 @@ func TestEventHandler_SaveError(t *testing.T) {
timestamp := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
event := eh.NewEvent(mocks.EventType, eventData, timestamp,
eh.ForAggregate(mocks.AggregateType, id, 1))
repo.LoadErr = eh.RepoError{
Err: eh.ErrEntityNotFound,
}
projector.newEntity = &mocks.SimpleModel{
ID: id,
}

saveErr := errors.New("save error")
repo.SaveErr = saveErr
expectedErr := Error{
Expand Down
7 changes: 3 additions & 4 deletions middleware/commandhandler/tracing/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,16 @@ func NewMiddleware() eh.CommandHandlerMiddleware {
return eh.CommandHandlerFunc(func(ctx context.Context, cmd eh.Command) error {
opName := fmt.Sprintf("Command(%s)", cmd.CommandType())
sp, ctx := opentracing.StartSpanFromContext(ctx, opName)

err := h.HandleCommand(ctx, cmd)

sp.SetTag("eh.command_type", cmd.CommandType())
sp.SetTag("eh.aggregate_type", cmd.AggregateType())
sp.SetTag("eh.aggregate_id", cmd.AggregateID())

err := h.HandleCommand(ctx, cmd)
if err != nil {
ext.LogError(sp, err)
}
sp.Finish()

sp.Finish()
return err
})
})
Expand Down
2 changes: 1 addition & 1 deletion middleware/eventhandler/async/middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestEventHandler(t *testing.T) {
m, errCh = NewMiddleware()
h = eh.UseEventHandlerMiddleware(inner, m)
handlingErr := errors.New("handling error")
inner.Err = handlingErr
inner.ErrOnce = handlingErr
ctx := context.Background()
if err := h.HandleEvent(ctx, event); err != nil {
t.Error("there should never be an error:", err)
Expand Down
13 changes: 8 additions & 5 deletions middleware/eventhandler/tracing/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,20 @@ type eventHandler struct {
func (h *eventHandler) HandleEvent(ctx context.Context, event eh.Event) error {
opName := fmt.Sprintf("%s.Event(%s)", h.HandlerType(), event.EventType())
sp, ctx := opentracing.StartSpanFromContext(ctx, opName)

err := h.EventHandler.HandleEvent(ctx, event)

sp.SetTag("eh.event_type", event.EventType())
sp.SetTag("eh.aggregate_type", event.AggregateType())
sp.SetTag("eh.aggregate_id", event.AggregateID())
sp.SetTag("eh.version", event.Version())

err := h.EventHandler.HandleEvent(ctx, event)
if err != nil {
ext.LogError(sp, err)
if _, ok := err.(eh.RetryableEventError); ok {
// Ignore logging retryable errors.
} else {
ext.LogError(sp, err)
}
}
sp.Finish()

sp.Finish()
return err
}
21 changes: 13 additions & 8 deletions mocks/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,13 +197,14 @@ func (h *CommandHandler) HandleCommand(ctx context.Context, cmd eh.Command) erro
type EventHandler struct {
sync.RWMutex

Type string
Events []eh.Event
Context context.Context
Time time.Time
Recv chan eh.Event
Type string
Events []eh.Event
Context context.Context
Time time.Time
Recv chan eh.Event
NumHandleEvent int
// Used to simulate errors when publishing.
Err error
ErrOnce error
}

var _ = eh.EventHandler(&EventHandler{})
Expand All @@ -228,8 +229,12 @@ func (m *EventHandler) HandleEvent(ctx context.Context, event eh.Event) error {
m.Lock()
defer m.Unlock()

if m.Err != nil {
return m.Err
m.NumHandleEvent++

if m.ErrOnce != nil {
err := m.ErrOnce
m.ErrOnce = nil
return err
}
m.Events = append(m.Events, event)
m.Context = ctx
Expand Down
Loading

0 comments on commit 8568318

Please sign in to comment.