Skip to content

Commit a9fe306

Browse files
authored
Tests for the hammer analysis (#137)
Basic tests to make sure that the average reported is correct and that async behaviour is reliable. Both the test and the analysis code under test had race unsafe code, which is now fixed.
1 parent 2ff487d commit a9fe306

File tree

3 files changed

+88
-16
lines changed

3 files changed

+88
-16
lines changed

hammer/hammer.go

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ func main() {
8989
klog.Exitf("Failed to get initial state of the log: %v", err)
9090
}
9191

92-
ha := newHammerAnalyser(&tracker, 100)
92+
ha := newHammerAnalyser(func() uint64 { return tracker.LatestConsistent.Size })
9393
go ha.updateStatsLoop(ctx)
9494
go ha.errorLoop(ctx)
9595

@@ -181,38 +181,38 @@ func (h *Hammer) updateCheckpointLoop(ctx context.Context) {
181181
}
182182
}
183183

184-
func newHammerAnalyser(tracker *client.LogStateTracker, chanSize int) *HammerAnalyser {
185-
leafSampleChan := make(chan leafTime, chanSize)
184+
func newHammerAnalyser(treeSizeFn func() uint64) *HammerAnalyser {
185+
leafSampleChan := make(chan leafTime, 100)
186186
errChan := make(chan error, 20)
187187
return &HammerAnalyser{
188-
tracker: tracker,
188+
treeSizeFn: treeSizeFn,
189189
seqLeafChan: leafSampleChan,
190190
errChan: errChan,
191-
integrationTime: movingaverage.New(30),
192-
queueTime: movingaverage.New(30),
191+
integrationTime: movingaverage.Concurrent(movingaverage.New(30)),
192+
queueTime: movingaverage.Concurrent(movingaverage.New(30)),
193193
}
194194
}
195195

196196
// HammerAnalyser is responsible for measuring and interpreting the result of hammering.
197197
type HammerAnalyser struct {
198-
tracker *client.LogStateTracker
198+
treeSizeFn func() uint64
199199
seqLeafChan chan leafTime
200200
errChan chan error
201201

202-
queueTime *movingaverage.MovingAverage
203-
integrationTime *movingaverage.MovingAverage
202+
queueTime *movingaverage.ConcurrentMovingAverage
203+
integrationTime *movingaverage.ConcurrentMovingAverage
204204
}
205205

206206
func (a *HammerAnalyser) updateStatsLoop(ctx context.Context) {
207207
tick := time.NewTicker(100 * time.Millisecond)
208-
size := a.tracker.LatestConsistent.Size
208+
size := a.treeSizeFn()
209209
for {
210210
select {
211211
case <-ctx.Done():
212212
return
213213
case <-tick.C:
214214
}
215-
newSize := a.tracker.LatestConsistent.Size
215+
newSize := a.treeSizeFn()
216216
if newSize <= size {
217217
continue
218218
}
@@ -221,13 +221,18 @@ func (a *HammerAnalyser) updateStatsLoop(ctx context.Context) {
221221
queueLatency := time.Duration(0)
222222
numLeaves := 0
223223
var sample *leafTime
224+
ReadLoop:
224225
for {
225226
if sample == nil {
226-
l, ok := <-a.seqLeafChan
227-
if !ok {
228-
break
227+
select {
228+
case l, ok := <-a.seqLeafChan:
229+
if !ok {
230+
break ReadLoop
231+
}
232+
sample = &l
233+
default:
234+
break ReadLoop
229235
}
230-
sample = &l
231236
}
232237
// Stop considering leaf times once we've caught up with that cross
233238
// either the current checkpoint or "now":

hammer/hammer_test.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
// Copyright 2024 The Tessera authors. All Rights Reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package main
16+
17+
import (
18+
"context"
19+
"sync"
20+
"testing"
21+
"time"
22+
)
23+
24+
func TestHammerAnalyser_Stats(t *testing.T) {
25+
ctx, cancel := context.WithCancel(context.Background())
26+
defer cancel()
27+
28+
var treeSize treeSizeState
29+
ha := newHammerAnalyser(treeSize.getSize)
30+
31+
go ha.updateStatsLoop(ctx)
32+
33+
time.Sleep(100 * time.Millisecond)
34+
35+
baseTime := time.Now().Add(-1 * time.Minute)
36+
for i := 0; i < 10; i++ {
37+
ha.seqLeafChan <- leafTime{
38+
idx: uint64(i),
39+
queuedAt: baseTime,
40+
assignedAt: baseTime.Add(time.Duration(i) * time.Second),
41+
}
42+
}
43+
treeSize.setSize(10)
44+
time.Sleep(500 * time.Millisecond)
45+
46+
avg := ha.queueTime.Avg()
47+
if want := float64(4500); avg != want {
48+
t.Errorf("integration time avg: got != want (%f != %f)", avg, want)
49+
}
50+
}
51+
52+
type treeSizeState struct {
53+
size uint64
54+
mux sync.RWMutex
55+
}
56+
57+
func (s *treeSizeState) getSize() uint64 {
58+
s.mux.RLock()
59+
defer s.mux.RUnlock()
60+
return s.size
61+
}
62+
63+
func (s *treeSizeState) setSize(size uint64) {
64+
s.mux.Lock()
65+
defer s.mux.Unlock()
66+
s.size = size
67+
}

hammer/tui.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ func (c *tuiController) Run(ctx context.Context) {
112112
}
113113

114114
func (c *tuiController) updateStatsLoop(ctx context.Context, interval time.Duration) {
115-
formatMovingAverage := func(ma *movingaverage.MovingAverage) string {
115+
formatMovingAverage := func(ma *movingaverage.ConcurrentMovingAverage) string {
116116
aMin, _ := ma.Min()
117117
aMax, _ := ma.Max()
118118
aAvg := ma.Avg()

0 commit comments

Comments
 (0)