forked from vcabbage/amqp
-
Notifications
You must be signed in to change notification settings - Fork 59
/
Copy pathlink.go
403 lines (347 loc) · 13.1 KB
/
link.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
package amqp
import (
"context"
"errors"
"fmt"
"sync"
"github.com/Azure/go-amqp/internal/debug"
"github.com/Azure/go-amqp/internal/encoding"
"github.com/Azure/go-amqp/internal/frames"
"github.com/Azure/go-amqp/internal/queue"
"github.com/Azure/go-amqp/internal/shared"
)
// linkKey uniquely identifies a link on a connection by name and direction.
//
// A link can be identified uniquely by the ordered tuple
//
// (source-container-id, target-container-id, name)
//
// On a single connection the container ID pairs can be abbreviated
// to a boolean flag indicating the direction of the link.
type linkKey struct {
name string
role encoding.Role // Local role: sender/receiver
}
// link contains the common state and methods for sending and receiving links
type link struct {
key linkKey // Name and direction
// NOTE: outputHandle and inputHandle might not have the same value
// our handle
outputHandle uint32
// remote's handle
inputHandle uint32
// frames destined for this link are added to this queue by Session.muxFrameToLink
rxQ *queue.Holder[frames.FrameBody]
// used for gracefully closing link
close chan struct{} // signals a link's mux to shut down; DO NOT use this to check if a link has terminated, use done instead
closeOnce *sync.Once // closeOnce protects close from being closed multiple times
done chan struct{} // closed when the link has terminated (mux exited); DO NOT wait on this from within a link's mux() as it will never trigger!
doneErr error // contains the mux error state; ONLY written to by the mux and MUST only be read from after done is closed!
closeErr error // contains the error state returned from closeLink(); ONLY closeLink() reads/writes this!
session *Session // parent session
source *frames.Source // used for Receiver links
target *frames.Target // used for Sender links
properties map[encoding.Symbol]any // additional properties sent upon link attach
// "The delivery-count is initialized by the sender when a link endpoint is created,
// and is incremented whenever a message is sent. Only the sender MAY independently
// modify this field. The receiver's value is calculated based on the last known
// value from the sender and any subsequent messages received on the link. Note that,
// despite its name, the delivery-count is not a count but a sequence number
// initialized at an arbitrary point by the sender."
deliveryCount uint32
// The current maximum number of messages that can be handled at the receiver endpoint of the link. Only the receiver endpoint
// can independently set this value. The sender endpoint sets this to the last known value seen from the receiver.
linkCredit uint32
// properties returned by the peer
peerProperties map[string]any
senderSettleMode *SenderSettleMode
receiverSettleMode *ReceiverSettleMode
maxMessageSize uint64
closeInProgress bool // indicates that the detach performative has been sent
dynamicAddr bool // request a dynamic link address from the server
}
func newLink(s *Session, r encoding.Role) link {
l := link{
key: linkKey{shared.RandString(40), r},
session: s,
close: make(chan struct{}),
closeOnce: &sync.Once{},
done: make(chan struct{}),
}
// set the segment size relative to respective window
var segmentSize int
if r == encoding.RoleReceiver {
segmentSize = int(s.incomingWindow)
} else {
segmentSize = int(s.outgoingWindow)
}
l.rxQ = queue.NewHolder(queue.New[frames.FrameBody](segmentSize))
return l
}
// waitForFrame waits for an incoming frame to be queued.
// it returns the next frame from the queue, or an error.
// the error is either from the context or session.doneErr.
// not meant for consumption outside of link.go.
func (l *link) waitForFrame(ctx context.Context) (frames.FrameBody, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-l.session.done:
// session has terminated, no need to deallocate in this case
return nil, l.session.doneErr
case q := <-l.rxQ.Wait():
// frame received
fr := q.Dequeue()
l.rxQ.Release(q)
return *fr, nil
}
}
// attach sends the Attach performative to establish the link with its parent session.
// this is automatically called by the new*Link constructors.
func (l *link) attach(ctx context.Context, beforeAttach func(*frames.PerformAttach), afterAttach func(*frames.PerformAttach)) error {
if err := l.session.freeAbandonedLinks(ctx); err != nil {
return err
}
// once the abandoned links have been cleaned up we can create our link
if err := l.session.allocateHandle(ctx, l); err != nil {
return err
}
attach := &frames.PerformAttach{
Name: l.key.name,
Handle: l.outputHandle,
ReceiverSettleMode: l.receiverSettleMode,
SenderSettleMode: l.senderSettleMode,
MaxMessageSize: l.maxMessageSize,
Source: l.source,
Target: l.target,
Properties: l.properties,
}
// link-specific configuration of the attach frame
beforeAttach(attach)
if err := l.txFrameAndWait(ctx, attach); err != nil {
return err
}
// wait for response
fr, err := l.waitForFrame(ctx)
if err != nil {
l.session.abandonLink(l)
return err
}
resp, ok := fr.(*frames.PerformAttach)
if !ok {
debug.Log(1, "RX (link %p): unexpected attach response frame %T", l, fr)
if err := l.session.conn.Close(); err != nil {
return err
}
return &ConnError{inner: fmt.Errorf("unexpected attach response: %#v", fr)}
}
// If the remote encounters an error during the attach it returns an Attach
// with no Source or Target. The remote then sends a Detach with an error.
//
// Note that if the application chooses not to create a terminus, the session
// endpoint will still create a link endpoint and issue an attach indicating
// that the link endpoint has no associated local terminus. In this case, the
// session endpoint MUST immediately detach the newly created link endpoint.
//
// http://docs.oasis-open.org/amqp/core/v1.0/csprd01/amqp-core-transport-v1.0-csprd01.html#doc-idp386144
if resp.Source == nil && resp.Target == nil {
// wait for detach
fr, err := l.waitForFrame(ctx)
if err != nil {
// we timed out waiting for the peer to close the link, this really isn't an abandoned link.
// however, we still need to send the detach performative to ack the peer.
l.session.abandonLink(l)
return err
}
detach, ok := fr.(*frames.PerformDetach)
if !ok {
if err := l.session.conn.Close(); err != nil {
return err
}
return &ConnError{inner: fmt.Errorf("unexpected frame while waiting for detach: %#v", fr)}
}
// send return detach
fr = &frames.PerformDetach{
Handle: l.outputHandle,
Closed: true,
}
if err := l.txFrameAndWait(ctx, fr); err != nil {
return err
}
if detach.Error == nil {
return fmt.Errorf("received detach with no error specified")
}
return detach.Error
}
if l.maxMessageSize == 0 || resp.MaxMessageSize < l.maxMessageSize {
l.maxMessageSize = resp.MaxMessageSize
}
// link-specific configuration post attach
afterAttach(resp)
if err := l.setSettleModes(resp); err != nil {
// close the link as there's a mismatch on requested/supported settlement modes
dr := &frames.PerformDetach{
Handle: l.outputHandle,
Closed: true,
}
if err := l.txFrameAndWait(ctx, dr); err != nil {
return err
}
return err
}
if len(resp.Properties) > 0 {
l.peerProperties = map[string]any{}
for k, v := range resp.Properties {
l.peerProperties[string(k)] = v
}
}
return nil
}
// setSettleModes sets the settlement modes based on the resp frames.PerformAttach.
//
// If a settlement mode has been explicitly set locally and it was not honored by the
// server an error is returned.
func (l *link) setSettleModes(resp *frames.PerformAttach) error {
var (
localRecvSettle = receiverSettleModeValue(l.receiverSettleMode)
respRecvSettle = receiverSettleModeValue(resp.ReceiverSettleMode)
)
if l.receiverSettleMode != nil && localRecvSettle != respRecvSettle {
return fmt.Errorf("amqp: receiver settlement mode %q requested, received %q from server", l.receiverSettleMode, &respRecvSettle)
}
l.receiverSettleMode = &respRecvSettle
var (
localSendSettle = senderSettleModeValue(l.senderSettleMode)
respSendSettle = senderSettleModeValue(resp.SenderSettleMode)
)
if l.senderSettleMode != nil && localSendSettle != respSendSettle {
return fmt.Errorf("amqp: sender settlement mode %q requested, received %q from server", l.senderSettleMode, &respSendSettle)
}
l.senderSettleMode = &respSendSettle
return nil
}
// muxHandleFrame processes fr based on type.
func (l *link) muxHandleFrame(fr frames.FrameBody) error {
switch fr := fr.(type) {
case *frames.PerformDetach:
if !fr.Closed {
l.closeWithError(ErrCondNotImplemented, fmt.Sprintf("non-closing detach not supported: %+v", fr))
return nil
}
// there are two possibilities:
// - this is the ack to a client-side Close()
// - the peer is closing the link so we must ack
if l.closeInProgress {
// if the client-side close was initiated due to an error (l.closeWithError)
// then l.doneErr will already be set. in this case, return that error instead
// of an empty LinkError which indicates a clean client-side close.
if l.doneErr != nil {
return l.doneErr
}
return &LinkError{}
}
dr := &frames.PerformDetach{
Handle: l.outputHandle,
Closed: true,
}
l.txFrame(&frameContext{Ctx: context.Background()}, dr)
return &LinkError{RemoteErr: fr.Error}
default:
debug.Log(1, "RX (link %p): unexpected frame: %s", l, fr)
l.closeWithError(ErrCondInternalError, fmt.Sprintf("link received unexpected frame %T", fr))
return nil
}
}
// Close closes the Sender and AMQP link.
func (l *link) closeLink(ctx context.Context) error {
var ctxErr error
l.closeOnce.Do(func() {
close(l.close)
// once the mux has received the ack'ing detach performative, the mux will
// exit which deletes the link and closes l.done.
select {
case <-l.done:
l.closeErr = l.doneErr
case <-ctx.Done():
// notify the caller that the close timed out/was cancelled.
// the mux will remain running and once the ack is received it will terminate.
ctxErr = ctx.Err()
// record that the close timed out/was cancelled.
// subsequent calls to closeLink() will return this
debug.Log(1, "TX (link %p) closing %s: %v", l, l.key.name, ctxErr)
l.closeErr = &LinkError{inner: ctxErr}
}
})
if ctxErr != nil {
return ctxErr
}
var linkErr *LinkError
if errors.As(l.closeErr, &linkErr) && linkErr.RemoteErr == nil && linkErr.inner == nil {
// an empty LinkError means the link was cleanly closed by the caller
return nil
}
return l.closeErr
}
// closeWithError initiates closing the link with the specified AMQP error.
// the mux must continue to run until the ack'ing detach is received.
// l.doneErr is populated with a &LinkError{} containing an inner error constructed from the specified values
// - cnd is the AMQP error condition
// - desc is the error description
func (l *link) closeWithError(cnd ErrCond, desc string) {
amqpErr := &Error{Condition: cnd, Description: desc}
if l.closeInProgress {
debug.Log(3, "TX (link %p) close error already pending, discarding %v", l, amqpErr)
return
}
dr := &frames.PerformDetach{
Handle: l.outputHandle,
Closed: true,
Error: amqpErr,
}
l.closeInProgress = true
l.doneErr = &LinkError{inner: fmt.Errorf("%s: %s", cnd, desc)}
l.txFrame(&frameContext{Ctx: context.Background()}, dr)
}
// txFrame sends the specified frame via the link's session.
// you MUST call this instead of session.txFrame() to ensure
// that frames are not sent during session shutdown.
func (l *link) txFrame(frameCtx *frameContext, fr frames.FrameBody) {
// NOTE: there is no need to select on l.done as this is either
// called from a link's mux or before the mux has even started.
select {
case <-l.session.done:
// the link's session has terminated, let that propagate to the link's mux
case <-l.session.endSent:
// we swallow this to prevent the link's mux from terminating.
// l.session.done will soon close so this is temporary.
case l.session.tx <- frameBodyEnvelope{FrameCtx: frameCtx, FrameBody: fr}:
debug.Log(2, "TX (link %p): mux frame to Session (%p): %s", l, l.session, fr)
}
}
// txFrame sends the specified frame via the link's session.
// you MUST call this instead of session.txFrame() to ensure
// that frames are not sent during session shutdown.
func (l *link) txFrameAndWait(ctx context.Context, fr frames.FrameBody) error {
frameCtx := frameContext{
Ctx: ctx,
Done: make(chan struct{}),
}
// NOTE: there is no need to select on l.done as this is either
// called from a link's mux or before the mux has even started.
select {
case <-l.session.done:
return l.session.doneErr
case <-l.session.endSent:
// we swallow this to prevent the link's mux from terminating.
// l.session.done will soon close so this is temporary.
return nil
case l.session.tx <- frameBodyEnvelope{FrameCtx: &frameCtx, FrameBody: fr}:
debug.Log(2, "TX (link %p): mux frame to Session (%p): %s", l, l.session, fr)
}
select {
case <-frameCtx.Done:
return frameCtx.Err
case <-l.session.done:
return l.session.doneErr
}
}