Skip to content

Commit 2a240b1

Browse files
committed
make flumetest.Start() concurrency safe
1 parent c5b1037 commit 2a240b1

File tree

3 files changed

+106
-6
lines changed

3 files changed

+106
-6
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ fmt:
3131
go fmt $(PACKAGES)
3232

3333
test:
34-
go test $(BUILD_FLAGS) $(PACKAGES)
34+
go test -race $(BUILD_FLAGS) $(PACKAGES)
3535

3636
cover: builddir
3737
# runs go test in each package one at a time, generating coverage profiling

flumetest/flumetest.go

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"io/ioutil"
2424
"os"
2525
"strconv"
26+
"sync"
2627
"testing"
2728
)
2829

@@ -87,12 +88,50 @@ func MustSetDefaults() {
8788
// Be sure to call the returned function at the end of the test to reset the log
8889
// output to its original setting.
8990
func Start(t testing.TB) func() {
91+
// delegate to an inner method which is testable
92+
return start(t)
93+
}
94+
95+
type lockedBuf struct {
96+
buf *bytes.Buffer
97+
sync.Mutex
98+
}
99+
100+
func (l *lockedBuf) Write(p []byte) (n int, err error) {
101+
l.Lock()
102+
defer l.Unlock()
103+
return l.buf.Write(p)
104+
}
105+
106+
func (l *lockedBuf) Len() int {
107+
l.Lock()
108+
defer l.Unlock()
109+
return l.buf.Len()
110+
}
111+
112+
func (l *lockedBuf) String() string {
113+
l.Lock()
114+
defer l.Unlock()
115+
return l.buf.String()
116+
}
117+
118+
type testingTB interface {
119+
Failed() bool
120+
Log(args ...interface{})
121+
}
122+
123+
func start(t testingTB) func() {
90124
var revert func()
91125
if Verbose {
92126
revert = flume.SetOut(flume.LogFuncWriter(t.Log, true))
93127
} else {
94-
buf := bytes.NewBuffer(nil)
95-
revertOut := flume.SetOut(buf)
128+
// need to use a synchronized version of buf, since
129+
// logs may be written on other goroutines than this one,
130+
// and bytes.Buffer is not concurrent safe.
131+
buf := lockedBuf{
132+
buf: bytes.NewBuffer(nil),
133+
}
134+
revertOut := flume.SetOut(&buf)
96135
revert = func() {
97136
revertOut()
98137
// make sure that if the test panics or fails, we dump the logs

flumetest/flumetest_test.go

Lines changed: 64 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,78 @@
11
package flumetest
22

33
import (
4+
"fmt"
45
"github.com/gemalto/flume"
6+
"github.com/stretchr/testify/assert"
7+
"sync"
58
"testing"
69
)
710

811
func init() {
912
MustSetDefaults()
1013
}
1114

12-
func TestStart(t *testing.T) {
13-
defer Start(t)()
15+
type mockT struct {
16+
failed bool
17+
lastLog string
18+
sync.Mutex
19+
}
20+
21+
func (m *mockT) Failed() bool {
22+
m.Lock()
23+
defer m.Unlock()
24+
return m.failed
25+
}
1426

27+
func (m *mockT) Log(args ...interface{}) {
28+
m.Lock()
29+
defer m.Unlock()
30+
m.lastLog = fmt.Sprint(args...)
31+
}
32+
33+
func TestStart(t *testing.T) {
1534
var log = flume.New("TestStart")
16-
log.Info("Hi", "color", "red", "size", 5, "multilinevalue")
35+
36+
fakeTestRun := func(succeed bool) string {
37+
m := mockT{
38+
failed: !succeed,
39+
}
40+
finish := start(&m)
41+
42+
log.Info("Hi", "color", "red")
43+
44+
finish()
45+
m.Lock()
46+
defer m.Unlock()
47+
return m.lastLog
48+
}
49+
assert.Empty(t, fakeTestRun(true), "should not have logged, because the test didn't fail")
50+
assert.Contains(t, fakeTestRun(false), "color:red", "should have logged since test failed")
51+
52+
// this test is meant to trigger the race detector if we're not synchronizing correctly on the message
53+
// buffer
54+
m := mockT{
55+
failed: true,
56+
}
57+
finish := start(&m)
58+
59+
var wg sync.WaitGroup
60+
wg.Add(1)
61+
stop := make(chan struct{})
62+
go func() {
63+
wg.Done()
64+
for {
65+
select {
66+
case <-stop:
67+
return
68+
default:
69+
log.Info("logging on a different goroutine, for race detector")
70+
}
71+
}
72+
73+
}()
74+
wg.Wait()
75+
finish()
76+
stop <- struct{}{}
77+
1778
}

0 commit comments

Comments
 (0)