Skip to content

Commit 3a8d8f1

Browse files
committed
Receive Messages Serially
New functional option to add a serialHandler that reads N messages at once and process them one at a time - serially. receiver := NewReceiver(, WithSerialHandler(h, 200)) where h is an instance of Handler. Defining a SerialHandler this way disables the normal parallel processing defined by WithHandlers(). It is higly recommended to specify RenewMessageLock if the number of received messages is high (say > 10). One has to be sure that processing the number of messages within 60s is achievable. AB#9378
1 parent dd2783b commit 3a8d8f1

File tree

3 files changed

+108
-34
lines changed

3 files changed

+108
-34
lines changed

azbus/disposition.go

+11-11
Original file line numberDiff line numberDiff line change
@@ -21,31 +21,31 @@ const (
2121
)
2222

2323
func (d Disposition) String() string {
24-
switch {
25-
case d == DeadletterDisposition:
24+
switch d {
25+
case DeadletterDisposition:
2626
return "DeadLetter"
27-
case d == AbandonDisposition:
27+
case AbandonDisposition:
2828
return "Abandon"
29-
case d == RescheduleDisposition:
29+
case RescheduleDisposition:
3030
return "Reschedule"
31-
case d == CompleteDisposition:
31+
case CompleteDisposition:
3232
return "Complete"
3333
}
3434
return fmt.Sprintf("Unknown%d", d)
3535
}
3636

37-
func (r *Receiver) dispose(ctx context.Context, d Disposition, err error, msg *ReceivedMessage) {
38-
switch {
39-
case d == DeadletterDisposition:
37+
func (r *Receiver) Dispose(ctx context.Context, d Disposition, err error, msg *ReceivedMessage) {
38+
switch d {
39+
case DeadletterDisposition:
4040
r.deadLetter(ctx, err, msg)
4141
return
42-
case d == AbandonDisposition:
42+
case AbandonDisposition:
4343
r.abandon(ctx, err, msg)
4444
return
45-
case d == RescheduleDisposition:
45+
case RescheduleDisposition:
4646
r.reschedule(ctx, err, msg)
4747
return
48-
case d == CompleteDisposition:
48+
case CompleteDisposition:
4949
r.complete(ctx, err, msg)
5050
return
5151
}

azbus/receiver.go

+92-19
Original file line numberDiff line numberDiff line change
@@ -82,11 +82,13 @@ type Receiver struct {
8282

8383
Cfg ReceiverConfig
8484

85-
log Logger
86-
mtx sync.Mutex
87-
receiver *azservicebus.Receiver
88-
options *azservicebus.ReceiverOptions
89-
handlers []Handler
85+
log Logger
86+
mtx sync.Mutex
87+
receiver *azservicebus.Receiver
88+
options *azservicebus.ReceiverOptions
89+
handlers []Handler
90+
serialHandler Handler
91+
numberOfReceivedMessages int // for serial Handler only
9092
}
9193

9294
type ReceiverOption func(*Receiver)
@@ -98,6 +100,14 @@ func WithHandlers(h ...Handler) ReceiverOption {
98100
}
99101
}
100102

103+
// WithSerialHandler
104+
func WithSerialHandler(h Handler, n int) ReceiverOption {
105+
return func(r *Receiver) {
106+
r.serialHandler = h
107+
r.numberOfReceivedMessages = n
108+
}
109+
}
110+
101111
// WithRenewalTime takes an optional time to renew the peek lock. This should be comfortably less
102112
// than the peek lock timeout. For example: the default peek lock timeout is 60s and the default
103113
// renewal time is 50s.
@@ -172,7 +182,7 @@ func (r *Receiver) processMessage(ctx context.Context, count int, maxDuration ti
172182

173183
r.log.Debugf("Processing message %d", count)
174184
disp, ctx, err := r.handleReceivedMessageWithTracingContext(ctx, msg, handler)
175-
r.dispose(ctx, disp, err, msg)
185+
r.Dispose(ctx, disp, err, msg)
176186

177187
duration := time.Since(now)
178188

@@ -226,7 +236,7 @@ func (r *Receiver) renewMessageLock(ctx context.Context, count int, msg *Receive
226236
}
227237
}
228238

229-
func (r *Receiver) receiveMessages() error {
239+
func (r *Receiver) receiveMessagesInParallel() error {
230240

231241
numberOfReceivedMessages := len(r.handlers)
232242
r.log.Debugf(
@@ -294,6 +304,55 @@ func (r *Receiver) receiveMessages() error {
294304
}
295305
}
296306

307+
func (r *Receiver) receiveMessagesInSerial() error {
308+
309+
r.log.Debugf(
310+
"NumberOfReceivedMessages %d, RenewMessageLock: %v",
311+
r.numberOfReceivedMessages,
312+
r.Cfg.RenewMessageLock,
313+
)
314+
315+
ctx, cancel := context.WithCancel(context.Background())
316+
defer cancel()
317+
for {
318+
var err error
319+
var messages []*ReceivedMessage
320+
messages, err = r.receiver.ReceiveMessages(ctx, r.numberOfReceivedMessages, nil)
321+
if err != nil {
322+
azerr := fmt.Errorf("%s: ReceiveMessage failure: %w", r, NewAzbusError(err))
323+
r.log.Infof("%s", azerr)
324+
return azerr
325+
}
326+
total := len(messages)
327+
r.log.Debugf("total messages %d", total)
328+
var renewCtx context.Context
329+
var renewCancel context.CancelFunc
330+
var maxDuration time.Duration
331+
// XXX: if the number of Received Messages is large (>10) then RenewMessageLock is required.
332+
if r.Cfg.RenewMessageLock {
333+
func() {
334+
renewCtx, renewCancel = context.WithCancel(ctx)
335+
defer renewCancel()
336+
for i, msg := range messages {
337+
go r.renewMessageLock(renewCtx, i+1, msg)
338+
}
339+
for i, msg := range messages {
340+
r.processMessage(renewCtx, i+1, maxDuration, msg, r.serialHandler)
341+
}
342+
}()
343+
} else {
344+
for i, msg := range messages {
345+
func() {
346+
// we need a timeout per message if RenewMessageLock is disabled
347+
renewCtx, renewCancel, maxDuration = r.setTimeout(ctx, r.log, msg)
348+
defer renewCancel()
349+
r.processMessage(renewCtx, i+1, maxDuration, msg, r.serialHandler)
350+
}()
351+
}
352+
}
353+
}
354+
}
355+
297356
// The following 2 methods satisfy the startup.Listener interface.
298357
func (r *Receiver) Listen() error {
299358
r.log.Debugf("listen")
@@ -303,21 +362,18 @@ func (r *Receiver) Listen() error {
303362
r.log.Infof("%s", azerr)
304363
return azerr
305364
}
306-
return r.receiveMessages()
365+
if r.serialHandler != nil {
366+
return r.receiveMessagesInSerial()
367+
}
368+
return r.receiveMessagesInParallel()
307369
}
308370

309371
func (r *Receiver) Shutdown(ctx context.Context) error {
310372
r.close_()
311373
return nil
312374
}
313375

314-
func (r *Receiver) open() error {
315-
var err error
316-
317-
if r.receiver != nil {
318-
return nil
319-
}
320-
376+
func (r *Receiver) openReceiver() error {
321377
client, err := r.azClient.azClient()
322378
if err != nil {
323379
return err
@@ -336,10 +392,27 @@ func (r *Receiver) open() error {
336392
}
337393

338394
r.receiver = receiver
339-
for j := range len(r.handlers) {
340-
err = r.handlers[j].Open()
341-
if err != nil {
342-
return fmt.Errorf("failed to open handler: %w", err)
395+
return nil
396+
}
397+
398+
func (r *Receiver) open() error {
399+
var err error
400+
401+
if r.receiver != nil {
402+
return nil
403+
}
404+
405+
err = r.openReceiver()
406+
if err != nil {
407+
return err
408+
}
409+
410+
if r.serialHandler == nil {
411+
for j := range len(r.handlers) {
412+
err = r.handlers[j].Open()
413+
if err != nil {
414+
return fmt.Errorf("failed to open handler: %w", err)
415+
}
343416
}
344417
}
345418
return nil

taskfiles/Taskfile_codeqa.yml

+5-4
Original file line numberDiff line numberDiff line change
@@ -27,17 +27,18 @@ tasks:
2727
desc: Quality assurance of code
2828
summary: "format sources (go fmt)"
2929
cmds:
30-
- gofmt -l -s -w .
30+
- |
31+
go fix ./...
32+
goimports -w .
33+
gofmt -l -s -w .
3134
3235
lint:
3336
desc: Quality assurance of code
3437
cmds:
3538
- |
36-
golangci-lint --version
3739
go vet ./...
38-
goimports {{.VERBOSE}} -w .
40+
golangci-lint --version
3941
golangci-lint {{.VERBOSE}} run --timeout 10m ./...
40-
gofmt -l -s -w .
4142
4243
unit-tests:
4344
desc: "run unit tests"

0 commit comments

Comments
 (0)