Skip to content

Commit 98cd978

Browse files
committed
feat: improve RabbitMQ connection resilience and error handling
- Add a reconnect configuration with retry and exponential backoff for RabbitMQ connections - Replace direct connection attempts with a retry-enabled dial function in worker initialization - Improve shutdown logic to handle errors from both consumer cancel and channel/connection close, aggregating errors if needed - Enhance task acknowledgment to log errors and requeue tasks if ack fails Signed-off-by: Bo-Yi Wu <[email protected]>
1 parent 1488435 commit 98cd978

File tree

1 file changed

+60
-6
lines changed

1 file changed

+60
-6
lines changed

rabbitmq.go

Lines changed: 60 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package rabbitmq
33
import (
44
"context"
55
"encoding/json"
6+
"errors"
7+
"math"
68
"sync"
79
"sync/atomic"
810
"time"
@@ -16,6 +18,30 @@ import (
1618

1719
var _ core.Worker = (*Worker)(nil)
1820

21+
// ReconnectConfig defines the retry policy for RabbitMQ connection.
22+
type ReconnectConfig struct {
23+
MaxRetries int
24+
InitialDelay time.Duration
25+
MaxDelay time.Duration
26+
}
27+
28+
// dialWithRetry tries to connect to RabbitMQ with retry and backoff.
29+
func dialWithRetry(addr string, cfg ReconnectConfig) (*amqp.Connection, error) {
30+
var conn *amqp.Connection
31+
var err error
32+
delay := cfg.InitialDelay
33+
for i := 0; i < cfg.MaxRetries; i++ {
34+
conn, err = amqp.Dial(addr)
35+
if err == nil {
36+
return conn, nil
37+
}
38+
time.Sleep(delay)
39+
// Exponential backoff with cap
40+
delay = time.Duration(math.Min(float64(cfg.MaxDelay), float64(delay)*2))
41+
}
42+
return nil, errors.New("failed to connect to RabbitMQ after retries: " + err.Error())
43+
}
44+
1945
/*
2046
Worker struct implements the core.Worker interface for RabbitMQ.
2147
It manages the AMQP connection, channel, and task consumption.
@@ -59,7 +85,13 @@ func NewWorker(opts ...Option) *Worker {
5985
tasks: make(chan amqp.Delivery),
6086
}
6187

62-
w.conn, err = amqp.Dial(w.opts.addr)
88+
// Use retry config, fallback to default if not set
89+
retryCfg := ReconnectConfig{
90+
MaxRetries: 5,
91+
InitialDelay: 500 * time.Millisecond,
92+
MaxDelay: 5 * time.Second,
93+
}
94+
w.conn, err = dialWithRetry(w.opts.addr, retryCfg)
6395
if err != nil {
6496
w.opts.logger.Fatal("can't connect rabbitmq: ", err)
6597
}
@@ -164,11 +196,30 @@ func (w *Worker) Shutdown() (err error) {
164196

165197
w.stopOnce.Do(func() {
166198
close(w.stop)
167-
if err = w.channel.Cancel(w.opts.tag, true); err != nil {
168-
w.opts.logger.Error("consumer cancel failed: ", err)
199+
// Cancel consumer first
200+
if w.channel != nil {
201+
if cerr := w.channel.Cancel(w.opts.tag, true); cerr != nil {
202+
w.opts.logger.Error("consumer cancel failed: ", cerr)
203+
if err == nil {
204+
err = cerr
205+
}
206+
}
207+
// Try to close channel
208+
if cerr := w.channel.Close(); cerr != nil {
209+
w.opts.logger.Error("AMQP channel close error: ", cerr)
210+
if err == nil {
211+
err = cerr
212+
}
213+
}
169214
}
170-
if err = w.conn.Close(); err != nil {
171-
w.opts.logger.Error("AMQP connection close error: ", err)
215+
// Then close connection
216+
if w.conn != nil {
217+
if cerr := w.conn.Close(); cerr != nil {
218+
w.opts.logger.Error("AMQP connection close error: ", cerr)
219+
if err == nil {
220+
err = cerr
221+
}
222+
}
172223
}
173224
})
174225

@@ -232,7 +283,10 @@ loop:
232283
var data job.Message
233284
_ = json.Unmarshal(task.Body, &data)
234285
if !w.opts.autoAck {
235-
_ = task.Ack(w.opts.autoAck)
286+
if err := task.Ack(false); err != nil {
287+
w.opts.logger.Error("Ack failed: ", err)
288+
_ = task.Nack(false, true) // requeue
289+
}
236290
}
237291
return &data, nil
238292
case <-time.After(1 * time.Second):

0 commit comments

Comments
 (0)