@@ -7,14 +7,36 @@ import (
7
7
"github.com/golang-queue/queue/core"
8
8
)
9
9
10
- // defined in rabbitmq client package.
10
+ /*
11
+ Package rabbitmq provides configuration options and helper functions
12
+ for setting up and customizing RabbitMQ workers and queues in the golang-queue system.
13
+ This file defines the available exchange types, the options struct, and a set of functional
14
+ options for flexible configuration.
15
+ */
16
+
17
+ /*
18
+ Predefined RabbitMQ exchange types for use in configuration.
19
+ - ExchangeDirect: Direct exchange type.
20
+ - ExchangeFanout: Fanout exchange type.
21
+ - ExchangeTopic: Topic exchange type.
22
+ - ExchangeHeaders: Headers exchange type.
23
+ */
11
24
const (
12
25
ExchangeDirect = "direct"
13
26
ExchangeFanout = "fanout"
14
27
ExchangeTopic = "topic"
15
28
ExchangeHeaders = "headers"
16
29
)
17
30
31
+ /*
32
+ isVaildExchange checks if the provided exchange name is one of the supported types.
33
+
34
+ Parameters:
35
+ - name: The exchange type name to validate.
36
+
37
+ Returns:
38
+ - bool: true if the exchange type is valid, false otherwise.
39
+ */
18
40
func isVaildExchange (name string ) bool {
19
41
switch name {
20
42
case ExchangeDirect , ExchangeFanout , ExchangeTopic , ExchangeHeaders :
@@ -24,97 +46,193 @@ func isVaildExchange(name string) bool {
24
46
}
25
47
}
26
48
27
- // Option for queue system
49
+ /*
50
+ Option is a functional option type for configuring the options struct.
51
+ It allows for flexible and composable configuration of RabbitMQ workers and queues.
52
+ */
28
53
type Option func (* options )
29
54
30
- // AMQP 0-9-1 Model Explained
31
- // ref: https://www.rabbitmq.com/tutorials/amqp-concepts.html
55
+ /*
56
+ options struct holds all configuration parameters for a RabbitMQ worker or queue.
57
+
58
+ Fields:
59
+ - runFunc: The function to execute for each task.
60
+ - logger: Logger instance for logging.
61
+ - addr: AMQP server URI.
62
+ - queue: Name of the queue to use.
63
+ - tag: Consumer tag for identification.
64
+ - exchangeName: Name of the AMQP exchange.
65
+ - exchangeType: Type of the AMQP exchange (direct, fanout, topic, headers).
66
+ - autoAck: Whether to enable automatic message acknowledgment.
67
+ - routingKey: AMQP routing key for message delivery.
68
+ */
32
69
type options struct {
33
- runFunc func (context.Context , core.TaskMessage ) error
34
- logger queue.Logger
35
- addr string
36
- queue string
37
- tag string
38
- // Durable AMQP exchange name
39
- exchangeName string
40
- // Exchange Types: Direct, Fanout, Topic and Headers
41
- exchangeType string
70
+ runFunc func (context.Context , core.TaskMessage ) error
71
+ logger queue.Logger
72
+ addr string
73
+ queue string
74
+ tag string
75
+ exchangeName string // Durable AMQP exchange name
76
+ exchangeType string // Exchange Types: Direct, Fanout, Topic and Headers
42
77
autoAck bool
43
- // AMQP routing key
44
- routingKey string
78
+ routingKey string // AMQP routing key
45
79
}
46
80
47
- // WithAddr setup the URI
81
+ /*
82
+ WithAddr sets the AMQP server URI.
83
+
84
+ Parameters:
85
+ - addr: The AMQP URI to connect to.
86
+
87
+ Returns:
88
+ - Option: Functional option to set the address.
89
+ */
48
90
func WithAddr (addr string ) Option {
49
91
return func (w * options ) {
50
92
w .addr = addr
51
93
}
52
94
}
53
95
54
- // WithExchangeName setup the Exchange name
55
- // Exchanges are AMQP 0-9-1 entities where messages are sent to.
56
- // Exchanges take a message and route it into zero or more queues.
96
+ /*
97
+ WithExchangeName sets the name of the AMQP exchange.
98
+
99
+ Parameters:
100
+ - val: The exchange name.
101
+
102
+ Returns:
103
+ - Option: Functional option to set the exchange name.
104
+
105
+ Exchanges are AMQP 0-9-1 entities where messages are sent to.
106
+ Exchanges take a message and route it into zero or more queues.
107
+ */
57
108
func WithExchangeName (val string ) Option {
58
109
return func (w * options ) {
59
110
w .exchangeName = val
60
111
}
61
112
}
62
113
63
- // WithExchangeType setup the Exchange type
64
- // The routing algorithm used depends on the exchange type and rules called bindings.
65
- // AMQP 0-9-1 brokers provide four exchange types:
66
- // Direct exchange (Empty string) and amq.direct
67
- // Fanout exchange amq.fanout
68
- // Topic exchange amq.topic
69
- // Headers exchange amq.match (and amq.headers in RabbitMQ)
114
+ /*
115
+ WithExchangeType sets the type of the AMQP exchange.
116
+
117
+ Parameters:
118
+ - val: The exchange type (direct, fanout, topic, headers).
119
+
120
+ Returns:
121
+ - Option: Functional option to set the exchange type.
122
+
123
+ The routing algorithm used depends on the exchange type and rules called bindings.
124
+ AMQP 0-9-1 brokers provide four exchange types:
125
+ - Direct exchange (Empty string) and amq.direct
126
+ - Fanout exchange amq.fanout
127
+ - Topic exchange amq.topic
128
+ - Headers exchange amq.match (and amq.headers in RabbitMQ)
129
+ */
70
130
func WithExchangeType (val string ) Option {
71
131
return func (w * options ) {
72
132
w .exchangeType = val
73
133
}
74
134
}
75
135
76
- // WithRoutingKey setup AMQP routing key
136
+ /*
137
+ WithRoutingKey sets the AMQP routing key.
138
+
139
+ Parameters:
140
+ - val: The routing key.
141
+
142
+ Returns:
143
+ - Option: Functional option to set the routing key.
144
+ */
77
145
func WithRoutingKey (val string ) Option {
78
146
return func (w * options ) {
79
147
w .routingKey = val
80
148
}
81
149
}
82
150
83
- // WithAddr setup the tag
151
+ /*
152
+ WithTag sets the consumer tag for the worker.
153
+
154
+ Parameters:
155
+ - val: The consumer tag.
156
+
157
+ Returns:
158
+ - Option: Functional option to set the tag.
159
+ */
84
160
func WithTag (val string ) Option {
85
161
return func (w * options ) {
86
162
w .tag = val
87
163
}
88
164
}
89
165
90
- // WithAutoAck enable message auto-ack
166
+ /*
167
+ WithAutoAck enables or disables automatic message acknowledgment.
168
+
169
+ Parameters:
170
+ - val: true to enable auto-ack, false to disable.
171
+
172
+ Returns:
173
+ - Option: Functional option to set autoAck.
174
+ */
91
175
func WithAutoAck (val bool ) Option {
92
176
return func (w * options ) {
93
177
w .autoAck = val
94
178
}
95
179
}
96
180
97
- // WithQueue setup the queue name
181
+ /*
182
+ WithQueue sets the name of the queue to use.
183
+
184
+ Parameters:
185
+ - val: The queue name.
186
+
187
+ Returns:
188
+ - Option: Functional option to set the queue name.
189
+ */
98
190
func WithQueue (val string ) Option {
99
191
return func (w * options ) {
100
192
w .queue = val
101
193
}
102
194
}
103
195
104
- // WithRunFunc setup the run func of queue
196
+ /*
197
+ WithRunFunc sets the function to execute for each task.
198
+
199
+ Parameters:
200
+ - fn: The function to run for each task message.
201
+
202
+ Returns:
203
+ - Option: Functional option to set the run function.
204
+ */
105
205
func WithRunFunc (fn func (context.Context , core.TaskMessage ) error ) Option {
106
206
return func (w * options ) {
107
207
w .runFunc = fn
108
208
}
109
209
}
110
210
111
- // WithLogger set custom logger
211
+ /*
212
+ WithLogger sets a custom logger for the worker or queue.
213
+
214
+ Parameters:
215
+ - l: The logger instance.
216
+
217
+ Returns:
218
+ - Option: Functional option to set the logger.
219
+ */
112
220
func WithLogger (l queue.Logger ) Option {
113
221
return func (w * options ) {
114
222
w .logger = l
115
223
}
116
224
}
117
225
226
+ /*
227
+ newOptions creates a new options struct with default values,
228
+ then applies any provided functional options to override defaults.
229
+
230
+ Parameters:
231
+ - opts: Variadic list of Option functions to customize the configuration.
232
+
233
+ Returns:
234
+ - options: The fully configured options struct.
235
+ */
118
236
func newOptions (opts ... Option ) options {
119
237
defaultOpts := options {
120
238
addr : "amqp://guest:guest@localhost:5672/" ,
@@ -130,12 +248,12 @@ func newOptions(opts ...Option) options {
130
248
},
131
249
}
132
250
133
- // Loop through each option
251
+ // Apply each provided option to override defaults
134
252
for _ , opt := range opts {
135
- // Call the option giving the instantiated
136
253
opt (& defaultOpts )
137
254
}
138
255
256
+ // Validate the exchange type
139
257
if ! isVaildExchange (defaultOpts .exchangeType ) {
140
258
defaultOpts .logger .Fatal ("invaild exchange type: " , defaultOpts .exchangeType )
141
259
}
0 commit comments