This repository has been archived by the owner on Apr 19, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathunsubscribe.go
151 lines (140 loc) · 3.33 KB
/
unsubscribe.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
//
// Copyright © 2011-2019 Guy M. Allard
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package stompws
import (
"strconv"
"time"
)
/*
Unsubscribe from a STOMP subscription.
Headers MUST contain a "destination" header key, and for Stomp 1.1+,
a "id" header key per the specifications. The subscription MUST currently
exist for this session.
Example:
// Possible additional Header keys: "id".
h := stompngo.Headers{stompngo.HK_DESTINATION, "/queue/myqueue"}
e := c.Unsubscribe(h)
if e != nil {
// Do something sane ...
}
*/
func (c *Connection) Unsubscribe(h Headers) error {
c.log(UNSUBSCRIBE, "start", h)
// fmt.Printf("Unsub Headers: %v\n", h)
if !c.isConnected() {
return ECONBAD
}
e := checkHeaders(h, c.Protocol())
if e != nil {
return e
}
// Specification Requirements:
// 1.0) requires either a destination header or an id header
// 1.1) ... requires ... the id header ....
// 1.2) an id header MUST be included in the frame
//
_, okd := h.Contains(HK_DESTINATION)
shid, oki := h.Contains(HK_ID)
switch c.Protocol() {
case SPL_12:
if !oki {
return EUNOSID
}
case SPL_11:
if !oki {
return EUNOSID
}
case SPL_10:
if !oki && !okd {
return EUNODSID
}
default:
panic("unsubscribe version not supported: " + c.Protocol())
}
//
shaid := Sha1(h.Value(HK_DESTINATION)) // Special for 1.0
c.subsLock.RLock()
s1x, p := c.subs[shid]
s10, ps := c.subs[shaid] // Special for 1.0
c.subsLock.RUnlock()
var usesp *subscription
usekey := ""
switch c.Protocol() {
case SPL_12:
fallthrough
case SPL_11:
if !oki {
return EUNOSID // id required
}
if !p { // subscription does not exist
return EBADSID // invalid subscription-id
}
usekey = shid
usesp = s1x
case SPL_10:
if !p && !ps {
return EUNODSID
}
usekey = shaid
usesp = s10
default:
panic("unsubscribe version not supported: " + c.Protocol())
}
sdn, ok := h.Contains(StompPlusDrainNow) // STOMP Protocol Extension
if !ok {
e = c.transmitCommon(UNSUBSCRIBE, h) // transmitCommon Clones() the headers
if e != nil {
return e
}
c.subsLock.Lock()
delete(c.subs, usekey)
c.subsLock.Unlock()
c.log(UNSUBSCRIBE, "end", h)
return nil
}
//
// STOMP Protocol Extension
//
c.log("sngdrnow extension detected")
idn, err := strconv.ParseInt(sdn, 10, 64)
if err != nil {
idn = 100 // 100 milliseconds if bad parameter
}
ival := time.Duration(idn * 1000000)
dmc := 0
forsel:
for {
ticker := time.NewTicker(ival)
select {
case mi, ok := <-usesp.md:
if !ok {
break forsel
}
dmc++
c.log("sngdrnow DROP", dmc, mi.Message.Command, mi.Message.Headers)
case _ = <-ticker.C:
c.log("sngdrnow extension BREAK")
break forsel
}
}
//
c.log("sngdrnow extension at very end")
c.subsLock.Lock()
delete(c.subs, usekey)
c.subsLock.Unlock()
c.log(UNSUBSCRIBE, "endsngdrnow", h)
return nil
}