Skip to content

Commit 6fa7342

Browse files
authored
Merge pull request #17563 from serathius/sync-reuse
Reuse events used for syncing watchers
2 parents afb5c1d + 348c0cb commit 6fa7342

File tree

3 files changed

+166
-27
lines changed

3 files changed

+166
-27
lines changed

server/storage/mvcc/watchable_store.go

Lines changed: 58 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,7 @@ func (s *watchableStore) syncWatchersLoop() {
221221
waitDuration := 100 * time.Millisecond
222222
delayTicker := time.NewTicker(waitDuration)
223223
defer delayTicker.Stop()
224+
var evs []mvccpb.Event
224225

225226
for {
226227
s.mu.RLock()
@@ -230,7 +231,7 @@ func (s *watchableStore) syncWatchersLoop() {
230231

231232
unsyncedWatchers := 0
232233
if lastUnsyncedWatchers > 0 {
233-
unsyncedWatchers = s.syncWatchers()
234+
unsyncedWatchers, evs = s.syncWatchers(evs)
234235
}
235236
syncDuration := time.Since(st)
236237

@@ -339,12 +340,12 @@ func (s *watchableStore) moveVictims() (moved int) {
339340
// 2. iterate over the set to get the minimum revision and remove compacted watchers
340341
// 3. use minimum revision to get all key-value pairs and send those events to watchers
341342
// 4. remove synced watchers in set from unsynced group and move to synced group
342-
func (s *watchableStore) syncWatchers() int {
343+
func (s *watchableStore) syncWatchers(evs []mvccpb.Event) (int, []mvccpb.Event) {
343344
s.mu.Lock()
344345
defer s.mu.Unlock()
345346

346347
if s.unsynced.size() == 0 {
347-
return 0
348+
return 0, []mvccpb.Event{}
348349
}
349350

350351
s.store.revMu.RLock()
@@ -357,20 +358,7 @@ func (s *watchableStore) syncWatchers() int {
357358
compactionRev := s.store.compactMainRev
358359

359360
wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev)
360-
minBytes, maxBytes := NewRevBytes(), NewRevBytes()
361-
minBytes = RevToBytes(Revision{Main: minRev}, minBytes)
362-
maxBytes = RevToBytes(Revision{Main: curRev + 1}, maxBytes)
363-
364-
// UnsafeRange returns keys and values. And in boltdb, keys are revisions.
365-
// values are actual key-value pairs in backend.
366-
tx := s.store.b.ReadTx()
367-
tx.RLock()
368-
revs, vs := tx.UnsafeRange(schema.Key, minBytes, maxBytes, 0)
369-
evs := kvsToEvents(s.store.lg, wg, revs, vs)
370-
// Must unlock after kvsToEvents, because vs (come from boltdb memory) is not deep copy.
371-
// We can only unlock after Unmarshal, which will do deep copy.
372-
// Otherwise we will trigger SIGSEGV during boltdb re-mmap.
373-
tx.RUnlock()
361+
evs = rangeEventsWithReuse(s.store.lg, s.store.b, evs, minRev, curRev+1)
374362

375363
victims := make(watcherBatch)
376364
wb := newWatcherBatch(wg, evs)
@@ -419,21 +407,68 @@ func (s *watchableStore) syncWatchers() int {
419407
}
420408
slowWatcherGauge.Set(float64(s.unsynced.size() + vsz))
421409

422-
return s.unsynced.size()
410+
return s.unsynced.size(), evs
411+
}
412+
413+
// rangeEventsWithReuse returns events in range [minRev, maxRev), while reusing already provided events.
414+
func rangeEventsWithReuse(lg *zap.Logger, b backend.Backend, evs []mvccpb.Event, minRev, maxRev int64) []mvccpb.Event {
415+
if len(evs) == 0 {
416+
return rangeEvents(lg, b, minRev, maxRev)
417+
}
418+
// append from left
419+
if evs[0].Kv.ModRevision > minRev {
420+
evs = append(rangeEvents(lg, b, minRev, evs[0].Kv.ModRevision), evs...)
421+
}
422+
// cut from left
423+
prefixIndex := 0
424+
for prefixIndex < len(evs) && evs[prefixIndex].Kv.ModRevision < minRev {
425+
prefixIndex++
426+
}
427+
evs = evs[prefixIndex:]
428+
429+
if len(evs) == 0 {
430+
return rangeEvents(lg, b, minRev, maxRev)
431+
}
432+
// append from right
433+
if evs[len(evs)-1].Kv.ModRevision+1 < maxRev {
434+
evs = append(evs, rangeEvents(lg, b, evs[len(evs)-1].Kv.ModRevision+1, maxRev)...)
435+
}
436+
// cut from right
437+
suffixIndex := len(evs) - 1
438+
for suffixIndex >= 0 && evs[suffixIndex].Kv.ModRevision >= maxRev {
439+
suffixIndex--
440+
}
441+
evs = evs[:suffixIndex+1]
442+
return evs
443+
}
444+
445+
// rangeEvents returns events in range [minRev, maxRev).
446+
func rangeEvents(lg *zap.Logger, b backend.Backend, minRev, maxRev int64) []mvccpb.Event {
447+
minBytes, maxBytes := NewRevBytes(), NewRevBytes()
448+
minBytes = RevToBytes(Revision{Main: minRev}, minBytes)
449+
maxBytes = RevToBytes(Revision{Main: maxRev}, maxBytes)
450+
451+
// UnsafeRange returns keys and values. And in boltdb, keys are revisions.
452+
// values are actual key-value pairs in backend.
453+
tx := b.ReadTx()
454+
tx.RLock()
455+
revs, vs := tx.UnsafeRange(schema.Key, minBytes, maxBytes, 0)
456+
evs := kvsToEvents(lg, revs, vs)
457+
// Must unlock after kvsToEvents, because vs (come from boltdb memory) is not deep copy.
458+
// We can only unlock after Unmarshal, which will do deep copy.
459+
// Otherwise we will trigger SIGSEGV during boltdb re-mmap.
460+
tx.RUnlock()
461+
return evs
423462
}
424463

425464
// kvsToEvents gets all events for the watchers from all key-value pairs
426-
func kvsToEvents(lg *zap.Logger, wg *watcherGroup, revs, vals [][]byte) (evs []mvccpb.Event) {
465+
func kvsToEvents(lg *zap.Logger, revs, vals [][]byte) (evs []mvccpb.Event) {
427466
for i, v := range vals {
428467
var kv mvccpb.KeyValue
429468
if err := kv.Unmarshal(v); err != nil {
430469
lg.Panic("failed to unmarshal mvccpb.KeyValue", zap.Error(err))
431470
}
432471

433-
if !wg.contains(string(kv.Key)) {
434-
continue
435-
}
436-
437472
ty := mvccpb.PUT
438473
if isTombstone(revs[i]) {
439474
ty = mvccpb.DELETE

server/storage/mvcc/watchable_store_test.go

Lines changed: 106 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ func TestSyncWatchers(t *testing.T) {
141141

142142
assert.Empty(t, s.synced.watcherSetByKey(string(testKey)))
143143
assert.Len(t, s.unsynced.watcherSetByKey(string(testKey)), watcherN)
144-
s.syncWatchers()
144+
s.syncWatchers([]mvccpb.Event{})
145145
assert.Len(t, s.synced.watcherSetByKey(string(testKey)), watcherN)
146146
assert.Empty(t, s.unsynced.watcherSetByKey(string(testKey)))
147147

@@ -164,6 +164,110 @@ func TestSyncWatchers(t *testing.T) {
164164
}
165165
}
166166

167+
func TestRangeEvents(t *testing.T) {
168+
b, _ := betesting.NewDefaultTmpBackend(t)
169+
lg := zaptest.NewLogger(t)
170+
s := NewStore(lg, b, &lease.FakeLessor{}, StoreConfig{})
171+
172+
defer cleanup(s, b)
173+
174+
foo1 := []byte("foo1")
175+
foo2 := []byte("foo2")
176+
foo3 := []byte("foo3")
177+
value := []byte("bar")
178+
s.Put(foo1, value, lease.NoLease)
179+
s.Put(foo2, value, lease.NoLease)
180+
s.Put(foo3, value, lease.NoLease)
181+
s.DeleteRange(foo1, foo3) // Deletes "foo1" and "foo2" generating 2 events
182+
183+
expectEvents := []mvccpb.Event{
184+
{
185+
Type: mvccpb.PUT,
186+
Kv: &mvccpb.KeyValue{
187+
Key: foo1,
188+
CreateRevision: 2,
189+
ModRevision: 2,
190+
Version: 1,
191+
Value: value,
192+
},
193+
},
194+
{
195+
Type: mvccpb.PUT,
196+
Kv: &mvccpb.KeyValue{
197+
Key: foo2,
198+
CreateRevision: 3,
199+
ModRevision: 3,
200+
Version: 1,
201+
Value: value,
202+
},
203+
},
204+
{
205+
Type: mvccpb.PUT,
206+
Kv: &mvccpb.KeyValue{
207+
Key: foo3,
208+
CreateRevision: 4,
209+
ModRevision: 4,
210+
Version: 1,
211+
Value: value,
212+
},
213+
},
214+
{
215+
Type: mvccpb.DELETE,
216+
Kv: &mvccpb.KeyValue{
217+
Key: foo1,
218+
ModRevision: 5,
219+
},
220+
},
221+
{
222+
Type: mvccpb.DELETE,
223+
Kv: &mvccpb.KeyValue{
224+
Key: foo2,
225+
ModRevision: 5,
226+
},
227+
},
228+
}
229+
230+
tcs := []struct {
231+
minRev int64
232+
maxRev int64
233+
expectEvents []mvccpb.Event
234+
}{
235+
// maxRev, top to bottom
236+
{minRev: 2, maxRev: 6, expectEvents: expectEvents[0:5]},
237+
{minRev: 2, maxRev: 5, expectEvents: expectEvents[0:3]},
238+
{minRev: 2, maxRev: 4, expectEvents: expectEvents[0:2]},
239+
{minRev: 2, maxRev: 3, expectEvents: expectEvents[0:1]},
240+
{minRev: 2, maxRev: 2, expectEvents: expectEvents[0:0]},
241+
242+
// minRev, bottom to top
243+
{minRev: 2, maxRev: 6, expectEvents: expectEvents[0:5]},
244+
{minRev: 3, maxRev: 6, expectEvents: expectEvents[1:5]},
245+
{minRev: 4, maxRev: 6, expectEvents: expectEvents[2:5]},
246+
{minRev: 5, maxRev: 6, expectEvents: expectEvents[3:5]},
247+
{minRev: 6, maxRev: 6, expectEvents: expectEvents[0:0]},
248+
249+
// Moving window algorithm, first increase maxRev, then increase minRev, repeat.
250+
{minRev: 2, maxRev: 2, expectEvents: expectEvents[0:0]},
251+
{minRev: 2, maxRev: 3, expectEvents: expectEvents[0:1]},
252+
{minRev: 2, maxRev: 4, expectEvents: expectEvents[0:2]},
253+
{minRev: 3, maxRev: 4, expectEvents: expectEvents[1:2]},
254+
{minRev: 3, maxRev: 5, expectEvents: expectEvents[1:3]},
255+
{minRev: 4, maxRev: 5, expectEvents: expectEvents[2:3]},
256+
{minRev: 4, maxRev: 6, expectEvents: expectEvents[2:5]},
257+
{minRev: 5, maxRev: 6, expectEvents: expectEvents[3:5]},
258+
{minRev: 6, maxRev: 6, expectEvents: expectEvents[5:5]},
259+
}
260+
// reuse the evs to test rangeEventsWithReuse
261+
var evs []mvccpb.Event
262+
for i, tc := range tcs {
263+
t.Run(fmt.Sprintf("%d rangeEvents(%d, %d)", i, tc.minRev, tc.maxRev), func(t *testing.T) {
264+
assert.ElementsMatch(t, tc.expectEvents, rangeEvents(lg, b, tc.minRev, tc.maxRev))
265+
evs = rangeEventsWithReuse(lg, b, evs, tc.minRev, tc.maxRev)
266+
assert.ElementsMatch(t, tc.expectEvents, evs)
267+
})
268+
}
269+
}
270+
167271
// TestWatchCompacted tests a watcher that watches on a compacted revision.
168272
func TestWatchCompacted(t *testing.T) {
169273
b, _ := betesting.NewDefaultTmpBackend(t)
@@ -236,7 +340,7 @@ func TestWatchNoEventLossOnCompact(t *testing.T) {
236340
require.NoError(t, err)
237341
}
238342
// fill up w.Chan() with 1 buf via 2 compacted watch response
239-
s.syncWatchers()
343+
s.syncWatchers([]mvccpb.Event{})
240344

241345
for len(watchers) > 0 {
242346
resp := <-w.Chan()

server/storage/mvcc/watcher_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,7 @@ func TestWatcherRequestProgress(t *testing.T) {
320320
default:
321321
}
322322

323-
s.syncWatchers()
323+
s.syncWatchers([]mvccpb.Event{})
324324

325325
w.RequestProgress(id)
326326
wrs := WatchResponse{WatchID: id, Revision: 2}
@@ -359,7 +359,7 @@ func TestWatcherRequestProgressAll(t *testing.T) {
359359
default:
360360
}
361361

362-
s.syncWatchers()
362+
s.syncWatchers([]mvccpb.Event{})
363363

364364
w.RequestProgressAll()
365365
wrs := WatchResponse{WatchID: clientv3.InvalidWatchID, Revision: 2}

0 commit comments

Comments
 (0)