Skip to content

Commit 7738112

Browse files
committed
add ability to cap latency for chaff requests profile
1 parent 4569185 commit 7738112

File tree

2 files changed

+91
-19
lines changed

2 files changed

+91
-19
lines changed

tracker.go

+42-19
Original file line numberDiff line numberDiff line change
@@ -42,14 +42,15 @@ const (
4242
// (i.e. this library is falling behind or requests volumes are too large),
4343
// then some individual requests will be dropped.
4444
type Tracker struct {
45-
mu sync.RWMutex
46-
buffer []*request
47-
size int
48-
cap int
49-
pos int
50-
ch chan *request
51-
done chan struct{}
52-
resp Responder
45+
mu sync.RWMutex
46+
buffer []*request
47+
size int
48+
cap int
49+
pos int
50+
ch chan *request
51+
done chan struct{}
52+
resp Responder
53+
maxLatencyMs uint64
5354
}
5455

5556
type request struct {
@@ -67,18 +68,28 @@ func newRequest(start, end time.Time, headerSize, bodySize uint64) *request {
6768
}
6869

6970
// New creates a new tracker with the `DefaultCapacity`.
70-
func New() *Tracker {
71-
t, _ := NewTracker(&PlainResponder{}, DefaultCapacity)
71+
func New(opts ...Option) *Tracker {
72+
t, _ := NewTracker(&PlainResponder{}, DefaultCapacity, opts...)
7273
return t
7374
}
7475

76+
// Option defines a method for applying options when configuring a new tracker.
77+
type Option func(*Tracker)
78+
79+
// WithMaxLatency puts a cap on the tunnel latency.
80+
func WithMaxLatency(maxLatencyMs uint64) Option {
81+
return func(t *Tracker) {
82+
t.maxLatencyMs = maxLatencyMs
83+
}
84+
}
85+
7586
// NewTracker creates a tracker with custom capacity.
7687
// Launches a goroutine to update the request metrics.
7788
// To shut this down, use the .Close() method.
7889
// The Responder parameter is used to write the output. If non is specified,
7990
// the tracker will default to the "PlainResponder" which just writes the raw
8091
// chaff bytes.
81-
func NewTracker(resp Responder, cap int) (*Tracker, error) {
92+
func NewTracker(resp Responder, cap int, opts ...Option) (*Tracker, error) {
8293
if cap < 1 || cap > DefaultCapacity {
8394
return nil, fmt.Errorf("cap must be 1 <= cap <= 100, got: %v", cap)
8495
}
@@ -88,14 +99,21 @@ func NewTracker(resp Responder, cap int) (*Tracker, error) {
8899
}
89100

90101
t := &Tracker{
91-
buffer: make([]*request, 0, int(cap)),
92-
size: 0,
93-
cap: cap,
94-
pos: 0,
95-
ch: make(chan *request, cap),
96-
done: make(chan struct{}),
97-
resp: resp,
102+
buffer: make([]*request, 0, int(cap)),
103+
size: 0,
104+
cap: cap,
105+
pos: 0,
106+
ch: make(chan *request, cap),
107+
done: make(chan struct{}),
108+
resp: resp,
109+
maxLatencyMs: 0,
98110
}
111+
112+
// Apply options.
113+
for _, opt := range opts {
114+
opt(t)
115+
}
116+
99117
go t.updater()
100118
return t, nil
101119
}
@@ -152,8 +170,13 @@ func (t *Tracker) CalculateProfile() *request {
152170
}
153171
divisor := uint64(t.size)
154172

173+
latencyMs := latency / divisor
174+
if max := t.maxLatencyMs; max > 0 && latencyMs > max {
175+
latencyMs = max
176+
}
177+
155178
return &request{
156-
latencyMs: latency / divisor,
179+
latencyMs: latencyMs,
157180
headerSize: uint64(hSize / divisor),
158181
bodySize: uint64(bSize / divisor),
159182
}

tracker_test.go

+49
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,16 @@ import (
2222
"net/http"
2323
"net/http/httptest"
2424
"strings"
25+
"sync"
2526
"testing"
2627
"time"
2728

2829
"github.com/google/go-cmp/cmp"
2930
)
3031

3132
func TestRandomData(t *testing.T) {
33+
t.Parallel()
34+
3235
d := RandomData(0)
3336
if d != "" {
3437
t.Fatalf("expected empty string, got: %q", d)
@@ -55,6 +58,7 @@ func checkLength(t *testing.T, expected int, length int) {
5558
}
5659

5760
func TestChaff(t *testing.T) {
61+
t.Parallel()
5862
track := New()
5963
defer track.Close()
6064

@@ -88,6 +92,7 @@ func TestChaff(t *testing.T) {
8892
}
8993

9094
func TestTracking(t *testing.T) {
95+
t.Parallel()
9196
track := New()
9297
defer track.Close()
9398

@@ -130,7 +135,51 @@ func TestTracking(t *testing.T) {
130135
}
131136
}
132137

138+
func TestMax(t *testing.T) {
139+
t.Parallel()
140+
141+
track := New(WithMaxLatency(25))
142+
defer track.Close()
143+
144+
var wg sync.WaitGroup
145+
for i := 0; i <= DefaultCapacity*2; i++ {
146+
wrapped := track.Track(
147+
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
148+
time.Sleep(50 * time.Millisecond)
149+
w.WriteHeader(http.StatusAccepted)
150+
w.Header().Add("padding", strings.Repeat("a", i+1))
151+
fmt.Fprintf(w, "%s", strings.Repeat("b", i+1))
152+
}))
153+
154+
recorder := httptest.NewRecorder()
155+
request, err := http.NewRequest("GET", "/", strings.NewReader(""))
156+
if err != nil {
157+
t.Fatalf("http.NewRequest: %v", err)
158+
}
159+
160+
wg.Add(1)
161+
go func(t *testing.T) {
162+
defer wg.Done()
163+
t.Helper()
164+
wrapped.ServeHTTP(recorder, request)
165+
if recorder.Code != http.StatusAccepted {
166+
t.Fatalf("wrong error code: want: %v, got: %v", http.StatusAccepted, recorder.Code)
167+
}
168+
}(t)
169+
}
170+
171+
wg.Wait()
172+
173+
got := track.CalculateProfile()
174+
// Only checking latency
175+
wantLatency := uint64(25)
176+
if diff := cmp.Diff(wantLatency, got.latencyMs); diff != "" {
177+
t.Errorf("mismatch (-want, +got):\n%s", diff)
178+
}
179+
}
180+
133181
func TestJSONMiddleware(t *testing.T) {
182+
t.Parallel()
134183
type result struct {
135184
Name string `json:"name"`
136185
}

0 commit comments

Comments
 (0)