Skip to content

Commit 59ea15d

Browse files
committed
refactor: simplify clientMap locking, because it is not used in hot paths
feature: tracing and add default tracer test: add tests for valkey.Hook Signed-off-by: Sandor Szücs <[email protected]>
1 parent 9dc839a commit 59ea15d

File tree

2 files changed

+138
-19
lines changed

2 files changed

+138
-19
lines changed

net/valkey.go

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ func createValkeyClient(addr string, opt *ValkeyOptions) (valkey.Client, error)
8686
// BlockingPoolMinSize: 0,
8787
// BlockingPoolSize: 0,
8888
// BlockingPipeline: 0,
89+
8990
}
9091
var (
9192
cli valkey.Client
@@ -113,7 +114,7 @@ type valkeyRing struct {
113114
activeShards int
114115

115116
// clientMap is used for Ping operations and to simplify update shards
116-
mu sync.RWMutex
117+
mu sync.Mutex
117118
clientMap map[string]valkey.Client // map["10.5.1.43:6379"]valkey.Client
118119
}
119120

@@ -139,6 +140,7 @@ func newValkeyRing(opt *ValkeyOptions) (*valkeyRing, error) {
139140
return ring, nil
140141
}
141142

143+
// updateShards needs to be called with holding lock vr.mu
142144
func (vr *valkeyRing) updateShards(addr []string) {
143145
if len(addr) == 0 {
144146
return
@@ -169,13 +171,13 @@ func (vr *valkeyRing) SetAddr(addr []string) error {
169171
return nil
170172
}
171173

174+
vr.mu.Lock()
175+
defer vr.mu.Unlock()
172176
current := make([]string, 0, len(vr.clientMap))
173177

174-
vr.mu.RLock()
175178
for k := range vr.clientMap {
176179
current = append(current, k)
177180
}
178-
vr.mu.RUnlock()
179181

180182
// set operations
181183
intersection := intersect(addr, current)
@@ -191,13 +193,10 @@ func (vr *valkeyRing) SetAddr(addr []string) error {
191193
if err != nil {
192194
return fmt.Errorf("failed to create valkey client on SetAddr: %w", err)
193195
}
194-
vr.mu.Lock()
195196
vr.clientMap[addr] = cli
196-
vr.mu.Unlock()
197197
}
198198

199199
// close old clients and update current shards
200-
vr.mu.Lock()
201200
for _, addr := range oldAddr {
202201
cli, ok := vr.clientMap[addr]
203202
delete(vr.clientMap, addr)
@@ -211,7 +210,6 @@ func (vr *valkeyRing) SetAddr(addr []string) error {
211210
curAddr = append(curAddr, k)
212211
}
213212
vr.updateShards(curAddr)
214-
vr.mu.Unlock()
215213

216214
return nil
217215
}
@@ -224,21 +222,21 @@ func (vr *valkeyRing) shardForKey(key string) valkey.Client {
224222
// PingAll pings all known shards
225223
func (vr *valkeyRing) PingAll(ctx context.Context) map[string]valkey.ValkeyResult {
226224
res := make(map[string]valkey.ValkeyResult)
227-
vr.mu.RLock()
225+
vr.mu.Lock()
228226
for k, shard := range vr.clientMap {
229227
res[k] = shard.Do(ctx, shard.B().Ping().Build())
230228
}
231-
vr.mu.RUnlock()
229+
vr.mu.Unlock()
232230
return res
233231
}
234232

235233
// Ping pings given shard by address:port
236234
func (vr *valkeyRing) Ping(ctx context.Context, s string) error {
237-
vr.mu.RLock()
235+
vr.mu.Lock()
238236
shard, ok := vr.clientMap[s]
239-
vr.mu.RUnlock()
237+
vr.mu.Unlock()
240238
if !ok {
241-
return nil
239+
return fmt.Errorf("failed to find client for shard: %q", s)
242240
}
243241
res := shard.Do(ctx, shard.B().Ping().Build())
244242
return res.Error()
@@ -317,12 +315,23 @@ func NewValkeyRingClient(opt *ValkeyOptions) (*ValkeyRingClient, error) {
317315
const backOffTime = 2 * time.Second
318316
const retryCount = 5
319317

318+
// defaults
319+
if opt.Tracer == nil {
320+
opt.Tracer = &opentracing.NoopTracer{}
321+
}
320322
if opt.Log == nil {
321323
opt.Log = &logging.DefaultLog{}
322324
}
325+
if opt.Metrics == nil {
326+
opt.Metrics = metrics.Default
327+
}
323328

324329
// initially run address updater and pass opt.Addrs on success
325330
if opt.AddrUpdater != nil {
331+
if opt.UpdateInterval == 0 {
332+
opt.UpdateInterval = DefaultUpdateInterval
333+
}
334+
326335
address, err := opt.AddrUpdater()
327336
for range retryCount {
328337
if err == nil {
@@ -343,10 +352,6 @@ func NewValkeyRingClient(opt *ValkeyOptions) (*ValkeyRingClient, error) {
343352
return nil, err
344353
}
345354

346-
if opt.Metrics == nil {
347-
opt.Metrics = metrics.Default
348-
}
349-
350355
quitCH := make(chan struct{})
351356

352357
vrc := &ValkeyRingClient{
@@ -361,9 +366,6 @@ func NewValkeyRingClient(opt *ValkeyOptions) (*ValkeyRingClient, error) {
361366
}
362367

363368
if opt.AddrUpdater != nil {
364-
if opt.UpdateInterval == 0 {
365-
opt.UpdateInterval = DefaultUpdateInterval
366-
}
367369
go vrc.startUpdater(context.Background())
368370
}
369371

net/valkey_test.go

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@ import (
88
"testing/synctest"
99
"time"
1010

11+
"github.com/valkey-io/valkey-go"
1112
"github.com/zalando/skipper/metrics/metricstest"
1213
"github.com/zalando/skipper/net/valkeytest"
1314
"github.com/zalando/skipper/tracing/tracers/basic"
15+
"github.com/zalando/skipper/tracing/tracingtest"
1416
)
1517

1618
func TestValkeyContainer(t *testing.T) {
@@ -86,6 +88,121 @@ func TestValkey_hasAll(t *testing.T) {
8688
})
8789
}
8890
}
91+
92+
type hook struct {
93+
beforeDo, beforeDoMulti, beforeDoCache, beforeDoMultiCache, beforeReceive, beforeDoStream, beforeDoMultiStream bool
94+
afterDo, afterDoMulti, afterDoCache, afterDoMultiCache, afterReceive, afterDoStream, afterDoMultiStream bool
95+
}
96+
97+
func (h *hook) Do(client valkey.Client, ctx context.Context, cmd valkey.Completed) (resp valkey.ValkeyResult) {
98+
h.beforeDo = true
99+
resp = client.Do(ctx, cmd)
100+
h.afterDo = true
101+
return
102+
}
103+
104+
func (h *hook) DoMulti(client valkey.Client, ctx context.Context, multi ...valkey.Completed) (resps []valkey.ValkeyResult) {
105+
h.beforeDoMulti = true
106+
resps = client.DoMulti(ctx, multi...)
107+
h.afterDoMulti = true
108+
return
109+
}
110+
111+
func (h *hook) DoCache(client valkey.Client, ctx context.Context, cmd valkey.Cacheable, ttl time.Duration) (resp valkey.ValkeyResult) {
112+
h.beforeDoCache = true
113+
resp = client.DoCache(ctx, cmd, ttl)
114+
h.afterDoCache = true
115+
return
116+
}
117+
118+
func (h *hook) DoMultiCache(client valkey.Client, ctx context.Context, multi ...valkey.CacheableTTL) (resps []valkey.ValkeyResult) {
119+
h.beforeDoMultiCache = true
120+
resps = client.DoMultiCache(ctx, multi...)
121+
h.afterDoMultiCache = true
122+
return
123+
}
124+
125+
func (h *hook) Receive(client valkey.Client, ctx context.Context, subscribe valkey.Completed, fn func(msg valkey.PubSubMessage)) (err error) {
126+
h.beforeReceive = true
127+
err = client.Receive(ctx, subscribe, fn)
128+
h.afterReceive = true
129+
return
130+
}
131+
132+
func (h *hook) DoStream(client valkey.Client, ctx context.Context, cmd valkey.Completed) valkey.ValkeyResultStream {
133+
h.beforeDoStream = true
134+
res := client.DoStream(ctx, cmd)
135+
h.afterDoStream = true
136+
return res
137+
}
138+
139+
func (h *hook) DoMultiStream(client valkey.Client, ctx context.Context, multi ...valkey.Completed) valkey.MultiValkeyResultStream {
140+
h.beforeDoMultiStream = true
141+
res := client.DoMultiStream(ctx, multi...)
142+
h.afterDoMultiStream = true
143+
return res
144+
145+
}
146+
147+
func TestValkeyHook(t *testing.T) {
148+
valkeyAddr, done := valkeytest.NewTestValkey(t)
149+
defer done()
150+
hooks := &hook{}
151+
cli, err := NewValkeyRingClient(&ValkeyOptions{
152+
Addrs: []string{valkeyAddr},
153+
Metrics: &metricstest.MockMetrics{},
154+
MetricsPrefix: "skipper.valkey.",
155+
Hook: hooks,
156+
})
157+
if err != nil {
158+
t.Fatalf("Failed to create valkey ring client: %v", err)
159+
}
160+
defer func() {
161+
t.Logf("closing ring client")
162+
cli.Close()
163+
}()
164+
165+
ctx := context.Background()
166+
cli.ring.Set(ctx, "k", "v")
167+
168+
if !hooks.beforeDo || !hooks.afterDo {
169+
t.Fatalf("Failed to execute hook before %v, after %v", hooks.beforeDo, hooks.afterDo)
170+
}
171+
}
172+
173+
func TestValkeyOpentracing(t *testing.T) {
174+
mockTracer := tracingtest.NewTracer()
175+
176+
valkeyAddr, done := valkeytest.NewTestValkey(t)
177+
defer done()
178+
cli, err := NewValkeyRingClient(&ValkeyOptions{
179+
Addrs: []string{valkeyAddr},
180+
Tracer: mockTracer,
181+
})
182+
if err != nil {
183+
t.Fatalf("Failed to create valkey ring client: %v", err)
184+
}
185+
defer func() {
186+
t.Logf("closing ring client")
187+
cli.Close()
188+
}()
189+
190+
spanName := "valkey-span"
191+
span := cli.StartSpan(spanName)
192+
ctx := context.Background()
193+
cli.ring.Set(ctx, "k", "v")
194+
span.Finish()
195+
196+
foundSpan := mockTracer.FindSpan(spanName)
197+
if foundSpan != nil {
198+
if foundSpan.OperationName != spanName {
199+
t.Fatalf("Did not find span: %s", spanName)
200+
} else {
201+
t.Logf("Found span %s", spanName)
202+
}
203+
}
204+
}
205+
89206
func TestValkeyScript(t *testing.T) {
90207
valkeyAddr, done := valkeytest.NewTestValkey(t)
91208
defer done()

0 commit comments

Comments
 (0)