Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions azbus/disposition.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ func (r *Receiver) dispose(ctx context.Context, d Disposition, err error, msg *R
}
}

// NB: ALL disposition methods return nil so they can be used in return statements

// Abandon abandons message. This function is not used but is present for consistency.
func (r *Receiver) abandon(ctx context.Context, err error, msg *ReceivedMessage) {
ctx = context.WithoutCancel(ctx)
Expand Down
197 changes: 185 additions & 12 deletions azbus/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ type Handler interface {
Close()
}

// BatchHandler processes ReceivedMessages.
type BatchHandler interface {
Handle(context.Context, []*ReceivedMessage) ([]Disposition, context.Context, error)
Open() error
Close()
}

const (
// RenewalTime is the how often we want to renew the message PEEK lock
//
Expand Down Expand Up @@ -82,11 +89,14 @@ type Receiver struct {

Cfg ReceiverConfig

log Logger
mtx sync.Mutex
receiver *azservicebus.Receiver
options *azservicebus.ReceiverOptions
handlers []Handler
log Logger
mtx sync.Mutex
receiver *azservicebus.Receiver
options *azservicebus.ReceiverOptions
handlers []Handler
serialHandler Handler
batchHandler BatchHandler
numberOfReceivedMessages int // for serial or Batch Handler only
}

type ReceiverOption func(*Receiver)
Expand All @@ -98,6 +108,22 @@ func WithHandlers(h ...Handler) ReceiverOption {
}
}

// WithBatchHandler
func WithBatchHandler(h BatchHandler, n int) ReceiverOption {
return func(r *Receiver) {
r.batchHandler = h
r.numberOfReceivedMessages = n
}
}

// WithSerialHandler
func WithSerialHandler(h Handler, n int) ReceiverOption {
return func(r *Receiver) {
r.serialHandler = h
r.numberOfReceivedMessages = n
}
}

// WithRenewalTime takes an optional time to renew the peek lock. This should be comfortably less
// than the peek lock timeout. For example: the default peek lock timeout is 60s and the default
// renewal time is 50s.
Expand Down Expand Up @@ -165,6 +191,34 @@ func (r *Receiver) String() string {
}

// processMessage disposes of messages and emits 2 log messages detailing how long processing took.
func (r *Receiver) processMessages(ctx context.Context, maxDuration time.Duration, messages []*ReceivedMessage, handler BatchHandler) {
count := len(messages)
now := time.Now()
dispositions, ctx, err := r.handleReceivedMessagesWithTracingContext(ctx, messages, handler)
for i, disp := range dispositions {
r.dispose(ctx, disp, err, messages[i])
}
duration := time.Since(now)

// Now we do have a tracing context we can use it for logging
log := r.log.FromContext(ctx)
defer log.Close()

log.Debugf("Processing messages %d took %s", len(messages), duration)

// This is safe because maxDuration is only defined if RenewMessageLock is false.
if !r.Cfg.RenewMessageLock && duration >= maxDuration {
log.Infof("WARNING: processing msg %d duration %v took more than %v seconds", count, duration, maxDuration)
log.Infof("WARNING: please either enable SERVICEBUS_RENEW_LOCK or reduce SERVICEBUS_INCOMING_MESSAGES")
log.Infof("WARNING: both can be found in the helm chart for each service.")
}
if errors.Is(err, ErrPeekLockTimeout) {
log.Infof("WARNING: processing msg %d duration %s returned error: %v", count, duration, err)
log.Infof("WARNING: please enable SERVICEBUS_RENEW_LOCK which can be found in the helm chart")
}
}

// processMessage disposes of message and emits 2 log messages detailing how long processing took.
func (r *Receiver) processMessage(ctx context.Context, count int, maxDuration time.Duration, msg *ReceivedMessage, handler Handler) {
now := time.Now()

Expand Down Expand Up @@ -226,7 +280,7 @@ func (r *Receiver) renewMessageLock(ctx context.Context, count int, msg *Receive
}
}

func (r *Receiver) receiveMessages() error {
func (r *Receiver) receiveMessagesInParallel() error {

numberOfReceivedMessages := len(r.handlers)
r.log.Debugf(
Expand Down Expand Up @@ -294,6 +348,98 @@ func (r *Receiver) receiveMessages() error {
}
}

func (r *Receiver) receiveMessagesInSerial() error {

r.log.Debugf(
"NumberOfReceivedMessages %d, RenewMessageLock: %v",
r.numberOfReceivedMessages,
r.Cfg.RenewMessageLock,
)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for {
var err error
var messages []*ReceivedMessage
messages, err = r.receiver.ReceiveMessages(ctx, r.numberOfReceivedMessages, nil)
if err != nil {
azerr := fmt.Errorf("%s: ReceiveMessage failure: %w", r, NewAzbusError(err))
r.log.Infof("%s", azerr)
return azerr
}
total := len(messages)
r.log.Debugf("total messages %d", total)
var renewCtx context.Context
var renewCancel context.CancelFunc
var maxDuration time.Duration
// XXX: if the number of Received Messages is large (>10) then RenewMessageLock is required.
if r.Cfg.RenewMessageLock {
func() {
renewCtx, renewCancel = context.WithCancel(ctx)
defer renewCancel()
for i, msg := range messages {
go r.renewMessageLock(renewCtx, i+1, msg)
}
for i, msg := range messages {
r.processMessage(renewCtx, i+1, maxDuration, msg, r.serialHandler)
}
}()
} else {
for i, msg := range messages {
func() {
// we need a timeout per message if RenewMessageLock is disabled
renewCtx, renewCancel, maxDuration = r.setTimeout(ctx, r.log, msg)
defer renewCancel()
r.processMessage(renewCtx, i+1, maxDuration, msg, r.serialHandler)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't work for the forestrie use case. The handling of the whole batch needs to be in the application handler

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Handler recives a slice of messages which it can process as it sees fit so the logic is in the application handler surely...

}()
}
}
}
}

func (r *Receiver) receiveMessagesInBatch() error {

r.log.Debugf(
"NumberOfReceivedMessages %d, RenewMessageLock: %v",
r.numberOfReceivedMessages,
r.Cfg.RenewMessageLock,
)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for {
var err error
var messages []*ReceivedMessage
messages, err = r.receiver.ReceiveMessages(ctx, r.numberOfReceivedMessages, nil)
if err != nil {
azerr := fmt.Errorf("%s: ReceiveMessage failure: %w", r, NewAzbusError(err))
r.log.Infof("%s", azerr)
return azerr
}
total := len(messages)
r.log.Debugf("total messages %d", total)
var renewCtx context.Context
var renewCancel context.CancelFunc
var maxDuration time.Duration
// XXX: if the number of Received Messages is large (>10) then RenewMessageLock is required.
func() {
if r.Cfg.RenewMessageLock {
renewCtx, renewCancel = context.WithCancel(ctx)
defer renewCancel()
for i, msg := range messages {
go r.renewMessageLock(renewCtx, i+1, msg)
}
} else {
// we need a timeout per message if RenewMessageLock is disabled - use first message
// for all messages
renewCtx, renewCancel, maxDuration = r.setTimeout(ctx, r.log, messages[0])
defer renewCancel()
}
r.processMessages(renewCtx, maxDuration, messages, r.batchHandler)
}()
}
}

// The following 2 methods satisfy the startup.Listener interface.
func (r *Receiver) Listen() error {
r.log.Debugf("listen")
Expand All @@ -303,7 +449,13 @@ func (r *Receiver) Listen() error {
r.log.Infof("%s", azerr)
return azerr
}
return r.receiveMessages()
if r.batchHandler != nil {
return r.receiveMessagesInBatch()
}
if r.serialHandler != nil {
return r.receiveMessagesInSerial()
}
return r.receiveMessagesInParallel()
}

func (r *Receiver) Shutdown(ctx context.Context) error {
Expand Down Expand Up @@ -334,12 +486,25 @@ func (r *Receiver) open() error {
r.log.Infof("%s", azerr)
return azerr
}

r.receiver = receiver
for j := range len(r.handlers) {
err = r.handlers[j].Open()

switch {
case r.batchHandler != nil:
err = r.batchHandler.Open()
if err != nil {
return fmt.Errorf("failed to open batch handler: %w", err)
}
case r.serialHandler != nil:
err = r.serialHandler.Open()
if err != nil {
return fmt.Errorf("failed to open handler: %w", err)
return fmt.Errorf("failed to open serial handler: %w", err)
}
default:
for j := range len(r.handlers) {
err = r.handlers[j].Open()
if err != nil {
return fmt.Errorf("failed to open handler: %w", err)
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just return nil in this if statement at the end and get rid of the else?

}
return nil
Expand All @@ -356,11 +521,19 @@ func (r *Receiver) close_() {
azerr := fmt.Errorf("%s: Error closing receiver: %w", r, NewAzbusError(err))
r.log.Infof("%s", azerr)
}
r.receiver = nil
for j := range len(r.handlers) {
r.handlers[j].Close()
}
r.handlers = []Handler{}
if r.serialHandler != nil {
r.serialHandler.Close()
r.serialHandler = nil
}
if r.batchHandler != nil {
r.batchHandler.Close()
r.batchHandler = nil
}
r.receiver = nil
}
}
}
36 changes: 35 additions & 1 deletion azbus/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

func (r *Receiver) handleReceivedMessageWithTracingContext(ctx context.Context, message *ReceivedMessage, handler Handler) (Disposition, context.Context, error) {
// We don't have the tracing span info on the context yet, that is what this function will add
// we we log using the reciever logger
// we log using the receiver logger
r.log.Debugf("ContextFromReceivedMessage(): ApplicationProperties %v", message.ApplicationProperties)

var opts = []opentracing.StartSpanOption{}
Expand Down Expand Up @@ -36,6 +36,40 @@ func (r *Receiver) handleReceivedMessageWithTracingContext(ctx context.Context,
return handler.Handle(ctx, message)
}

func (r *Receiver) handleReceivedMessagesWithTracingContext(ctx context.Context, messages []*ReceivedMessage, handler BatchHandler) ([]Disposition, context.Context, error) {

var opts = []opentracing.StartSpanOption{}
carrier := opentracing.TextMapCarrier{}
// This just gets all the message Application Properties into a string map. That map is then passed into the
// open tracing constructor which extracts any bits it is interested in to use to setup the spans etc.
// It will ignore anything it doesn't care about. So the filtering of the map is done for us and
// we don't need to pre-filter it.

// We don't have the tracing span info on the context yet, that is what this function will add
// we log using the receiver logger
// only use first message for tracing..... will fix later
msg := messages[0]
r.log.Debugf("ContextFromReceivedMessage(): ApplicationProperties %v", msg.ApplicationProperties)

for k, v := range msg.ApplicationProperties {
// Tracing properties will be strings
value, ok := v.(string)
if ok {
carrier.Set(k, value)
}
}
spanCtx, err := opentracing.GlobalTracer().Extract(opentracing.TextMap, carrier)
if err != nil {
r.log.Infof("handleReceivedMessageWithTracingContext(): Unable to extract span context: %v", err)
} else {
opts = append(opts, opentracing.ChildOf(spanCtx))
}
span := opentracing.StartSpan("handle message", opts...)
defer span.Finish()
ctx = opentracing.ContextWithSpan(ctx, span)
return handler.Handle(ctx, messages)
}

func (s *Sender) updateSendingMesssageForSpan(ctx context.Context, message *OutMessage, span opentracing.Span) {
log := s.log.FromContext(ctx)
defer log.Close()
Expand Down
9 changes: 5 additions & 4 deletions taskfiles/Taskfile_codeqa.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,18 @@ tasks:
desc: Quality assurance of code
summary: "format sources (go fmt)"
cmds:
- gofmt -l -s -w .
- |
go fix ./...
goimports -w .
gofmt -l -s -w .

lint:
desc: Quality assurance of code
cmds:
- |
golangci-lint --version
go vet ./...
goimports {{.VERBOSE}} -w .
golangci-lint --version
golangci-lint {{.VERBOSE}} run --timeout 10m ./...
gofmt -l -s -w .

unit-tests:
desc: "run unit tests"
Expand Down