Skip to content

Commit ae719a0

Browse files
committed
feat: Add Output command for reading missed outputs
1 parent b882473 commit ae719a0

File tree

8 files changed

+280
-30
lines changed

8 files changed

+280
-30
lines changed

cmd/outputs.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package cmd
2+
3+
import (
4+
"fmt"
5+
"magecomm/messages/reader"
6+
7+
"github.com/spf13/cobra"
8+
)
9+
10+
var OutputsCmd = &cobra.Command{
11+
Use: "outputs",
12+
Short: "Drain all pending command outputs from the queue",
13+
RunE: func(cmd *cobra.Command, args []string) error {
14+
readerInstance, err := reader.MapReaderToEngine()
15+
if err != nil {
16+
return err
17+
}
18+
19+
count, err := readerInstance.DrainOutputQueue("magerun")
20+
if err != nil {
21+
return err
22+
}
23+
24+
if count == 0 {
25+
fmt.Println("No outputs available")
26+
} else {
27+
fmt.Println("\nOutputs finished")
28+
}
29+
30+
return nil
31+
},
32+
}

main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ func main() {
4949
RootCmd.AddCommand(cmd.ListenCmd)
5050
RootCmd.AddCommand(cmd.MagerunCmd)
5151
RootCmd.AddCommand(cmd.CatCmd)
52+
RootCmd.AddCommand(cmd.OutputsCmd)
5253

5354
RootCmd.PersistentFlags().String("config", "", "Path to config file")
5455
RootCmd.PersistentFlags().Bool("debug", false, "Enable debug mode")

messages/handler/magerun.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,14 @@ func (handler *MagerunHandler) ProcessMessage(messageBody string, correlationID
5252
}
5353
}
5454

55-
// Publish the output to the RMQ/SQS output queue
5655
publisherClass, err := publisher.MapPublisherToEngine()
5756
if err != nil {
5857
logger.Warnf("Error publishing message to RMQ/SQS queue: %s", err)
5958
}
59+
60+
outputWithCommand := fmt.Sprintf("Command: %s\n\n%s", messageBody, output)
6061
logger.Infof("Publishing output to queue: %s with correlation ID: %s", queues.MapQueueToOutputQueue(MageRunQueue), correlationID)
61-
_, err = publisherClass.Publish(output, queues.MapQueueToOutputQueue(MageRunQueue), correlationID)
62+
_, err = publisherClass.Publish(outputWithCommand, queues.MapQueueToOutputQueue(MageRunQueue), correlationID)
6263
if err != nil {
6364
return err
6465
}

messages/reader/interface.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package reader
2+
3+
type MessageReader interface {
4+
DrainOutputQueue(queueName string) (int, error)
5+
}

messages/reader/mapper.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package reader
2+
3+
import (
4+
"fmt"
5+
"magecomm/config_manager"
6+
"magecomm/services"
7+
)
8+
9+
var Reader MessageReader
10+
11+
func MapReaderToEngine() (MessageReader, error) {
12+
engine := config_manager.GetEngine()
13+
14+
switch engine {
15+
case services.EngineSQS:
16+
return &SqsReader{}, nil
17+
case services.EngineRabbitMQ:
18+
return &RmqReader{}, nil
19+
default:
20+
return nil, fmt.Errorf("unknown engine: %s", engine)
21+
}
22+
}

messages/reader/rmq.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package reader
2+
3+
import (
4+
"fmt"
5+
"strings"
6+
7+
"magecomm/logger"
8+
"magecomm/messages/queues"
9+
"magecomm/services"
10+
11+
"github.com/streadway/amqp"
12+
)
13+
14+
type RmqReader struct{}
15+
16+
func (r *RmqReader) DrainOutputQueue(queueName string) (int, error) {
17+
outputQueueName := queues.MapQueueToOutputQueue(queueName)
18+
19+
channel, err := services.RmqChannelPool.Get()
20+
if err != nil {
21+
return 0, fmt.Errorf("error getting RMQ channel: %w", err)
22+
}
23+
defer services.RmqChannelPool.Put(channel)
24+
25+
queueNameWithPrefix, err := services.CreateRmqQueue(channel, outputQueueName)
26+
if err != nil {
27+
return 0, fmt.Errorf("error creating queue: %w", err)
28+
}
29+
30+
count := 0
31+
for {
32+
msg, ok, err := channel.Get(queueNameWithPrefix, false)
33+
if err != nil {
34+
return count, fmt.Errorf("error receiving message: %w", err)
35+
}
36+
37+
if !ok {
38+
break
39+
}
40+
41+
count++
42+
r.displayMessage(count, &msg)
43+
44+
if err := msg.Ack(false); err != nil {
45+
logger.Warnf("Failed to ack message: %v", err)
46+
}
47+
}
48+
49+
return count, nil
50+
}
51+
52+
func (r *RmqReader) displayMessage(index int, msg *amqp.Delivery) {
53+
fmt.Printf("\n%s\n", strings.Repeat("=", 60))
54+
fmt.Printf("Output %d\n", index)
55+
fmt.Printf("Received: %s\n", msg.Timestamp.Format("2006-01-02 15:04:05"))
56+
fmt.Printf("%s\n", strings.Repeat("=", 60))
57+
fmt.Println(string(msg.Body))
58+
}

messages/reader/sqs.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package reader
2+
3+
import (
4+
"fmt"
5+
"strconv"
6+
"strings"
7+
"time"
8+
9+
"magecomm/logger"
10+
"magecomm/messages/queues"
11+
"magecomm/services"
12+
13+
"github.com/aws/aws-sdk-go/aws"
14+
"github.com/aws/aws-sdk-go/service/sqs"
15+
)
16+
17+
type SqsReader struct{}
18+
19+
func (r *SqsReader) DrainOutputQueue(queueName string) (int, error) {
20+
outputQueueName := queues.MapQueueToOutputQueue(queueName)
21+
22+
sqsConnection, err := services.GetSQSConnection()
23+
if err != nil {
24+
return 0, fmt.Errorf("unable to get SQS connection: %w", err)
25+
}
26+
defer services.ReleaseSQSConnection(sqsConnection)
27+
28+
if err := sqsConnection.Connect(); err != nil {
29+
return 0, fmt.Errorf("error connecting to SQS: %w", err)
30+
}
31+
32+
sqsClient := sqsConnection.Client
33+
queueURL, err := services.CreateSQSQueueIfNotExists(sqsClient, outputQueueName)
34+
if err != nil {
35+
return 0, fmt.Errorf("error getting queue URL: %w", err)
36+
}
37+
38+
count := 0
39+
for {
40+
result, err := sqsClient.ReceiveMessage(&sqs.ReceiveMessageInput{
41+
QueueUrl: aws.String(queueURL),
42+
MaxNumberOfMessages: aws.Int64(10),
43+
WaitTimeSeconds: aws.Int64(1),
44+
AttributeNames: []*string{aws.String("SentTimestamp")},
45+
MessageAttributeNames: []*string{aws.String("All")},
46+
})
47+
48+
if err != nil {
49+
return count, fmt.Errorf("error receiving messages: %w", err)
50+
}
51+
52+
if len(result.Messages) == 0 {
53+
break
54+
}
55+
56+
for _, msg := range result.Messages {
57+
count++
58+
r.displayMessage(count, msg)
59+
60+
_, err := sqsClient.DeleteMessage(&sqs.DeleteMessageInput{
61+
QueueUrl: aws.String(queueURL),
62+
ReceiptHandle: msg.ReceiptHandle,
63+
})
64+
if err != nil {
65+
logger.Warnf("Failed to delete message: %v", err)
66+
}
67+
}
68+
}
69+
70+
return count, nil
71+
}
72+
73+
func (r *SqsReader) displayMessage(index int, msg *sqs.Message) {
74+
fmt.Printf("\n%s\n", strings.Repeat("=", 60))
75+
fmt.Printf("Output %d\n", index)
76+
77+
if msg.Attributes["SentTimestamp"] != nil {
78+
timestamp := *msg.Attributes["SentTimestamp"]
79+
if ms, err := strconv.ParseInt(timestamp, 10, 64); err == nil {
80+
t := time.Unix(ms/1000, 0)
81+
fmt.Printf("Received: %s\n", t.Format("2006-01-02 15:04:05"))
82+
}
83+
}
84+
85+
fmt.Printf("%s\n", strings.Repeat("=", 60))
86+
fmt.Println(*msg.Body)
87+
}

services/rabbitmq_pool.go

Lines changed: 72 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,13 @@ package services
22

33
import (
44
"errors"
5-
"github.com/streadway/amqp"
5+
"fmt"
6+
67
"magecomm/config_manager"
8+
"magecomm/logger"
79
"sync"
10+
11+
"github.com/streadway/amqp"
812
)
913

1014
var (
@@ -20,7 +24,8 @@ type RabbitMQConnectionPool struct {
2024
}
2125

2226
type RabbitMQChannelPool struct {
23-
pool *sync.Pool
27+
pool *sync.Pool
28+
connPool *RabbitMQConnectionPool
2429
}
2530

2631
func Close() {
@@ -32,18 +37,19 @@ func Close() {
3237
func NewRabbitMQConnectionPool(initialSize int) *RabbitMQConnectionPool {
3338
p := &sync.Pool{
3439
New: func() interface{} {
35-
rmqConn := NewRabbitMQConnection()
36-
if err := rmqConn.Connect(""); err != nil {
37-
return err
38-
}
39-
return rmqConn
40+
return NewRabbitMQConnection()
4041
},
4142
}
4243

4344
cp := &RabbitMQConnectionPool{pool: p}
4445

4546
for i := 0; i < initialSize; i++ {
46-
p.Put(p.New())
47+
conn := NewRabbitMQConnection()
48+
if err := conn.Connect(""); err != nil {
49+
logger.Warnf("Failed to create initial RMQ connection: %v", err)
50+
continue
51+
}
52+
p.Put(conn)
4753
}
4854

4955
return cp
@@ -52,54 +58,92 @@ func NewRabbitMQConnectionPool(initialSize int) *RabbitMQConnectionPool {
5258
func NewRabbitMQChannelPool(connPool *RabbitMQConnectionPool, initialSize int) *RabbitMQChannelPool {
5359
p := &sync.Pool{
5460
New: func() interface{} {
55-
conn, err := connPool.Get()
56-
if err != nil {
57-
return err
58-
}
59-
channel, err := conn.CreateChannel()
60-
if err != nil {
61-
return err
62-
}
63-
return channel
61+
return nil
6462
},
6563
}
6664

67-
cp := &RabbitMQChannelPool{pool: p}
65+
cp := &RabbitMQChannelPool{
66+
pool: p,
67+
connPool: connPool,
68+
}
6869

6970
for i := 0; i < initialSize; i++ {
70-
p.Put(p.New())
71+
conn, err := connPool.Get()
72+
if err != nil {
73+
logger.Warnf("Failed to get connection for initial channel: %v", err)
74+
continue
75+
}
76+
channel, err := conn.CreateChannel()
77+
if err != nil {
78+
logger.Warnf("Failed to create initial RMQ channel: %v", err)
79+
connPool.Put(conn)
80+
continue
81+
}
82+
connPool.Put(conn)
83+
p.Put(channel)
7184
}
7285

7386
return cp
7487
}
7588

7689
func (cp *RabbitMQConnectionPool) Get() (*RabbitMQConnection, error) {
77-
conn := cp.pool.Get()
78-
if conn == nil {
90+
obj := cp.pool.Get()
91+
if obj == nil {
7992
return nil, ErrConnectionPoolClosed
8093
}
81-
return conn.(*RabbitMQConnection), nil
94+
95+
conn, ok := obj.(*RabbitMQConnection)
96+
if !ok || conn == nil {
97+
conn = NewRabbitMQConnection()
98+
}
99+
100+
if conn.Connection == nil || conn.Connection.IsClosed() {
101+
if err := conn.Connect(""); err != nil {
102+
return nil, fmt.Errorf("failed to connect to RabbitMQ: %w", err)
103+
}
104+
}
105+
106+
return conn, nil
82107
}
83108

84109
func (cp *RabbitMQConnectionPool) Put(conn *RabbitMQConnection) {
85-
cp.pool.Put(conn)
110+
if conn != nil && conn.Connection != nil && !conn.Connection.IsClosed() {
111+
cp.pool.Put(conn)
112+
}
86113
}
87114

88115
func (cp *RabbitMQConnectionPool) Close() {
89116
cp.pool = nil
90117
}
91118

92119
func (cp *RabbitMQChannelPool) Get() (*amqp.Channel, error) {
120+
obj := cp.pool.Get()
93121

94-
channel := cp.pool.Get()
95-
if channel == nil {
96-
return nil, ErrChannelPoolClosed
122+
if obj != nil {
123+
if channel, ok := obj.(*amqp.Channel); ok && channel != nil {
124+
return channel, nil
125+
}
97126
}
98-
return channel.(*amqp.Channel), nil
127+
128+
conn, err := cp.connPool.Get()
129+
if err != nil {
130+
return nil, fmt.Errorf("failed to get connection: %w", err)
131+
}
132+
133+
channel, err := conn.CreateChannel()
134+
if err != nil {
135+
cp.connPool.Put(conn)
136+
return nil, fmt.Errorf("failed to create channel: %w", err)
137+
}
138+
139+
cp.connPool.Put(conn)
140+
return channel, nil
99141
}
100142

101143
func (cp *RabbitMQChannelPool) Put(channel *amqp.Channel) {
102-
cp.pool.Put(channel)
144+
if channel != nil {
145+
cp.pool.Put(channel)
146+
}
103147
}
104148

105149
func (cp *RabbitMQChannelPool) Close() {

0 commit comments

Comments
 (0)