forked from vcabbage/amqp
-
Notifications
You must be signed in to change notification settings - Fork 59
/
Copy pathcreditor_test.go
139 lines (105 loc) · 3.3 KB
/
creditor_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package amqp
import (
"context"
"fmt"
"sync"
"testing"
"time"
"github.com/Azure/go-amqp/internal/encoding"
"github.com/stretchr/testify/require"
)
func TestCreditorIssueCredits(t *testing.T) {
r := newTestLink(t)
require.NoError(t, r.creditor.IssueCredit(3))
drain, credits := r.creditor.FlowBits(1)
require.False(t, drain)
require.EqualValues(t, 3+1, credits, "flow frame includes the pending credits and our current credits")
// flow clears the previous data once it's been called.
drain, credits = r.creditor.FlowBits(4)
require.False(t, drain)
require.EqualValues(t, 0, credits, "drain flow frame always sets link-credit to 0")
}
func TestCreditorDrain(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*2)
defer cancel()
r := newTestLink(t)
require.NoError(t, r.creditor.IssueCredit(3))
// only one drain allowed at a time.
drainRoutines := sync.WaitGroup{}
drainRoutines.Add(2)
var err1, err2 error
go func() {
defer drainRoutines.Done()
err1 = r.creditor.Drain(ctx, r)
}()
go func() {
defer drainRoutines.Done()
err2 = r.creditor.Drain(ctx, r)
}()
// one of the drain calls will have succeeded, the other one should still be blocking.
time.Sleep(time.Second * 2)
// the next time someone requests a flow frame it'll drain (this doesn't affect the blocked Drain() calls)
drain, credits := r.creditor.FlowBits(101)
require.True(t, drain)
require.EqualValues(t, 0, credits, "Drain always drains with 0 credit")
// unblock the last of the drainers
r.creditor.EndDrain()
require.Nil(t, r.creditor.drained, "drain completes and removes the drained channel")
// wait for all the drain routines to end
drainRoutines.Wait()
// one of them should have failed (if both succeeded we've somehow let them both run)
require.False(t, err1 == nil && err2 == nil)
if err1 == nil {
require.Error(t, err2, errAlreadyDraining.Error())
} else {
require.Error(t, err1, errAlreadyDraining.Error())
}
}
func TestCreditorIssueCreditsWhileDrainingFails(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*2)
defer cancel()
r := newTestLink(t)
require.NoError(t, r.creditor.IssueCredit(3))
// only one drain allowed at a time.
drainRoutines := sync.WaitGroup{}
drainRoutines.Add(2)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
err := r.creditor.Drain(ctx, newTestLink(t))
require.NoError(t, err)
}()
time.Sleep(time.Second * 2)
// drain is still active, so...
require.Error(t, r.creditor.IssueCredit(1), errLinkDraining.Error())
r.creditor.EndDrain()
wg.Wait()
}
func TestCreditorDrainRespectsContext(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*2)
defer cancel()
mc := creditor{}
cancel()
require.Error(t, mc.Drain(ctx, newTestLink(t)), context.Canceled.Error())
}
func TestCreditorDrainReturnsProperError(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*2)
defer cancel()
errs := []*Error{
&encoding.Error{
Condition: ErrCondDecodeError,
},
nil,
}
for i, err := range errs {
t.Run(fmt.Sprintf("Error[%d]", i), func(t *testing.T) {
mc := creditor{}
link := newTestLink(t)
link.l.doneErr = err
close(link.l.done)
detachErr := mc.Drain(ctx, link)
require.Equal(t, err, detachErr)
})
}
}