@@ -4,14 +4,12 @@ import (
4
4
"context"
5
5
"crypto/tls"
6
6
"crypto/x509"
7
- "errors"
8
7
"fmt"
9
8
"log"
10
9
"os"
11
10
"time"
12
11
13
12
"github.com/cybertec-postgresql/vip-manager/vipconfig"
14
- rpcv3 "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
15
13
clientv3 "go.etcd.io/etcd/client/v3"
16
14
)
17
15
@@ -80,7 +78,7 @@ func (elc *EtcdLeaderChecker) get(ctx context.Context, out chan<- bool) {
80
78
return
81
79
}
82
80
for _ , kv := range resp .Kvs {
83
- log .Printf ("Current Leader from DCS: %s" , kv .Value )
81
+ log .Printf ("current leader from DCS: %s" , kv .Value )
84
82
out <- string (kv .Value ) == elc .Nodename
85
83
}
86
84
}
@@ -94,18 +92,18 @@ func (elc *EtcdLeaderChecker) watch(ctx context.Context, out chan<- bool) error
94
92
case <- ctx .Done ():
95
93
return ctx .Err ()
96
94
case watchResp := <- watchChan :
95
+ if watchResp .Canceled {
96
+ watchChan = elc .Watch (ctx , elc .Key )
97
+ log .Println ("reset cancelled WATCH on " + elc .Key )
98
+ continue
99
+ }
97
100
if err := watchResp .Err (); err != nil {
98
- if errors .Is (err , rpcv3 .ErrCompacted ) { // revision is compacted, try direct get key
99
- elc .get (ctx , out )
100
- } else {
101
- log .Printf ("etcd watcher returned error: %s" , err )
102
- out <- false
103
- }
101
+ elc .get (ctx , out ) // RPC failed, try to get the key directly to be on the safe side
104
102
continue
105
103
}
106
104
for _ , event := range watchResp .Events {
107
105
out <- string (event .Kv .Value ) == elc .Nodename
108
- log .Printf ("Current Leader from DCS: %s" , event .Kv .Value )
106
+ log .Printf ("current leader from DCS: %s" , event .Kv .Value )
109
107
}
110
108
}
111
109
}
0 commit comments