forked from jrallison/go-workers
-
Notifications
You must be signed in to change notification settings - Fork 0
/
fetch_test.go
140 lines (98 loc) · 3.4 KB
/
fetch_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package workers
import (
"github.com/customerio/gospec"
. "github.com/customerio/gospec"
"github.com/garyburd/redigo/redis"
)
// buildFetch creates a manager for queue via newManager(queue, nil, 1), starts
// that manager's fetcher in a background goroutine, and returns the fetcher.
// The specs in this file call Close on the returned fetcher when they are done.
func buildFetch(queue string) Fetcher {
	fetcher := newManager(queue, nil, 1).fetch

	// Fetch runs concurrently so the caller can drive it through the
	// fetcher's Ready and Messages channels.
	go fetcher.Fetch()

	return fetcher
}
// FetchSpec exercises the fetcher returned by buildFetch against the redis
// pool in Config.Pool. Each spec uses its own queue name so that leftover
// keys from one spec do not affect another.
func FetchSpec(c gospec.Context) {
	c.Specify("Config.Fetch", func() {
		c.Specify("it returns an instance of fetch with queue", func() {
			fetch := buildFetch("fetchQueue1")
			c.Expect(fetch.Queue(), Equals, "queue:fetchQueue1")
			fetch.Close()
		})
	})

	c.Specify("Fetch", func() {
		// The payload is a constant, well-formed JSON literal, so the error
		// from NewMsg is intentionally ignored here and below.
		message, _ := NewMsg("{\"foo\":\"bar\"}")

		c.Specify("it puts messages from the queues on the messages channel", func() {
			fetch := buildFetch("fetchQueue2")

			conn := Config.Pool.Get()
			defer conn.Close()

			conn.Do("lpush", "queue:fetchQueue2", message.ToJson())

			fetch.Ready() <- true
			// Receive into a distinct variable: shadowing message here would
			// turn the expectation below into a comparison of a value with
			// itself, which can never fail.
			received := <-fetch.Messages()

			c.Expect(received, Equals, message)

			// Check the error so a redis failure cannot masquerade as an
			// empty queue (redis.Int yields 0 on error).
			remaining, err := redis.Int(conn.Do("llen", "queue:fetchQueue2"))
			c.Expect(err, Equals, nil)
			c.Expect(remaining, Equals, 0)

			fetch.Close()
		})

		c.Specify("places in progress messages on private queue", func() {
			fetch := buildFetch("fetchQueue3")

			conn := Config.Pool.Get()
			defer conn.Close()

			conn.Do("lpush", "queue:fetchQueue3", message.ToJson())

			fetch.Ready() <- true
			<-fetch.Messages()

			inProgress, err := redis.Int(conn.Do("llen", "queue:fetchQueue3:1:inprogress"))
			c.Expect(err, Equals, nil)
			c.Expect(inProgress, Equals, 1)

			messages, err := redis.Strings(conn.Do("lrange", "queue:fetchQueue3:1:inprogress", 0, -1))
			c.Expect(err, Equals, nil)
			// Guard the index so an empty result is reported as a failed
			// expectation (above) instead of panicking the spec runner.
			if len(messages) > 0 {
				c.Expect(messages[0], Equals, message.ToJson())
			}

			fetch.Close()
		})

		c.Specify("removes in progress message when acknowledged", func() {
			fetch := buildFetch("fetchQueue4")

			conn := Config.Pool.Get()
			defer conn.Close()

			conn.Do("lpush", "queue:fetchQueue4", message.ToJson())

			fetch.Ready() <- true
			<-fetch.Messages()

			fetch.Acknowledge(message)

			inProgress, err := redis.Int(conn.Do("llen", "queue:fetchQueue4:1:inprogress"))
			c.Expect(err, Equals, nil)
			c.Expect(inProgress, Equals, 0)

			fetch.Close()
		})

		c.Specify("removes in progress message when serialized differently", func() {
			// rawJSON is pushed verbatim; the expectation below confirms that
			// it differs from what ToJson produces for the same message.
			rawJSON := "{\"foo\":\"bar\",\"args\":[]}"
			msg, _ := NewMsg(rawJSON)

			c.Expect(rawJSON, Not(Equals), msg.ToJson())

			fetch := buildFetch("fetchQueue5")

			conn := Config.Pool.Get()
			defer conn.Close()

			conn.Do("lpush", "queue:fetchQueue5", rawJSON)

			fetch.Ready() <- true
			<-fetch.Messages()

			fetch.Acknowledge(msg)

			inProgress, err := redis.Int(conn.Do("llen", "queue:fetchQueue5:1:inprogress"))
			c.Expect(err, Equals, nil)
			c.Expect(inProgress, Equals, 0)

			fetch.Close()
		})

		c.Specify("refires any messages left in progress from prior instance", func() {
			message2, _ := NewMsg("{\"foo\":\"bar2\"}")
			message3, _ := NewMsg("{\"foo\":\"bar3\"}")

			conn := Config.Pool.Get()
			defer conn.Close()

			// Seed the in-progress list and the main queue before the fetcher
			// is built, so the fetcher starts with work already pending.
			conn.Do("lpush", "queue:fetchQueue6:1:inprogress", message.ToJson())
			conn.Do("lpush", "queue:fetchQueue6:1:inprogress", message2.ToJson())
			conn.Do("lpush", "queue:fetchQueue6", message3.ToJson())

			fetch := buildFetch("fetchQueue6")

			fetch.Ready() <- true
			c.Expect(<-fetch.Messages(), Equals, message2)
			fetch.Ready() <- true
			c.Expect(<-fetch.Messages(), Equals, message)
			fetch.Ready() <- true
			c.Expect(<-fetch.Messages(), Equals, message3)

			fetch.Acknowledge(message)
			fetch.Acknowledge(message2)
			fetch.Acknowledge(message3)

			inProgress, err := redis.Int(conn.Do("llen", "queue:fetchQueue6:1:inprogress"))
			c.Expect(err, Equals, nil)
			c.Expect(inProgress, Equals, 0)

			fetch.Close()
		})
	})
}