Skip to content

Commit 92ec7d9

Browse files
paulbellamytomwilkie
authored andcommitted
Move probe main.go to prog/probe/, break out a probe struct with appropriate responsibilities.
Also adds test for probe 'engine'
1 parent 8dbc586 commit 92ec7d9

14 files changed

+337
-257
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ app/app
3838
app/scope-app
3939
probe/probe
4040
probe/scope-probe
41+
prog/probe/scope-probe
4142
docker/scope-app
4243
docker/scope-probe
4344
docker/docker*

Makefile

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
SUDO=sudo -E
55
DOCKERHUB_USER=weaveworks
66
APP_EXE=app/scope-app
7-
PROBE_EXE=probe/scope-probe
7+
PROBE_EXE=prog/probe/scope-probe
88
FIXPROBE_EXE=experimental/fixprobe/fixprobe
99
SCOPE_IMAGE=$(DOCKERHUB_USER)/scope
1010
SCOPE_EXPORT=scope.tar

circle.yml

+2-2
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,9 @@ test:
4545
parallel: true
4646
- cd $SRCDIR; make RM= static:
4747
parallel: true
48-
- cd $SRCDIR; rm -f app/scope-app probe/scope-probe; if [ "$CIRCLE_NODE_INDEX" = "0" ]; then GOARCH=arm make RM= app/scope-app probe/scope-probe; else GOOS=darwin make RM= app/scope-app probe/scope-probe; fi:
48+
- cd $SRCDIR; rm -f app/scope-app prog/probe/scope-probe; if [ "$CIRCLE_NODE_INDEX" = "0" ]; then GOARCH=arm make RM= app/scope-app prog/probe/scope-probe; else GOOS=darwin make RM= app/scope-app prog/probe/scope-probe; fi:
4949
parallel: true
50-
- cd $SRCDIR; rm -f app/scope-app probe/scope-probe; make RM=:
50+
- cd $SRCDIR; rm -f app/scope-app prog/probe/scope-probe; make RM=:
5151
parallel: true
5252
- cd $SRCDIR/experimental; ./build_on_circle.sh:
5353
parallel: true

probe/hostname.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
1-
package main
1+
package probe
22

33
import "os"
44

5-
func hostname() string {
5+
// Hostname returns the hostname of this host.
6+
func Hostname() string {
67
if hostname := os.Getenv("SCOPE_HOSTNAME"); hostname != "" {
78
return hostname
89
}

probe/instrumentation.go

-27
This file was deleted.

probe/probe.go

+162
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
package probe
2+
3+
import (
4+
"log"
5+
"sync"
6+
"time"
7+
8+
"github.com/weaveworks/scope/report"
9+
"github.com/weaveworks/scope/xfer"
10+
)
11+
12+
// Probe sits there, generating and publishing reports.
13+
type Probe struct {
14+
spyInterval, publishInterval time.Duration
15+
publisher xfer.Publisher
16+
17+
tickers []Ticker
18+
reporters []Reporter
19+
taggers []Tagger
20+
21+
quit chan struct{}
22+
done sync.WaitGroup
23+
rpt syncReport
24+
}
25+
26+
// Tagger tags nodes with value-add node metadata.
27+
type Tagger interface {
28+
Tag(r report.Report) (report.Report, error)
29+
}
30+
31+
// Reporter generates Reports.
32+
type Reporter interface {
33+
Report() (report.Report, error)
34+
}
35+
36+
// Ticker is something which will be invoked every spyDuration.
37+
// It's useful for things that should be updated on that interval.
38+
// For example, cached shared state between Taggers and Reporters.
39+
type Ticker interface {
40+
Tick() error
41+
}
42+
43+
// New makes a new Probe.
44+
func New(spyInterval, publishInterval time.Duration, publisher xfer.Publisher) *Probe {
45+
result := &Probe{
46+
spyInterval: spyInterval,
47+
publishInterval: publishInterval,
48+
publisher: publisher,
49+
quit: make(chan struct{}),
50+
}
51+
result.rpt.swap(report.MakeReport())
52+
return result
53+
}
54+
55+
// AddTagger adds a new Tagger to the Probe
56+
func (p *Probe) AddTagger(ts ...Tagger) {
57+
p.taggers = append(p.taggers, ts...)
58+
}
59+
60+
// AddReporter adds a new Reported to the Probe
61+
func (p *Probe) AddReporter(rs ...Reporter) {
62+
p.reporters = append(p.reporters, rs...)
63+
}
64+
65+
// AddTicker adds a new Ticker to the Probe
66+
func (p *Probe) AddTicker(ts ...Ticker) {
67+
p.tickers = append(p.tickers, ts...)
68+
}
69+
70+
// Start starts the probe
71+
func (p *Probe) Start() {
72+
p.done.Add(2)
73+
go p.spyLoop()
74+
go p.publishLoop()
75+
}
76+
77+
// Stop stops the probe
78+
func (p *Probe) Stop() {
79+
close(p.quit)
80+
p.done.Wait()
81+
}
82+
83+
func (p *Probe) spyLoop() {
84+
defer p.done.Done()
85+
spyTick := time.Tick(p.spyInterval)
86+
87+
for {
88+
select {
89+
case <-spyTick:
90+
start := time.Now()
91+
for _, ticker := range p.tickers {
92+
if err := ticker.Tick(); err != nil {
93+
log.Printf("error doing ticker: %v", err)
94+
}
95+
}
96+
97+
localReport := p.rpt.copy()
98+
localReport = localReport.Merge(p.report())
99+
localReport = p.tag(localReport)
100+
p.rpt.swap(localReport)
101+
102+
if took := time.Since(start); took > p.spyInterval {
103+
log.Printf("report generation took too long (%s)", took)
104+
}
105+
106+
case <-p.quit:
107+
return
108+
}
109+
}
110+
}
111+
112+
func (p *Probe) report() report.Report {
113+
reports := make(chan report.Report, len(p.reporters))
114+
for _, rep := range p.reporters {
115+
go func(rep Reporter) {
116+
newReport, err := rep.Report()
117+
if err != nil {
118+
log.Printf("error generating report: %v", err)
119+
newReport = report.MakeReport() // empty is OK to merge
120+
}
121+
reports <- newReport
122+
}(rep)
123+
}
124+
125+
result := report.MakeReport()
126+
for i := 0; i < cap(reports); i++ {
127+
result = result.Merge(<-reports)
128+
}
129+
return result
130+
}
131+
132+
func (p *Probe) tag(r report.Report) report.Report {
133+
var err error
134+
for _, tagger := range p.taggers {
135+
r, err = tagger.Tag(r)
136+
if err != nil {
137+
log.Printf("error applying tagger: %v", err)
138+
}
139+
}
140+
return r
141+
}
142+
143+
func (p *Probe) publishLoop() {
144+
defer p.done.Done()
145+
var (
146+
pubTick = time.Tick(p.publishInterval)
147+
rptPub = xfer.NewReportPublisher(p.publisher)
148+
)
149+
150+
for {
151+
select {
152+
case <-pubTick:
153+
localReport := p.rpt.swap(report.MakeReport())
154+
if err := rptPub.Publish(localReport); err != nil {
155+
log.Printf("publish: %v", err)
156+
}
157+
158+
case <-p.quit:
159+
return
160+
}
161+
}
162+
}

probe/probe_internal_test.go

+86
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package probe
2+
3+
import (
4+
"compress/gzip"
5+
"encoding/gob"
6+
"io"
7+
"reflect"
8+
"testing"
9+
"time"
10+
11+
"github.com/weaveworks/scope/report"
12+
"github.com/weaveworks/scope/test"
13+
)
14+
15+
func TestApply(t *testing.T) {
16+
var (
17+
endpointNodeID = "c"
18+
addressNodeID = "d"
19+
endpointNode = report.MakeNodeWith(map[string]string{"5": "6"})
20+
addressNode = report.MakeNodeWith(map[string]string{"7": "8"})
21+
)
22+
23+
p := New(0, 0, nil)
24+
p.AddTagger(NewTopologyTagger())
25+
26+
r := report.MakeReport()
27+
r.Endpoint.AddNode(endpointNodeID, endpointNode)
28+
r.Address.AddNode(addressNodeID, addressNode)
29+
r = p.tag(r)
30+
31+
for _, tuple := range []struct {
32+
want report.Node
33+
from report.Topology
34+
via string
35+
}{
36+
{endpointNode.Merge(report.MakeNodeWith(map[string]string{"topology": "endpoint"})), r.Endpoint, endpointNodeID},
37+
{addressNode.Merge(report.MakeNodeWith(map[string]string{"topology": "address"})), r.Address, addressNodeID},
38+
} {
39+
if want, have := tuple.want, tuple.from.Nodes[tuple.via]; !reflect.DeepEqual(want, have) {
40+
t.Errorf("want %+v, have %+v", want, have)
41+
}
42+
}
43+
}
44+
45+
type mockReporter struct {
46+
r report.Report
47+
}
48+
49+
func (m mockReporter) Report() (report.Report, error) {
50+
return m.r.Copy(), nil
51+
}
52+
53+
type mockPublisher struct {
54+
have chan report.Report
55+
}
56+
57+
func (m mockPublisher) Publish(in io.Reader) error {
58+
var r report.Report
59+
if reader, err := gzip.NewReader(in); err != nil {
60+
return err
61+
} else if err := gob.NewDecoder(reader).Decode(&r); err != nil {
62+
return err
63+
}
64+
m.have <- r
65+
return nil
66+
}
67+
68+
func (m mockPublisher) Stop() {
69+
close(m.have)
70+
}
71+
72+
func TestProbe(t *testing.T) {
73+
want := report.MakeReport()
74+
node := report.MakeNodeWith(map[string]string{"b": "c"})
75+
want.Endpoint.AddNode("a", node)
76+
pub := mockPublisher{make(chan report.Report)}
77+
78+
p := New(10*time.Millisecond, 100*time.Millisecond, pub)
79+
p.AddReporter(mockReporter{want})
80+
p.Start()
81+
defer p.Stop()
82+
83+
test.Poll(t, 300*time.Millisecond, want, func() interface{} {
84+
return <-pub.have
85+
})
86+
}

probe/sync_report.go

+26
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package probe
2+
3+
import (
4+
"sync"
5+
6+
"github.com/weaveworks/scope/report"
7+
)
8+
9+
type syncReport struct {
10+
mtx sync.RWMutex
11+
rpt report.Report
12+
}
13+
14+
func (r *syncReport) swap(other report.Report) report.Report {
15+
r.mtx.Lock()
16+
defer r.mtx.Unlock()
17+
old := r.rpt
18+
r.rpt = other
19+
return old
20+
}
21+
22+
func (r *syncReport) copy() report.Report {
23+
r.mtx.RLock()
24+
defer r.mtx.RUnlock()
25+
return r.rpt.Copy()
26+
}

probe/tag_report_test.go

-45
This file was deleted.

0 commit comments

Comments
 (0)