Skip to content

Commit aa1894e

Browse files
authored
Merge pull request #26 from furan917/fix/ensure-system-interupt-works
Small fix for graceful interrupts + some whitespacing fixes
2 parents 544767e + 229a649 commit aa1894e

File tree

3 files changed

+149
-141
lines changed

3 files changed

+149
-141
lines changed

cmd/listen.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,17 @@ var ListenCmd = &cobra.Command{
3939
return fmt.Errorf("error creating listener: %s", err)
4040
}
4141

42-
// Create a channel to handle program termination or interruption signals so we can kill any connections if needed
42+
//Create a channel to handle program termination or interruption signals so we can kill any connections if needed
4343
sigChan := make(chan os.Signal, 1)
4444
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
45-
go listener.ListenToService(queueNames)
46-
<-sigChan
47-
listener.Close()
45+
go func() {
46+
<-sigChan
47+
logger.Infof("Received interruption signal. Shutting down gracefully...")
48+
listener.Close()
49+
os.Exit(0)
50+
}()
51+
52+
listener.ListenToService(queueNames)
4853

4954
return nil
5055
},

messages/listener/rmq.go

Lines changed: 138 additions & 136 deletions
Original file line numberDiff line numberDiff line change
@@ -1,165 +1,167 @@
11
package listener
22

33
import (
4-
"errors"
5-
"fmt"
6-
"github.com/streadway/amqp"
7-
"magecomm/config_manager/system_limits"
8-
"magecomm/logger"
9-
"magecomm/messages/handler"
10-
"magecomm/messages/queues"
11-
"magecomm/services"
12-
"sync"
4+
"errors"
5+
"fmt"
6+
"github.com/streadway/amqp"
7+
"magecomm/config_manager/system_limits"
8+
"magecomm/logger"
9+
"magecomm/messages/handler"
10+
"magecomm/messages/queues"
11+
"magecomm/services"
12+
"sync"
1313
)
1414

1515
type RmqListener struct {
16-
ChannelPool *services.RabbitMQChannelPool
17-
done chan struct{}
18-
wg sync.WaitGroup
16+
ChannelPool *services.RabbitMQChannelPool
17+
stopChan chan struct{}
18+
waitGroup sync.WaitGroup
1919
}
2020

2121
func (listener *RmqListener) shouldExecutionBeDelayed() error {
22-
totalDeferTime := 0
23-
for system_limits.CheckIfOutsideOperationalLimits() {
24-
system_limits.SystemLimitCheckSleep()
25-
totalDeferTime += int(system_limits.WaitTimeBetweenChecks)
22+
totalDeferTime := 0
23+
for system_limits.CheckIfOutsideOperationalLimits() {
24+
system_limits.SystemLimitCheckSleep()
25+
totalDeferTime += int(system_limits.WaitTimeBetweenChecks)
2626

27-
if totalDeferTime > int(system_limits.MaxDeferralTime) {
28-
return errors.New("max deferral time exceeded")
29-
}
30-
}
27+
if totalDeferTime > int(system_limits.MaxDeferralTime) {
28+
return errors.New("max deferral time exceeded")
29+
}
30+
}
3131

32-
return nil
32+
return nil
3333
}
3434

3535
func (listener *RmqListener) processRmqMessage(message amqp.Delivery, channel *amqp.Channel, queueName string) {
36-
logger.Debugf("Message received from %s", queueName)
37-
correlationID := message.CorrelationId
38-
if message.Headers == nil {
39-
message.Headers = make(amqp.Table)
40-
}
41-
42-
retryCount, ok := message.Headers["RetryCount"]
43-
if !ok {
44-
retryCount = int32(0)
45-
}
46-
47-
err := listener.shouldExecutionBeDelayed()
48-
if err != nil {
49-
logger.Warnf("Message deferral time exceeded. Dropping hold on the message.")
50-
message.Headers["RetryCount"] = retryCount.(int32) + 1
51-
_, err := services.PublishRmqMessage(channel, queueName, message.Body, message.Headers, correlationID)
52-
if err != nil {
53-
logger.Warnf("Failed to republish publish message: %v", err)
54-
}
55-
return
56-
}
57-
if err := handler.HandleReceivedMessage(string(message.Body), queueName, correlationID); err != nil {
58-
logger.Warnf("Failed to process message: %v", err)
59-
if retryCount.(int32) < handler.MessageRetryLimit {
60-
message.Headers["RetryCount"] = retryCount.(int32) + 1
61-
_, err := services.PublishRmqMessage(channel, queueName, message.Body, message.Headers, correlationID)
62-
if err != nil {
63-
logger.Warnf("Failed to republish publish message: %v", err)
64-
}
65-
} else {
66-
logger.Warnf("Retry count exceeded. Discarding the message.")
67-
}
68-
}
36+
logger.Debugf("Message received from %s", queueName)
37+
correlationID := message.CorrelationId
38+
if message.Headers == nil {
39+
message.Headers = make(amqp.Table)
40+
}
41+
42+
retryCount, ok := message.Headers["RetryCount"]
43+
if !ok {
44+
retryCount = int32(0)
45+
}
46+
47+
err := listener.shouldExecutionBeDelayed()
48+
if err != nil {
49+
logger.Warnf("Message deferral time exceeded. Dropping hold on the message.")
50+
message.Headers["RetryCount"] = retryCount.(int32) + 1
51+
_, err := services.PublishRmqMessage(channel, queueName, message.Body, message.Headers, correlationID)
52+
if err != nil {
53+
logger.Warnf("Failed to republish publish message: %v", err)
54+
}
55+
return
56+
}
57+
if err := handler.HandleReceivedMessage(string(message.Body), queueName, correlationID); err != nil {
58+
logger.Warnf("Failed to process message: %v", err)
59+
if retryCount.(int32) < handler.MessageRetryLimit {
60+
message.Headers["RetryCount"] = retryCount.(int32) + 1
61+
_, err := services.PublishRmqMessage(channel, queueName, message.Body, message.Headers, correlationID)
62+
if err != nil {
63+
logger.Warnf("Failed to republish publish message: %v", err)
64+
}
65+
} else {
66+
logger.Warnf("Retry count exceeded. Discarding the message.")
67+
}
68+
}
6969
}
7070

7171
func (listener *RmqListener) listenToQueue(queueName string) {
72-
defer listener.wg.Done()
73-
74-
channel, err := listener.ChannelPool.Get()
75-
if err != nil {
76-
logger.Warnf("Error getting channel from pool: %v", err)
77-
return
78-
}
79-
defer listener.ChannelPool.Put(channel)
80-
81-
queueNameWithConfigPrefix, err := services.CreateRmqQueue(channel, queueName)
82-
if err != nil {
83-
return
84-
}
85-
msgs, err := channel.Consume(
86-
queueNameWithConfigPrefix,
87-
"",
88-
true,
89-
false,
90-
false,
91-
false,
92-
nil,
93-
)
94-
if err != nil {
95-
logger.Fatalf("%s: %s", "Failed to register a consumer", err)
96-
}
97-
98-
for {
99-
select {
100-
case message, ok := <-msgs:
101-
if !ok {
102-
return
103-
}
104-
listener.processRmqMessage(message, channel, queueName)
105-
case <-listener.done:
106-
return
107-
}
108-
}
72+
defer listener.waitGroup.Done()
73+
74+
channel, err := listener.ChannelPool.Get()
75+
if err != nil {
76+
logger.Warnf("Error getting channel from pool: %v", err)
77+
return
78+
}
79+
defer listener.ChannelPool.Put(channel)
80+
81+
queueNameWithConfigPrefix, err := services.CreateRmqQueue(channel, queueName)
82+
if err != nil {
83+
return
84+
}
85+
msgs, err := channel.Consume(
86+
queueNameWithConfigPrefix,
87+
"",
88+
true,
89+
false,
90+
false,
91+
false,
92+
nil,
93+
)
94+
if err != nil {
95+
logger.Fatalf("%s: %s", "Failed to register a consumer", err)
96+
}
97+
98+
for {
99+
select {
100+
case message, ok := <-msgs:
101+
if !ok {
102+
return
103+
}
104+
listener.processRmqMessage(message, channel, queueName)
105+
case <-listener.stopChan:
106+
return
107+
}
108+
}
109109
}
110110

111111
func (listener *RmqListener) ListenForOutputByCorrelationID(queueName string, correlationID string) (string, error) {
112-
queueName = queues.MapQueueToOutputQueue(queueName)
113-
channel, err := listener.ChannelPool.Get()
114-
if err != nil {
115-
logger.Warnf("Error getting channel from pool: %v", err)
116-
return "", err
117-
}
118-
defer listener.ChannelPool.Put(channel)
119-
120-
queueNameWithConfigPrefix, err := services.CreateRmqQueue(channel, queueName)
121-
if err != nil {
122-
return "", err
123-
}
124-
msgs, err := channel.Consume(
125-
queueNameWithConfigPrefix,
126-
"",
127-
false,
128-
false,
129-
false,
130-
false,
131-
nil,
132-
)
133-
if err != nil {
134-
return "", fmt.Errorf("failed to consume messages: %s", err)
135-
}
136-
137-
for msg := range msgs {
138-
if correlationID == msg.CorrelationId {
139-
output := string(msg.Body)
140-
err = msg.Ack(false)
141-
if err != nil {
142-
return "", fmt.Errorf("failed to acknowledge message: %s", err)
143-
}
144-
145-
return output, nil
146-
}
147-
}
148-
149-
return "", fmt.Errorf("failed to receive message with correlation ID: %s", correlationID)
112+
queueName = queues.MapQueueToOutputQueue(queueName)
113+
channel, err := listener.ChannelPool.Get()
114+
if err != nil {
115+
logger.Warnf("Error getting channel from pool: %v", err)
116+
return "", err
117+
}
118+
defer listener.ChannelPool.Put(channel)
119+
120+
queueNameWithConfigPrefix, err := services.CreateRmqQueue(channel, queueName)
121+
if err != nil {
122+
return "", err
123+
}
124+
msgs, err := channel.Consume(
125+
queueNameWithConfigPrefix,
126+
"",
127+
false,
128+
false,
129+
false,
130+
false,
131+
nil,
132+
)
133+
if err != nil {
134+
return "", fmt.Errorf("failed to consume messages: %s", err)
135+
}
136+
137+
for msg := range msgs {
138+
if correlationID == msg.CorrelationId {
139+
output := string(msg.Body)
140+
err = msg.Ack(false)
141+
if err != nil {
142+
return "", fmt.Errorf("failed to acknowledge message: %s", err)
143+
}
144+
145+
return output, nil
146+
}
147+
}
148+
149+
return "", fmt.Errorf("failed to receive message with correlation ID: %s", correlationID)
150150
}
151151

152152
func (listener *RmqListener) ListenToService(queueNames []string) {
153-
listener.done = make(chan struct{})
153+
listener.stopChan = make(chan struct{})
154154

155-
for _, queueName := range queueNames {
156-
listener.wg.Add(1)
157-
go listener.listenToQueue(queueName)
158-
}
155+
for _, queueName := range queueNames {
156+
listener.waitGroup.Add(1)
157+
go listener.listenToQueue(queueName)
158+
}
159159

160-
listener.wg.Wait()
160+
listener.waitGroup.Wait()
161161
}
162162

163163
func (listener *RmqListener) Close() {
164-
close(listener.done)
164+
close(listener.stopChan)
165+
logger.Infof("Stopped listening to queues")
166+
fmt.Println("Stopped listening to queues")
165167
}

messages/listener/sqs.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,5 +200,6 @@ func (listener *SqsListener) ListenToService(queueNames []string) {
200200

201201
func (listener *SqsListener) Close() {
202202
close(listener.stopChan)
203-
listener.waitGroup.Wait()
203+
logger.Infof("Stopped listening to queues")
204+
fmt.Println("Stopped listening to queues")
204205
}

0 commit comments

Comments
 (0)