diff --git a/azbus/batchreceiver.go b/azbus/batchreceiver.go new file mode 100644 index 0000000..c61fb9b --- /dev/null +++ b/azbus/batchreceiver.go @@ -0,0 +1,255 @@ +package azbus + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" + opentracing "github.com/opentracing/opentracing-go" +) + +// BatchHandler is completely responsible for the processing of a batch of messages. +// Implementations take complete responsibility for the peek lock renewal and disposal of messages. +type BatchHandler interface { + Handle(context.Context, Disposer, []*ReceivedMessage) error + Open() error + Close() +} + +// BatchRecieverConfig provides the configuration for receivers that work with azure batched send +// * There is not autmatic message lock renewal provision for the batched receiver +// * There is no support for deadletter queues on the batched receiver +type BatchReceiverConfig struct { + ConnectionString string + + // Name is the name of the queue or topic + TopicOrQueueName string + + // Subscriptioon is the name of the topic subscription. + // If blank then messages are received from a Queue. + SubscriptionName string + + // If a deadletter receiver then this is true + Deadletter bool + + // Receive messages in a batch and completely delegate processing to a single dedicated handler + BatchSize int + + // A batch operation must abandon any message that takes longer than this to process. + // Defaults to DefaultRenewalTime. + BatchDeadline time.Duration +} + +// BatchReceiver to receive messages on a queue +type BatchReceiver struct { + azClient AZClient + + Cfg BatchReceiverConfig + + log Logger + mtx sync.Mutex + Receiver *azservicebus.Receiver + Options *azservicebus.ReceiverOptions + Handler BatchHandler + Cancel context.CancelFunc +} + +type BatchReceiverOption func(*BatchReceiver) + +// WithBatchDeadline sets the context deadline used for the batch operation. +// If this is not set, the default is DefaultRenewalTime. +func WithBatchDeadline(d time.Duration) BatchReceiverOption { + return func(r *BatchReceiver) { + r.Cfg.BatchDeadline = d + } +} + +// NewBatchReceiver creates a new BatchReceiver. +func NewBatchReceiver(log Logger, handler BatchHandler, cfg BatchReceiverConfig, opts ...BatchReceiverOption) *BatchReceiver { + r := BatchReceiver{} + var options *azservicebus.ReceiverOptions + + r.Cfg = cfg + r.azClient = NewAZClient(cfg.ConnectionString) + r.Options = options + r.Handler = handler + r.log = log.WithIndex("receiver", r.String()) + for _, opt := range opts { + opt(&r) + } + + if r.Cfg.BatchDeadline == 0 { + r.Cfg.BatchDeadline = DefaultRenewalTime + } + + return &r +} + +// String - returns string representation of receiver. +func (r *BatchReceiver) String() string { + // No log function calls in this method please. + if r.Cfg.SubscriptionName != "" { + return fmt.Sprintf("%s.%s", r.Cfg.TopicOrQueueName, r.Cfg.SubscriptionName) + } + return fmt.Sprintf("%s", r.Cfg.TopicOrQueueName) +} + +func (r *BatchReceiver) CreateBatchReceivedMessageTracingContext(ctx context.Context, spanProps map[string]string) (context.Context, opentracing.Span) { + // We don't have the tracing span info on the context yet, that is what this function will add + // we we log using the reciever logger + r.log.Debugf("ContextFromReceivedMessage(): %v", spanProps) + + var opts = []opentracing.StartSpanOption{} + carrier := opentracing.TextMapCarrier{} + // This just gets all the message Application Properties into a string map. That map is then passed into the + // open tracing constructor which extracts any bits it is interested in to use to setup the spans etc. + // It will ignore anything it doesn't care about. So the filtering of the map is done for us and + // we don't need to pre-filter it. + for k, v := range spanProps { + carrier.Set(k, v) + } + spanCtx, err := opentracing.GlobalTracer().Extract(opentracing.TextMap, carrier) + if err != nil { + r.log.Infof("CreateBatchReceivedMessageTracingContext(): Unable to extract span context: %v", err) + } else { + opts = append(opts, opentracing.ChildOf(spanCtx)) + } + span := opentracing.StartSpan("handle batch", opts...) + ctx = opentracing.ContextWithSpan(ctx, span) + return ctx, span +} + +func (r *BatchReceiver) receiveMessages(ctx context.Context) error { + r.log.Debugf("BatchSize %d, BatchDeadline: %v", r.Cfg.BatchSize, r.Cfg.BatchDeadline) + + for { + err := r.receiveOneMessageBatch(ctx) + if err != nil { + return err + } + } +} + +func (r *BatchReceiver) receiveOneMessageBatch(ctx context.Context) error { + + if r.Cfg.BatchSize == 0 { + return fmt.Errorf("BatchSize must be greater than zero") + } + + var err error + var messages []*ReceivedMessage + messages, err = r.Receiver.ReceiveMessages(ctx, r.Cfg.BatchSize, nil) + if err != nil { + azerr := fmt.Errorf("%s: ReceiveMessage failure: %w", r, NewAzbusError(err)) + r.log.Infof("%s", azerr) + return azerr + } + total := len(messages) + r.log.Debugf("total messages %d", total) + + // set a deadline for the batch operation, this should be shorter than the peak lock timeout + batchCtx, cancel := context.WithTimeout(ctx, r.Cfg.BatchDeadline) + defer cancel() + + // creating the span props from the first message is a bit arbitrary, but it's the best we can do + spanProps := make(map[string]string) + for k, v := range messages[0].ApplicationProperties { + if value, ok := v.(string); ok { + spanProps[k] = value + } + } + + batchCtx, span := r.CreateBatchReceivedMessageTracingContext(batchCtx, spanProps) + defer span.Finish() + + err = r.Handler.Handle(batchCtx, r, messages) + if err != nil { + r.log.Infof("terminating due to batch handler err: %v", err) + return err + } + + r.log.Debugf("Processed %d messages", total) + + return nil +} + +// The following 2 methods satisfy the startup.Listener interface. +func (r *BatchReceiver) Listen() error { + ctx, cancel := context.WithCancel(context.Background()) + r.Cancel = cancel + r.log.Debugf("listen") + err := r.open() + if err != nil { + azerr := fmt.Errorf("%s: ReceiveMessage failure: %w", r, NewAzbusError(err)) + r.log.Infof("%s", azerr) + return azerr + } + return r.receiveMessages(ctx) +} + +func (r *BatchReceiver) Shutdown(ctx context.Context) error { + r.Cancel() + r.close_() + return nil +} + +func (r *BatchReceiver) open() error { + var err error + + if r.Receiver != nil { + return nil + } + + client, err := r.azClient.azClient() + if err != nil { + return err + } + + var receiver *azservicebus.Receiver + if r.Cfg.SubscriptionName != "" { + receiver, err = client.NewReceiverForSubscription(r.Cfg.TopicOrQueueName, r.Cfg.SubscriptionName, r.Options) + } else { + receiver, err = client.NewReceiverForQueue(r.Cfg.TopicOrQueueName, r.Options) + } + if err != nil { + azerr := fmt.Errorf("%s: failed to open receiver: %w", r, NewAzbusError(err)) + r.log.Infof("%s", azerr) + return azerr + } + + r.Receiver = receiver + if r.Handler != nil { + err = r.Handler.Open() + if err != nil { + return fmt.Errorf("failed to open batch handler: %w", err) + } + } + return nil +} + +func (r *BatchReceiver) close_() { + if r != nil { + r.log.Debugf("Close") + if r.Receiver != nil { + r.mtx.Lock() + defer r.mtx.Unlock() + if r.Handler != nil { + r.log.Debugf("Close batch handler") + r.Handler.Close() + r.Handler = nil + } + + r.log.Debugf("Close receiver") + err := r.Receiver.Close(context.Background()) + if err != nil { + azerr := fmt.Errorf("%s: Error closing receiver: %w", r, NewAzbusError(err)) + r.log.Infof("%s", azerr) + } + r.Handler = nil + r.Receiver = nil + r.Cancel = nil + } + } +} diff --git a/azbus/disposition.go b/azbus/disposition.go index 7f24e15..cf7615c 100644 --- a/azbus/disposition.go +++ b/azbus/disposition.go @@ -6,6 +6,7 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" + "github.com/datatrails/go-datatrails-common/logger" "github.com/datatrails/go-datatrails-common/tracing" ) @@ -20,6 +21,10 @@ const ( CompleteDisposition ) +type Disposer interface { + Dispose(ctx context.Context, d Disposition, err error, msg *ReceivedMessage) +} + func (d Disposition) String() string { switch { case d == DeadletterDisposition: @@ -51,44 +56,39 @@ func (r *Receiver) dispose(ctx context.Context, d Disposition, err error, msg *R } } -// NB: ALL disposition methods return nil so they can be used in return statements +func (r *BatchReceiver) Dispose(ctx context.Context, d Disposition, err error, msg *ReceivedMessage) { + switch { + case d == DeadletterDisposition: + r.deadLetter(ctx, err, msg) + return + case d == AbandonDisposition: + r.abandon(ctx, err, msg) + return + case d == RescheduleDisposition: + r.reschedule(ctx, err, msg) + return + case d == CompleteDisposition: + r.complete(ctx, err, msg) + return + } +} -// Abandon abandons message. This function is not used but is present for consistency. -func (r *Receiver) abandon(ctx context.Context, err error, msg *ReceivedMessage) { +func abandon(ctx context.Context, log logger.Logger, r *azservicebus.Receiver, err error, msg *ReceivedMessage) { ctx = context.WithoutCancel(ctx) - log := r.log.FromContext(ctx) - defer log.Close() span, ctx := tracing.StartSpanFromContext(ctx, "Message.Abandon") defer span.Finish() log.Infof("Abandon Message on DeliveryCount %d: %v", msg.DeliveryCount, err) - err1 := r.receiver.AbandonMessage(ctx, msg, nil) + err1 := r.AbandonMessage(ctx, msg, nil) if err1 != nil { azerr := fmt.Errorf("Abandon Message failure: %w", NewAzbusError(err1)) log.Infof("%s", azerr) } } -// Reschedule handles when a message should be deferred at a later time. There are a -// number of ways of doing this but it turns out that simply not doing anything causes -// azservicebus to resubmit the message 1 minute later. We keep the function signature with -// unused arguments for consistency and in case we need to implement more sophisticated -// algorithms in future. -func (r *Receiver) reschedule(ctx context.Context, err error, msg *ReceivedMessage) { - ctx = context.WithoutCancel(ctx) - log := r.log.FromContext(ctx) - defer log.Close() - - span, _ := tracing.StartSpanFromContext(ctx, "Message.Reschedule") - defer span.Finish() - log.Infof("Reschedule Message on DeliveryCount %d: %v", msg.DeliveryCount, err) -} - // DeadLetter explicitly deadletters a message. -func (r *Receiver) deadLetter(ctx context.Context, err error, msg *ReceivedMessage) { +func deadLetter(ctx context.Context, log logger.Logger, r *azservicebus.Receiver, err error, msg *ReceivedMessage) { ctx = context.WithoutCancel(ctx) - log := r.log.FromContext(ctx) - defer log.Close() span, ctx := tracing.StartSpanFromContext(ctx, "Message.DeadLetter") defer span.Finish() @@ -96,17 +96,15 @@ func (r *Receiver) deadLetter(ctx context.Context, err error, msg *ReceivedMessa options := azservicebus.DeadLetterOptions{ Reason: to.Ptr(err.Error()), } - err1 := r.receiver.DeadLetterMessage(ctx, msg, &options) + err1 := r.DeadLetterMessage(ctx, msg, &options) if err1 != nil { azerr := fmt.Errorf("DeadLetter Message failure: %w", NewAzbusError(err1)) log.Infof("%s", azerr) } } -func (r *Receiver) complete(ctx context.Context, err error, msg *ReceivedMessage) { +func complete(ctx context.Context, log logger.Logger, r *azservicebus.Receiver, err error, msg *ReceivedMessage) { ctx = context.WithoutCancel(ctx) - log := r.log.FromContext(ctx) - defer log.Close() span, _ := tracing.StartSpanFromContext(ctx, "Message.Complete") defer span.Finish() @@ -114,10 +112,10 @@ func (r *Receiver) complete(ctx context.Context, err error, msg *ReceivedMessage if err != nil { log.Infof("Complete Message %v", err) } else { - log.Infof("Complete Message") + log.Debugf("Complete Message") } - err1 := r.receiver.CompleteMessage(ctx, msg, nil) + err1 := r.CompleteMessage(ctx, msg, nil) if err1 != nil { // If the completion fails then the message will get rescheduled, but it's effect will // have been made, so we could get duplication issues. @@ -125,3 +123,72 @@ func (r *Receiver) complete(ctx context.Context, err error, msg *ReceivedMessage log.Infof("%s", azerr) } } + +// Reschedule handles when a message should be deferred at a later time. There are a +// number of ways of doing this but it turns out that simply not doing anything causes +// azservicebus to resubmit the message 1 minute later. We keep the function signature with +// unused arguments for consistency and in case we need to implement more sophisticated +// algorithms in future. +func reschedule(ctx context.Context, log logger.Logger, r *azservicebus.Receiver, err error, msg *ReceivedMessage) { + ctx = context.WithoutCancel(ctx) + + span, _ := tracing.StartSpanFromContext(ctx, "Message.Reschedule") + defer span.Finish() + log.Infof("Reschedule Message on DeliveryCount %d: %v", msg.DeliveryCount, err) +} + +// Abandon abandons message. This function is not used but is present for consistency. +func (r *Receiver) abandon(ctx context.Context, err error, msg *ReceivedMessage) { + log := r.log.FromContext(ctx) + defer log.Close() + + abandon(ctx, log, r.receiver, err, msg) +} + +func (r *Receiver) reschedule(ctx context.Context, err error, msg *ReceivedMessage) { + log := r.log.FromContext(ctx) + defer log.Close() + + reschedule(ctx, log, r.receiver, err, msg) +} + +func (r *Receiver) deadLetter(ctx context.Context, err error, msg *ReceivedMessage) { + log := r.log.FromContext(ctx) + defer log.Close() + deadLetter(ctx, log, r.receiver, err, msg) +} + +func (r *Receiver) complete(ctx context.Context, err error, msg *ReceivedMessage) { + log := r.log.FromContext(ctx) + defer log.Close() + + complete(ctx, log, r.receiver, err, msg) +} + +// Abandon abandons message. This function is not used but is present for consistency. +func (r *BatchReceiver) abandon(ctx context.Context, err error, msg *ReceivedMessage) { + log := r.log.FromContext(ctx) + defer log.Close() + + abandon(ctx, log, r.Receiver, err, msg) +} + +func (r *BatchReceiver) reschedule(ctx context.Context, err error, msg *ReceivedMessage) { + log := r.log.FromContext(ctx) + defer log.Close() + + reschedule(ctx, log, r.Receiver, err, msg) +} + +func (r *BatchReceiver) deadLetter(ctx context.Context, err error, msg *ReceivedMessage) { + log := r.log.FromContext(ctx) + defer log.Close() + deadLetter(ctx, log, r.Receiver, err, msg) +} + +func (r *BatchReceiver) complete(ctx context.Context, err error, msg *ReceivedMessage) { + log := r.log.FromContext(ctx) + defer log.Close() + + complete(ctx, log, r.Receiver, err, msg) +} diff --git a/azbus/mocks/BatchHandler.go b/azbus/mocks/BatchHandler.go new file mode 100644 index 0000000..28719bd --- /dev/null +++ b/azbus/mocks/BatchHandler.go @@ -0,0 +1,72 @@ +// Code generated by mockery v2.50.0. DO NOT EDIT. + +package mocks + +import ( + azservicebus "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" + azbus "github.com/datatrails/go-datatrails-common/azbus" + + context "context" + + mock "github.com/stretchr/testify/mock" +) + +// BatchHandler is an autogenerated mock type for the BatchHandler type +type BatchHandler struct { + mock.Mock +} + +// Close provides a mock function with no fields +func (_m *BatchHandler) Close() { + _m.Called() +} + +// Handle provides a mock function with given fields: _a0, _a1, _a2 +func (_m *BatchHandler) Handle(_a0 context.Context, _a1 azbus.Disposer, _a2 []*azservicebus.ReceivedMessage) error { + ret := _m.Called(_a0, _a1, _a2) + + if len(ret) == 0 { + panic("no return value specified for Handle") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, azbus.Disposer, []*azservicebus.ReceivedMessage) error); ok { + r0 = rf(_a0, _a1, _a2) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Open provides a mock function with no fields +func (_m *BatchHandler) Open() error { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Open") + } + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// NewBatchHandler creates a new instance of BatchHandler. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewBatchHandler(t interface { + mock.TestingT + Cleanup(func()) +}) *BatchHandler { + mock := &BatchHandler{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/azbus/mocks/Disposer.go b/azbus/mocks/Disposer.go new file mode 100644 index 0000000..7f9979e --- /dev/null +++ b/azbus/mocks/Disposer.go @@ -0,0 +1,36 @@ +// Code generated by mockery v2.50.0. DO NOT EDIT. + +package mocks + +import ( + azservicebus "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" + azbus "github.com/datatrails/go-datatrails-common/azbus" + + context "context" + + mock "github.com/stretchr/testify/mock" +) + +// Disposer is an autogenerated mock type for the Disposer type +type Disposer struct { + mock.Mock +} + +// Dispose provides a mock function with given fields: ctx, d, err, msg +func (_m *Disposer) Dispose(ctx context.Context, d azbus.Disposition, err error, msg *azservicebus.ReceivedMessage) { + _m.Called(ctx, d, err, msg) +} + +// NewDisposer creates a new instance of Disposer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewDisposer(t interface { + mock.TestingT + Cleanup(func()) +}) *Disposer { + mock := &Disposer{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/azbus/mocks/Handler.go b/azbus/mocks/Handler.go index 469a383..7b8b1b1 100644 --- a/azbus/mocks/Handler.go +++ b/azbus/mocks/Handler.go @@ -1,4 +1,4 @@ -// Code generated by mockery. DO NOT EDIT. +// Code generated by mockery v2.50.0. DO NOT EDIT. package mocks @@ -16,46 +16,11 @@ type Handler struct { mock.Mock } -type Handler_Expecter struct { - mock *mock.Mock -} - -func (_m *Handler) EXPECT() *Handler_Expecter { - return &Handler_Expecter{mock: &_m.Mock} -} - -// Close provides a mock function with given fields: +// Close provides a mock function with no fields func (_m *Handler) Close() { _m.Called() } -// Handler_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close' -type Handler_Close_Call struct { - *mock.Call -} - -// Close is a helper method to define mock.On call -func (_e *Handler_Expecter) Close() *Handler_Close_Call { - return &Handler_Close_Call{Call: _e.mock.On("Close")} -} - -func (_c *Handler_Close_Call) Run(run func()) *Handler_Close_Call { - _c.Call.Run(func(args mock.Arguments) { - run() - }) - return _c -} - -func (_c *Handler_Close_Call) Return() *Handler_Close_Call { - _c.Call.Return() - return _c -} - -func (_c *Handler_Close_Call) RunAndReturn(run func()) *Handler_Close_Call { - _c.Call.Return(run) - return _c -} - // Handle provides a mock function with given fields: _a0, _a1 func (_m *Handler) Handle(_a0 context.Context, _a1 *azservicebus.ReceivedMessage) (azbus.Disposition, context.Context, error) { ret := _m.Called(_a0, _a1) @@ -93,36 +58,7 @@ func (_m *Handler) Handle(_a0 context.Context, _a1 *azservicebus.ReceivedMessage return r0, r1, r2 } -// Handler_Handle_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Handle' -type Handler_Handle_Call struct { - *mock.Call -} - -// Handle is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 *azservicebus.ReceivedMessage -func (_e *Handler_Expecter) Handle(_a0 interface{}, _a1 interface{}) *Handler_Handle_Call { - return &Handler_Handle_Call{Call: _e.mock.On("Handle", _a0, _a1)} -} - -func (_c *Handler_Handle_Call) Run(run func(_a0 context.Context, _a1 *azservicebus.ReceivedMessage)) *Handler_Handle_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(*azservicebus.ReceivedMessage)) - }) - return _c -} - -func (_c *Handler_Handle_Call) Return(_a0 azbus.Disposition, _a1 context.Context, _a2 error) *Handler_Handle_Call { - _c.Call.Return(_a0, _a1, _a2) - return _c -} - -func (_c *Handler_Handle_Call) RunAndReturn(run func(context.Context, *azservicebus.ReceivedMessage) (azbus.Disposition, context.Context, error)) *Handler_Handle_Call { - _c.Call.Return(run) - return _c -} - -// Open provides a mock function with given fields: +// Open provides a mock function with no fields func (_m *Handler) Open() error { ret := _m.Called() @@ -140,33 +76,6 @@ func (_m *Handler) Open() error { return r0 } -// Handler_Open_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Open' -type Handler_Open_Call struct { - *mock.Call -} - -// Open is a helper method to define mock.On call -func (_e *Handler_Expecter) Open() *Handler_Open_Call { - return &Handler_Open_Call{Call: _e.mock.On("Open")} -} - -func (_c *Handler_Open_Call) Run(run func()) *Handler_Open_Call { - _c.Call.Run(func(args mock.Arguments) { - run() - }) - return _c -} - -func (_c *Handler_Open_Call) Return(_a0 error) *Handler_Open_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *Handler_Open_Call) RunAndReturn(run func() error) *Handler_Open_Call { - _c.Call.Return(run) - return _c -} - // NewHandler creates a new instance of Handler. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewHandler(t interface { diff --git a/azbus/mocks/MsgReceiver.go b/azbus/mocks/MsgReceiver.go index 00f923b..d8b8962 100644 --- a/azbus/mocks/MsgReceiver.go +++ b/azbus/mocks/MsgReceiver.go @@ -1,4 +1,4 @@ -// Code generated by mockery. DO NOT EDIT. +// Code generated by mockery v2.50.0. DO NOT EDIT. package mocks @@ -15,15 +15,7 @@ type MsgReceiver struct { mock.Mock } -type MsgReceiver_Expecter struct { - mock *mock.Mock -} - -func (_m *MsgReceiver) EXPECT() *MsgReceiver_Expecter { - return &MsgReceiver_Expecter{mock: &_m.Mock} -} - -// GetAZClient provides a mock function with given fields: +// GetAZClient provides a mock function with no fields func (_m *MsgReceiver) GetAZClient() azbus.AZClient { ret := _m.Called() @@ -41,34 +33,7 @@ func (_m *MsgReceiver) GetAZClient() azbus.AZClient { return r0 } -// MsgReceiver_GetAZClient_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetAZClient' -type MsgReceiver_GetAZClient_Call struct { - *mock.Call -} - -// GetAZClient is a helper method to define mock.On call -func (_e *MsgReceiver_Expecter) GetAZClient() *MsgReceiver_GetAZClient_Call { - return &MsgReceiver_GetAZClient_Call{Call: _e.mock.On("GetAZClient")} -} - -func (_c *MsgReceiver_GetAZClient_Call) Run(run func()) *MsgReceiver_GetAZClient_Call { - _c.Call.Run(func(args mock.Arguments) { - run() - }) - return _c -} - -func (_c *MsgReceiver_GetAZClient_Call) Return(_a0 azbus.AZClient) *MsgReceiver_GetAZClient_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *MsgReceiver_GetAZClient_Call) RunAndReturn(run func() azbus.AZClient) *MsgReceiver_GetAZClient_Call { - _c.Call.Return(run) - return _c -} - -// Listen provides a mock function with given fields: +// Listen provides a mock function with no fields func (_m *MsgReceiver) Listen() error { ret := _m.Called() @@ -86,33 +51,6 @@ func (_m *MsgReceiver) Listen() error { return r0 } -// MsgReceiver_Listen_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Listen' -type MsgReceiver_Listen_Call struct { - *mock.Call -} - -// Listen is a helper method to define mock.On call -func (_e *MsgReceiver_Expecter) Listen() *MsgReceiver_Listen_Call { - return &MsgReceiver_Listen_Call{Call: _e.mock.On("Listen")} -} - -func (_c *MsgReceiver_Listen_Call) Run(run func()) *MsgReceiver_Listen_Call { - _c.Call.Run(func(args mock.Arguments) { - run() - }) - return _c -} - -func (_c *MsgReceiver_Listen_Call) Return(_a0 error) *MsgReceiver_Listen_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *MsgReceiver_Listen_Call) RunAndReturn(run func() error) *MsgReceiver_Listen_Call { - _c.Call.Return(run) - return _c -} - // Shutdown provides a mock function with given fields: _a0 func (_m *MsgReceiver) Shutdown(_a0 context.Context) error { ret := _m.Called(_a0) @@ -131,35 +69,7 @@ func (_m *MsgReceiver) Shutdown(_a0 context.Context) error { return r0 } -// MsgReceiver_Shutdown_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Shutdown' -type MsgReceiver_Shutdown_Call struct { - *mock.Call -} - -// Shutdown is a helper method to define mock.On call -// - _a0 context.Context -func (_e *MsgReceiver_Expecter) Shutdown(_a0 interface{}) *MsgReceiver_Shutdown_Call { - return &MsgReceiver_Shutdown_Call{Call: _e.mock.On("Shutdown", _a0)} -} - -func (_c *MsgReceiver_Shutdown_Call) Run(run func(_a0 context.Context)) *MsgReceiver_Shutdown_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context)) - }) - return _c -} - -func (_c *MsgReceiver_Shutdown_Call) Return(_a0 error) *MsgReceiver_Shutdown_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *MsgReceiver_Shutdown_Call) RunAndReturn(run func(context.Context) error) *MsgReceiver_Shutdown_Call { - _c.Call.Return(run) - return _c -} - -// String provides a mock function with given fields: +// String provides a mock function with no fields func (_m *MsgReceiver) String() string { ret := _m.Called() @@ -177,33 +87,6 @@ func (_m *MsgReceiver) String() string { return r0 } -// MsgReceiver_String_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'String' -type MsgReceiver_String_Call struct { - *mock.Call -} - -// String is a helper method to define mock.On call -func (_e *MsgReceiver_Expecter) String() *MsgReceiver_String_Call { - return &MsgReceiver_String_Call{Call: _e.mock.On("String")} -} - -func (_c *MsgReceiver_String_Call) Run(run func()) *MsgReceiver_String_Call { - _c.Call.Run(func(args mock.Arguments) { - run() - }) - return _c -} - -func (_c *MsgReceiver_String_Call) Return(_a0 string) *MsgReceiver_String_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *MsgReceiver_String_Call) RunAndReturn(run func() string) *MsgReceiver_String_Call { - _c.Call.Return(run) - return _c -} - // NewMsgReceiver creates a new instance of MsgReceiver. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewMsgReceiver(t interface { diff --git a/azbus/mocks/MsgSender.go b/azbus/mocks/MsgSender.go index 0c0909e..8d0d765 100644 --- a/azbus/mocks/MsgSender.go +++ b/azbus/mocks/MsgSender.go @@ -1,4 +1,4 @@ -// Code generated by mockery. DO NOT EDIT. +// Code generated by mockery v2.50.0. DO NOT EDIT. package mocks @@ -16,12 +16,22 @@ type MsgSender struct { mock.Mock } -type MsgSender_Expecter struct { - mock *mock.Mock -} +// BatchAddMessage provides a mock function with given fields: batch, m, options +func (_m *MsgSender) BatchAddMessage(batch *azservicebus.MessageBatch, m *azservicebus.Message, options *azservicebus.AddMessageOptions) error { + ret := _m.Called(batch, m, options) + + if len(ret) == 0 { + panic("no return value specified for BatchAddMessage") + } + + var r0 error + if rf, ok := ret.Get(0).(func(*azservicebus.MessageBatch, *azservicebus.Message, *azservicebus.AddMessageOptions) error); ok { + r0 = rf(batch, m, options) + } else { + r0 = ret.Error(0) + } -func (_m *MsgSender) EXPECT() *MsgSender_Expecter { - return &MsgSender_Expecter{mock: &_m.Mock} + return r0 } // Close provides a mock function with given fields: _a0 @@ -29,35 +39,7 @@ func (_m *MsgSender) Close(_a0 context.Context) { _m.Called(_a0) } -// MsgSender_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close' -type MsgSender_Close_Call struct { - *mock.Call -} - -// Close is a helper method to define mock.On call -// - _a0 context.Context -func (_e *MsgSender_Expecter) Close(_a0 interface{}) *MsgSender_Close_Call { - return &MsgSender_Close_Call{Call: _e.mock.On("Close", _a0)} -} - -func (_c *MsgSender_Close_Call) Run(run func(_a0 context.Context)) *MsgSender_Close_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context)) - }) - return _c -} - -func (_c *MsgSender_Close_Call) Return() *MsgSender_Close_Call { - _c.Call.Return() - return _c -} - -func (_c *MsgSender_Close_Call) RunAndReturn(run func(context.Context)) *MsgSender_Close_Call { - _c.Call.Return(run) - return _c -} - -// GetAZClient provides a mock function with given fields: +// GetAZClient provides a mock function with no fields func (_m *MsgSender) GetAZClient() azbus.AZClient { ret := _m.Called() @@ -75,34 +57,37 @@ func (_m *MsgSender) GetAZClient() azbus.AZClient { return r0 } -// MsgSender_GetAZClient_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetAZClient' -type MsgSender_GetAZClient_Call struct { - *mock.Call -} +// NewMessageBatch provides a mock function with given fields: _a0 +func (_m *MsgSender) NewMessageBatch(_a0 context.Context) (*azservicebus.MessageBatch, error) { + ret := _m.Called(_a0) -// GetAZClient is a helper method to define mock.On call -func (_e *MsgSender_Expecter) GetAZClient() *MsgSender_GetAZClient_Call { - return &MsgSender_GetAZClient_Call{Call: _e.mock.On("GetAZClient")} -} + if len(ret) == 0 { + panic("no return value specified for NewMessageBatch") + } -func (_c *MsgSender_GetAZClient_Call) Run(run func()) *MsgSender_GetAZClient_Call { - _c.Call.Run(func(args mock.Arguments) { - run() - }) - return _c -} + var r0 *azservicebus.MessageBatch + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (*azservicebus.MessageBatch, error)); ok { + return rf(_a0) + } + if rf, ok := ret.Get(0).(func(context.Context) *azservicebus.MessageBatch); ok { + r0 = rf(_a0) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*azservicebus.MessageBatch) + } + } -func (_c *MsgSender_GetAZClient_Call) Return(_a0 azbus.AZClient) *MsgSender_GetAZClient_Call { - _c.Call.Return(_a0) - return _c -} + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(_a0) + } else { + r1 = ret.Error(1) + } -func (_c *MsgSender_GetAZClient_Call) RunAndReturn(run func() azbus.AZClient) *MsgSender_GetAZClient_Call { - _c.Call.Return(run) - return _c + return r0, r1 } -// Open provides a mock function with given fields: +// Open provides a mock function with no fields func (_m *MsgSender) Open() error { ret := _m.Called() @@ -120,33 +105,6 @@ func (_m *MsgSender) Open() error { return r0 } -// MsgSender_Open_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Open' -type MsgSender_Open_Call struct { - *mock.Call -} - -// Open is a helper method to define mock.On call -func (_e *MsgSender_Expecter) Open() *MsgSender_Open_Call { - return &MsgSender_Open_Call{Call: _e.mock.On("Open")} -} - -func (_c *MsgSender_Open_Call) Run(run func()) *MsgSender_Open_Call { - _c.Call.Run(func(args mock.Arguments) { - run() - }) - return _c -} - -func (_c *MsgSender_Open_Call) Return(_a0 error) *MsgSender_Open_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *MsgSender_Open_Call) RunAndReturn(run func() error) *MsgSender_Open_Call { - _c.Call.Return(run) - return _c -} - // Send provides a mock function with given fields: _a0, _a1 func (_m *MsgSender) Send(_a0 context.Context, _a1 *azservicebus.Message) error { ret := _m.Called(_a0, _a1) @@ -165,36 +123,25 @@ func (_m *MsgSender) Send(_a0 context.Context, _a1 *azservicebus.Message) error return r0 } -// MsgSender_Send_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Send' -type MsgSender_Send_Call struct { - *mock.Call -} - -// Send is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 *azservicebus.Message -func (_e *MsgSender_Expecter) Send(_a0 interface{}, _a1 interface{}) *MsgSender_Send_Call { - return &MsgSender_Send_Call{Call: _e.mock.On("Send", _a0, _a1)} -} +// SendBatch provides a mock function with given fields: _a0, _a1 +func (_m *MsgSender) SendBatch(_a0 context.Context, _a1 *azservicebus.MessageBatch) error { + ret := _m.Called(_a0, _a1) -func (_c *MsgSender_Send_Call) Run(run func(_a0 context.Context, _a1 *azservicebus.Message)) *MsgSender_Send_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(*azservicebus.Message)) - }) - return _c -} + if len(ret) == 0 { + panic("no return value specified for SendBatch") + } -func (_c *MsgSender_Send_Call) Return(_a0 error) *MsgSender_Send_Call { - _c.Call.Return(_a0) - return _c -} + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *azservicebus.MessageBatch) error); ok { + r0 = rf(_a0, _a1) + } else { + r0 = ret.Error(0) + } -func (_c *MsgSender_Send_Call) RunAndReturn(run func(context.Context, *azservicebus.Message) error) *MsgSender_Send_Call { - _c.Call.Return(run) - return _c + return r0 } -// String provides a mock function with given fields: +// String provides a mock function with no fields func (_m *MsgSender) String() string { ret := _m.Called() @@ -212,33 +159,6 @@ func (_m *MsgSender) String() string { return r0 } -// MsgSender_String_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'String' -type MsgSender_String_Call struct { - *mock.Call -} - -// String is a helper method to define mock.On call -func (_e *MsgSender_Expecter) String() *MsgSender_String_Call { - return &MsgSender_String_Call{Call: _e.mock.On("String")} -} - -func (_c *MsgSender_String_Call) Run(run func()) *MsgSender_String_Call { - _c.Call.Run(func(args mock.Arguments) { - run() - }) - return _c -} - -func (_c *MsgSender_String_Call) Return(_a0 string) *MsgSender_String_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *MsgSender_String_Call) RunAndReturn(run func() string) *MsgSender_String_Call { - _c.Call.Return(run) - return _c -} - // NewMsgSender creates a new instance of MsgSender. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewMsgSender(t interface { diff --git a/azbus/mocks/ReceiverOption.go b/azbus/mocks/ReceiverOption.go index b228158..c5bdf33 100644 --- a/azbus/mocks/ReceiverOption.go +++ b/azbus/mocks/ReceiverOption.go @@ -1,4 +1,4 @@ -// Code generated by mockery. DO NOT EDIT. +// Code generated by mockery v2.50.0. DO NOT EDIT. package mocks @@ -12,47 +12,11 @@ type ReceiverOption struct { mock.Mock } -type ReceiverOption_Expecter struct { - mock *mock.Mock -} - -func (_m *ReceiverOption) EXPECT() *ReceiverOption_Expecter { - return &ReceiverOption_Expecter{mock: &_m.Mock} -} - // Execute provides a mock function with given fields: _a0 func (_m *ReceiverOption) Execute(_a0 *azbus.Receiver) { _m.Called(_a0) } -// ReceiverOption_Execute_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Execute' -type ReceiverOption_Execute_Call struct { - *mock.Call -} - -// Execute is a helper method to define mock.On call -// - _a0 *azbus.Receiver -func (_e *ReceiverOption_Expecter) Execute(_a0 interface{}) *ReceiverOption_Execute_Call { - return &ReceiverOption_Execute_Call{Call: _e.mock.On("Execute", _a0)} -} - -func (_c *ReceiverOption_Execute_Call) Run(run func(_a0 *azbus.Receiver)) *ReceiverOption_Execute_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(*azbus.Receiver)) - }) - return _c -} - -func (_c *ReceiverOption_Execute_Call) Return() *ReceiverOption_Execute_Call { - _c.Call.Return() - return _c -} - -func (_c *ReceiverOption_Execute_Call) RunAndReturn(run func(*azbus.Receiver)) *ReceiverOption_Execute_Call { - _c.Call.Return(run) - return _c -} - // NewReceiverOption creates a new instance of ReceiverOption. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewReceiverOption(t interface { diff --git a/azbus/msgsender.go b/azbus/msgsender.go index 953b7f7..e7223d0 100644 --- a/azbus/msgsender.go +++ b/azbus/msgsender.go @@ -2,6 +2,8 @@ package azbus import ( "context" + + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" ) type MsgSender interface { @@ -9,5 +11,9 @@ type MsgSender interface { Close(context.Context) Send(context.Context, *OutMessage) error + NewMessageBatch(context.Context) (*OutMessageBatch, error) + BatchAddMessage(batch *OutMessageBatch, m *OutMessage, options *azservicebus.AddMessageOptions) error + + SendBatch(context.Context, *OutMessageBatch) error String() string } diff --git a/azbus/outmessagebatch.go b/azbus/outmessagebatch.go new file mode 100644 index 0000000..dae0e23 --- /dev/null +++ b/azbus/outmessagebatch.go @@ -0,0 +1,8 @@ +package azbus + +import ( + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" +) + +// OutMessageBatch aliases the azure service bus batch message type. +type OutMessageBatch = azservicebus.MessageBatch diff --git a/azbus/receiver.go b/azbus/receiver.go index 0e7602d..d963c0b 100644 --- a/azbus/receiver.go +++ b/azbus/receiver.go @@ -15,6 +15,7 @@ var ( ) // Handler processes a ReceivedMessage. +// Use this style of handler to take advantage of the automatic peek lock renewal and disposal of messages. type Handler interface { Handle(context.Context, *ReceivedMessage) (Disposition, context.Context, error) Open() error @@ -22,7 +23,11 @@ type Handler interface { } const ( - // RenewalTime is the how often we want to renew the message PEEK lock + // DefaultRenewalTime is the how often we want to renew the message PEEK lock + // If RenewMessageLock is true then this is the default value for RenewMessageTime. + // + // Note that the default aligns with the default value for topics and queues in Azure Service Bus. + // Unless the topic or queue has been configured with a different value, you should not need to change this. // // Inspection of the topics and subscription shows that the PeekLock timeout is one minute. // @@ -39,7 +44,7 @@ const ( // normal test suites. // // Set to 50 seconds, well within the 60 seconds peek lock timeout - RenewalTime = 50 * time.Second + DefaultRenewalTime = 50 * time.Second ) // Settings for Receivers: @@ -69,7 +74,10 @@ type ReceiverConfig struct { SubscriptionName string // See azbus/receiver.go + // Note: RenewMessageLock has no effect when using the batched handler (BatchSize > 0) RenewMessageLock bool + + // RenewMessageTime is the how often we want to renew the message PEEK lock RenewMessageTime time.Duration // If a deadletter receiver then this is true @@ -93,6 +101,8 @@ type Receiver struct { type ReceiverOption func(*Receiver) // WithHandlers +// Add's individual message handlers to the receiver. +// Mutually exclusive with WithBatchHandler. func WithHandlers(h ...Handler) ReceiverOption { return func(r *Receiver) { r.handlers = append(r.handlers, h...) @@ -138,7 +148,7 @@ func newReceiver(r *Receiver, log Logger, cfg ReceiverConfig, opts ...ReceiverOp // Set this to a default that corresponds to the az servicebus default peek-lock timeout if r.Cfg.RenewMessageTime == 0 { - r.Cfg.RenewMessageTime = RenewalTime + r.Cfg.RenewMessageTime = DefaultRenewalTime } return r diff --git a/azbus/sender.go b/azbus/sender.go index 765bc61..5ceae87 100644 --- a/azbus/sender.go +++ b/azbus/sender.go @@ -150,3 +150,56 @@ func (s *Sender) Send(ctx context.Context, message *OutMessage) error { log.Debugf("Sending message id %s took %s", id, time.Since(now)) return nil } + +func (s *Sender) NewMessageBatch(ctx context.Context) (*OutMessageBatch, error) { + return s.sender.NewMessageBatch(ctx, nil) +} + +// BatchAddMessage calls Addmessage on batch +// Note: this method is a direct pass through and exists only to provide a +// mockable interface for adding messages to a batch. +func (s *Sender) BatchAddMessage(batch *OutMessageBatch, m *OutMessage, options *azservicebus.AddMessageOptions) error { + return batch.AddMessage(m, options) +} + +// SendBatch submits a message batch to the broker. Ignores cancellation. +func (s *Sender) SendBatch(ctx context.Context, batch *OutMessageBatch) error { + + // Without this fix eventsourcepoller and similar services repeatedly context cancel and repeatedly + // restart. + ctx = context.WithoutCancel(ctx) + + var err error + + now := time.Now() + + span, ctx := tracing.StartSpanFromContext(ctx, "Sender.SendBatch") + defer span.Finish() + span.LogFields( + otlog.String("sender", s.Cfg.TopicOrQueueName), + ) + + // Get the logging context after we create the span as that may have created a new + // trace and stashed the traceid in the metadata. + log := s.log.FromContext(ctx) + defer log.Close() + + // boots & braces + if s.sender == nil { + err = s.Open() + if err != nil { + return err + } + } + // Note: sizing must be dealt with as the batch is created and accumulated. + + // Note: the first message properties (including application properties) are established by the first message in the batch + + err = s.sender.SendMessageBatch(ctx, batch, nil) + if err != nil { + azerr := fmt.Errorf("SendMessageBatch failed in %s: %w", time.Since(now), NewAzbusError(err)) + log.Infof("%s", azerr) + return azerr + } + return nil +} diff --git a/azbus/tracing.go b/azbus/tracing.go index 87c572c..d0a371e 100644 --- a/azbus/tracing.go +++ b/azbus/tracing.go @@ -6,7 +6,7 @@ import ( opentracing "github.com/opentracing/opentracing-go" ) -func (r *Receiver) handleReceivedMessageWithTracingContext(ctx context.Context, message *ReceivedMessage, handler Handler) (Disposition, context.Context, error) { +func (r *Receiver) CreateReceivedMessageTracingContext(ctx context.Context, message *ReceivedMessage, handler Handler) (context.Context, opentracing.Span) { // We don't have the tracing span info on the context yet, that is what this function will add // we we log using the reciever logger r.log.Debugf("ContextFromReceivedMessage(): ApplicationProperties %v", message.ApplicationProperties) @@ -26,13 +26,18 @@ func (r *Receiver) handleReceivedMessageWithTracingContext(ctx context.Context, } spanCtx, err := opentracing.GlobalTracer().Extract(opentracing.TextMap, carrier) if err != nil { - r.log.Infof("handleReceivedMessageWithTracingContext(): Unable to extract span context: %v", err) + r.log.Infof("CreateReceivedMessageWithTracingContext(): Unable to extract span context: %v", err) } else { opts = append(opts, opentracing.ChildOf(spanCtx)) } span := opentracing.StartSpan("handle message", opts...) - defer span.Finish() ctx = opentracing.ContextWithSpan(ctx, span) + return ctx, span +} + +func (r *Receiver) handleReceivedMessageWithTracingContext(ctx context.Context, message *ReceivedMessage, handler Handler) (Disposition, context.Context, error) { + ctx, span := r.CreateReceivedMessageTracingContext(ctx, message, handler) + defer span.Finish() return handler.Handle(ctx, message) }