@@ -17,6 +17,8 @@ package e2e
17
17
import (
18
18
"context"
19
19
"strings"
20
+ "sync"
21
+ "sync/atomic"
20
22
"testing"
21
23
"time"
22
24
@@ -142,3 +144,97 @@ func waitForEndpointInLog(ctx context.Context, proxyProc *expect.ExpectProcess,
142
144
143
145
return err
144
146
}
147
+
148
+ func TestGRPCProxyWatchersAfterTokenExpiry (t * testing.T ) {
149
+ ctx , cancel := context .WithCancel (context .Background ())
150
+ defer cancel ()
151
+ cluster , err := e2e .NewEtcdProcessCluster (ctx , t ,
152
+ e2e .WithClusterSize (1 ),
153
+ e2e .WithAuthTokenOpts ("simple" ),
154
+ e2e .WithAuthTokenTTL (1 ),
155
+ )
156
+ require .NoError (t , err )
157
+ t .Cleanup (func () { require .NoError (t , cluster .Stop ()) })
158
+
159
+ cli := cluster .Etcdctl ()
160
+
161
+ createUsers (ctx , t , cli )
162
+
163
+ require .NoError (t , cli .AuthEnable (ctx ))
164
+
165
+ var (
166
+ node1ClientURL = cluster .Procs [0 ].Config ().ClientURL
167
+ proxyClientURL = "127.0.0.1:42379"
168
+ )
169
+
170
+ proxyProc , err := e2e .SpawnCmd ([]string {
171
+ e2e .BinPath .Etcd , "grpc-proxy" , "start" ,
172
+ "--advertise-client-url" , proxyClientURL ,
173
+ "--listen-addr" , proxyClientURL ,
174
+ "--endpoints" , node1ClientURL ,
175
+ }, nil )
176
+ require .NoError (t , err )
177
+ t .Cleanup (func () { require .NoError (t , proxyProc .Stop ()) })
178
+
179
+ var totalEventsCount int64
180
+
181
+ handler := func (events clientv3.WatchChan ) {
182
+ for {
183
+ select {
184
+ case ev , open := <- events :
185
+ if ! open {
186
+ return
187
+ }
188
+ if ev .Err () != nil {
189
+ t .Logf ("watch response error: %s" , ev .Err ())
190
+ continue
191
+ }
192
+ atomic .AddInt64 (& totalEventsCount , 1 )
193
+ case <- ctx .Done ():
194
+ return
195
+ }
196
+ }
197
+ }
198
+
199
+ withAuth := e2e .WithAuth ("root" , "rootPassword" )
200
+ withEndpoint := e2e .WithEndpoints ([]string {proxyClientURL })
201
+
202
+ events := cluster .Etcdctl (withAuth , withEndpoint ).Watch (ctx , "/test" , config.WatchOptions {Prefix : true , Revision : 1 })
203
+
204
+ wg := sync.WaitGroup {}
205
+
206
+ wg .Add (1 )
207
+ go func () {
208
+ defer wg .Done ()
209
+ handler (events )
210
+ }()
211
+
212
+ clusterCli := cluster .Etcdctl (withAuth )
213
+ require .NoError (t , clusterCli .Put (ctx , "/test/1" , "test" , config.PutOptions {}))
214
+ require .NoError (t , err )
215
+
216
+ time .Sleep (time .Second * 2 )
217
+
218
+ events2 := cluster .Etcdctl (withAuth , withEndpoint ).Watch (ctx , "/test" , config.WatchOptions {Prefix : true , Revision : 1 })
219
+
220
+ wg .Add (1 )
221
+ go func () {
222
+ defer wg .Done ()
223
+ handler (events2 )
224
+ }()
225
+
226
+ events3 := cluster .Etcdctl (withAuth , withEndpoint ).Watch (ctx , "/test" , config.WatchOptions {Prefix : true , Revision : 1 })
227
+
228
+ wg .Add (1 )
229
+ go func () {
230
+ defer wg .Done ()
231
+ handler (events3 )
232
+ }()
233
+
234
+ time .Sleep (time .Second )
235
+
236
+ cancel ()
237
+ wg .Wait ()
238
+
239
+ assert .Equal (t , int64 (3 ), atomic .LoadInt64 (& totalEventsCount ))
240
+ }
0 commit comments