Skip to content

Commit

Permalink
Split the analysis out of the hammer (#125)
Browse files Browse the repository at this point in the history
Separating the experimentation from the measurement makes the code easier to understand and avoids the hammer growing into a monolith.
  • Loading branch information
mhutchinson authored Aug 14, 2024
1 parent fd57780 commit 2ff487d
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 67 deletions.
130 changes: 73 additions & 57 deletions hammer/hammer.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,57 +89,54 @@ func main() {
klog.Exitf("Failed to get initial state of the log: %v", err)
}

ha := newHammerAnalyser(&tracker, 100)
go ha.updateStatsLoop(ctx)
go ha.errorLoop(ctx)

gen := newLeafGenerator(tracker.LatestConsistent.Size, *leafMinSize, *dupChance)
hammer := NewHammer(&tracker, f.Fetch, w.Write, gen)
hammer := NewHammer(&tracker, f.Fetch, w.Write, gen, ha.seqLeafChan, ha.errChan)
hammer.Run(ctx)

if *showUI {
c := newController(hammer)
c := newController(hammer, ha)
c.Run(ctx)
} else {
<-ctx.Done()
}
}

func NewHammer(tracker *client.LogStateTracker, f client.Fetcher, w LeafWriter, gen func() []byte) *Hammer {
func NewHammer(tracker *client.LogStateTracker, f client.Fetcher, w LeafWriter, gen func() []byte, seqLeafChan chan<- leafTime, errChan chan<- error) *Hammer {
readThrottle := NewThrottle(*maxReadOpsPerSecond)
writeThrottle := NewThrottle(*maxWriteOpsPerSecond)
errChan := make(chan error, 20)
leafSampleChan := make(chan leafTime, 100)

randomReaders := newWorkerPool(func() worker {
return NewLeafReader(tracker, f, RandomNextLeaf(), readThrottle.tokenChan, errChan)
})
fullReaders := newWorkerPool(func() worker {
return NewLeafReader(tracker, f, MonotonicallyIncreasingNextLeaf(), readThrottle.tokenChan, errChan)
})
writers := newWorkerPool(func() worker { return NewLogWriter(w, gen, writeThrottle.tokenChan, errChan, leafSampleChan) })
writers := newWorkerPool(func() worker { return NewLogWriter(w, gen, writeThrottle.tokenChan, errChan, seqLeafChan) })

return &Hammer{
randomReaders: randomReaders,
fullReaders: fullReaders,
writers: writers,
readThrottle: readThrottle,
writeThrottle: writeThrottle,
tracker: tracker,
errChan: errChan,
leafSampleChan: leafSampleChan,
integrationTime: movingaverage.New(30),
queueTime: movingaverage.New(30),
randomReaders: randomReaders,
fullReaders: fullReaders,
writers: writers,
readThrottle: readThrottle,
writeThrottle: writeThrottle,
tracker: tracker,
}
}

// Hammer is responsible for coordinating the operations against the log in the form
// of write and read operations. The work of analysing the results of hammering should
// live outside of this class.
type Hammer struct {
randomReaders workerPool
fullReaders workerPool
writers workerPool
readThrottle *Throttle
writeThrottle *Throttle
tracker *client.LogStateTracker
errChan chan error
leafSampleChan chan leafTime
queueTime *movingaverage.MovingAverage
integrationTime *movingaverage.MovingAverage
randomReaders workerPool
fullReaders workerPool
writers workerPool
readThrottle *Throttle
writeThrottle *Throttle
tracker *client.LogStateTracker
}

func (h *Hammer) Run(ctx context.Context) {
Expand All @@ -154,35 +151,10 @@ func (h *Hammer) Run(ctx context.Context) {
h.writers.Grow(ctx)
}

go h.errorLoop(ctx)

go h.readThrottle.Run(ctx)
go h.writeThrottle.Run(ctx)

go h.updateCheckpointLoop(ctx)
go h.updateStatsLoop(ctx)
}

// errorLoop drains the hammer's error channel until ctx is cancelled.
// Pushback errors (ErrRetry) are counted and reported as a single summary
// line once per second rather than logged individually; all other errors
// are logged as they arrive.
func (h *Hammer) errorLoop(ctx context.Context) {
	tick := time.NewTicker(time.Second)
	// pbCount accumulates ErrRetry occurrences between ticks so the log
	// shows one aggregate line instead of one line per pushback.
	pbCount := 0
	for {
		select {
		case <-ctx.Done(): // context cancelled
			return
		case <-tick.C:
			if pbCount > 0 {
				klog.Warningf("%d requests received pushback from log", pbCount)
				pbCount = 0
			}
		case err := <-h.errChan:
			if errors.Is(err, ErrRetry) {
				pbCount++
				continue
			}
			klog.Warning(err)
		}
	}
}

func (h *Hammer) updateCheckpointLoop(ctx context.Context) {
Expand All @@ -209,16 +181,38 @@ func (h *Hammer) updateCheckpointLoop(ctx context.Context) {
}
}

func (h *Hammer) updateStatsLoop(ctx context.Context) {
// newHammerAnalyser constructs a HammerAnalyser observing the log via the
// given tracker. chanSize bounds the buffer of the sequenced-leaf channel;
// the error channel buffer is fixed at 20.
func newHammerAnalyser(tracker *client.LogStateTracker, chanSize int) *HammerAnalyser {
	a := &HammerAnalyser{
		tracker:     tracker,
		seqLeafChan: make(chan leafTime, chanSize),
		errChan:     make(chan error, 20),
	}
	// Both latency stats are 30-sample moving averages.
	a.queueTime = movingaverage.New(30)
	a.integrationTime = movingaverage.New(30)
	return a
}

// HammerAnalyser is responsible for measuring and interpreting the result of hammering.
type HammerAnalyser struct {
	tracker     *client.LogStateTracker // view of the log's latest consistent state
	seqLeafChan chan leafTime           // sequenced leaves reported by the writers
	errChan     chan error              // worker errors; drained by errorLoop

	// Moving averages of per-leaf latencies in milliseconds, updated by
	// updateStatsLoop as the tree grows.
	queueTime       *movingaverage.MovingAverage
	integrationTime *movingaverage.MovingAverage
}

func (a *HammerAnalyser) updateStatsLoop(ctx context.Context) {
tick := time.NewTicker(100 * time.Millisecond)
size := h.tracker.LatestConsistent.Size
size := a.tracker.LatestConsistent.Size
for {
select {
case <-ctx.Done():
return
case <-tick.C:
}
newSize := h.tracker.LatestConsistent.Size
newSize := a.tracker.LatestConsistent.Size
if newSize <= size {
continue
}
Expand All @@ -229,7 +223,7 @@ func (h *Hammer) updateStatsLoop(ctx context.Context) {
var sample *leafTime
for {
if sample == nil {
l, ok := <-h.leafSampleChan
l, ok := <-a.seqLeafChan
if !ok {
break
}
Expand All @@ -254,8 +248,30 @@ func (h *Hammer) updateStatsLoop(ctx context.Context) {
sample = nil
}
if numLeaves > 0 {
h.integrationTime.Add(float64(totalLatency/time.Millisecond) / float64(numLeaves))
h.queueTime.Add(float64(queueLatency/time.Millisecond) / float64(numLeaves))
a.integrationTime.Add(float64(totalLatency/time.Millisecond) / float64(numLeaves))
a.queueTime.Add(float64(queueLatency/time.Millisecond) / float64(numLeaves))
}
}
}

// errorLoop drains the analyser's error channel until ctx is cancelled.
// Pushback errors (ErrRetry) are tallied and emitted as one summary log
// line per second; any other error is logged immediately.
func (a *HammerAnalyser) errorLoop(ctx context.Context) {
	summary := time.NewTicker(time.Second)
	pushbacks := 0
	for {
		select {
		case <-ctx.Done(): // context cancelled
			return
		case err := <-a.errChan:
			if !errors.Is(err, ErrRetry) {
				klog.Warning(err)
				continue
			}
			pushbacks++
		case <-summary.C:
			if pushbacks == 0 {
				continue
			}
			klog.Warningf("%d requests received pushback from log", pushbacks)
			pushbacks = 0
		}
	}
}
Expand Down
27 changes: 17 additions & 10 deletions hammer/tui.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"flag"
"fmt"
"strings"
"time"

movingaverage "github.com/RobinUS2/golang-moving-average"
Expand All @@ -28,16 +29,18 @@ import (

// tuiController wires the hammer and its analyser into a terminal UI,
// owning the tview application and the text views it renders into.
type tuiController struct {
	hammer     *Hammer         // source of worker/throttle state for the status pane
	analyser   *HammerAnalyser // source of latency moving averages for the status pane
	app        *tview.Application
	statusView *tview.TextView // periodically refreshed summary of hammer state
	logView    *tview.TextView
	helpView   *tview.TextView
}

func newController(h *Hammer) *tuiController {
func newController(h *Hammer, a *HammerAnalyser) *tuiController {
c := tuiController{
hammer: h,
app: tview.NewApplication(),
hammer: h,
analyser: a,
app: tview.NewApplication(),
}
grid := tview.NewGrid()
grid.SetRows(5, 0, 10).SetColumns(0).SetBorders(true)
Expand Down Expand Up @@ -129,17 +132,21 @@ func (c *tuiController) updateStatsLoop(ctx context.Context, interval time.Durat
growth.Add(float64(s - lastSize))
lastSize = s
qps := growth.Avg() * float64(time.Second/interval)
text := fmt.Sprintf("Read (%d workers): %s\nWrite (%d workers): %s\nTreeSize: %d (Δ %.0fqps over %ds)\nTime-in-queue: %s\nObserved-time-to-integrate: %s",
readWorkersLine := fmt.Sprintf("Read (%d workers): %s",
c.hammer.fullReaders.Size()+c.hammer.randomReaders.Size(),
c.hammer.readThrottle.String(),
c.hammer.readThrottle.String())
writeWorkersLine := fmt.Sprintf("Write (%d workers): %s",
c.hammer.writers.Size(),
c.hammer.writeThrottle.String(),
c.hammer.writeThrottle.String())
treeSizeLine := fmt.Sprintf("TreeSize: %d (Δ %.0fqps over %ds)",
s,
qps,
time.Duration(maSlots*int(interval))/time.Second,
formatMovingAverage(c.hammer.queueTime),
formatMovingAverage(c.hammer.integrationTime),
)
time.Duration(maSlots*int(interval))/time.Second)
queueLine := fmt.Sprintf("Time-in-queue: %s",
formatMovingAverage(c.analyser.queueTime))
integrateLine := fmt.Sprintf("Observed-time-to-integrate: %s",
formatMovingAverage(c.analyser.integrationTime))
text := strings.Join([]string{readWorkersLine, writeWorkersLine, treeSizeLine, queueLine, integrateLine}, "\n")
c.statusView.SetText(text)
c.app.Draw()
}
Expand Down

0 comments on commit 2ff487d

Please sign in to comment.