Skip to content

Commit 3363208

Browse files
committed
Support thresholds and the end-of-test summary in distributed execution
1 parent 2443ac6 commit 3363208

File tree

9 files changed

+600
-37
lines changed

9 files changed

+600
-37
lines changed

cmd/agent.go

+70
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,11 @@ package cmd
22

33
import (
44
"bytes"
5+
"context"
56
"encoding/json"
7+
"time"
68

9+
"github.com/sirupsen/logrus"
710
"github.com/spf13/afero"
811
"github.com/spf13/cobra"
912
"go.k6.io/k6/cmd/state"
@@ -13,11 +16,76 @@ import (
1316
"go.k6.io/k6/lib"
1417
"go.k6.io/k6/loader"
1518
"go.k6.io/k6/metrics"
19+
"go.k6.io/k6/metrics/engine"
1620
"google.golang.org/grpc"
1721
"google.golang.org/grpc/credentials/insecure"
1822
"gopkg.in/guregu/null.v3"
1923
)
2024

25+
// TODO: something cleaner
26+
func getMetricsHook(
27+
ctx context.Context, instanceID uint32,
28+
client distributed.DistributedTestClient, logger logrus.FieldLogger,
29+
) func(*engine.MetricsEngine) func() {
30+
logger = logger.WithField("component", "metric-engine-hook")
31+
return func(me *engine.MetricsEngine) func() {
32+
stop := make(chan struct{})
33+
done := make(chan struct{})
34+
35+
dumpMetrics := func() {
36+
logger.Debug("Starting metric dump...")
37+
me.MetricsLock.Lock()
38+
defer me.MetricsLock.Unlock()
39+
40+
metrics := make([]*distributed.MetricDump, 0, len(me.ObservedMetrics))
41+
for _, om := range me.ObservedMetrics {
42+
data, err := om.Sink.Drain()
43+
if err != nil {
44+
logger.Errorf("There was a problem draining the sink for metric %s: %s", om.Name, err)
45+
}
46+
metrics = append(metrics, &distributed.MetricDump{
47+
Name: om.Name,
48+
Data: data,
49+
})
50+
}
51+
52+
data := &distributed.MetricsDump{
53+
InstanceID: instanceID,
54+
Metrics: metrics,
55+
}
56+
_, err := client.SendMetrics(ctx, data)
57+
if err != nil {
58+
logger.Errorf("There was a problem dumping metrics: %s", err)
59+
}
60+
}
61+
62+
go func() {
63+
defer close(done)
64+
ticker := time.NewTicker(1 * time.Second)
65+
defer ticker.Stop()
66+
67+
for {
68+
select {
69+
case <-ticker.C:
70+
dumpMetrics()
71+
case <-stop:
72+
dumpMetrics()
73+
return
74+
}
75+
}
76+
}()
77+
78+
finalize := func() {
79+
logger.Debug("Final metric dump...")
80+
close(stop)
81+
<-done
82+
logger.Debug("Done!")
83+
}
84+
85+
return finalize
86+
}
87+
}
88+
2189
// TODO: a whole lot of cleanup, refactoring, error handling and hardening
2290
func getCmdAgent(gs *state.GlobalState) *cobra.Command { //nolint: funlen
2391
c := &cmdsRunAndAgent{gs: gs}
@@ -42,6 +110,8 @@ func getCmdAgent(gs *state.GlobalState) *cobra.Command { //nolint: funlen
42110
return nil, nil, err
43111
}
44112

113+
c.metricsEngineHook = getMetricsHook(gs.Ctx, resp.InstanceID, client, gs.Logger)
114+
45115
controller, err := distributed.NewAgentController(gs.Ctx, resp.InstanceID, client, gs.Logger)
46116
if err != nil {
47117
return nil, nil, err

cmd/coordinator.go

+70-2
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,19 @@
11
package cmd
22

33
import (
4+
"fmt"
45
"net"
6+
"strings"
57

68
"github.com/spf13/cobra"
79
"github.com/spf13/pflag"
810
"go.k6.io/k6/cmd/state"
11+
"go.k6.io/k6/errext"
12+
"go.k6.io/k6/errext/exitcodes"
13+
"go.k6.io/k6/execution"
914
"go.k6.io/k6/execution/distributed"
15+
"go.k6.io/k6/lib"
16+
"go.k6.io/k6/metrics/engine"
1017
"google.golang.org/grpc"
1118
)
1219

@@ -17,19 +24,80 @@ type cmdCoordinator struct {
1724
instanceCount int
1825
}
1926

20-
func (c *cmdCoordinator) run(cmd *cobra.Command, args []string) (err error) {
27+
// TODO: split apart
28+
func (c *cmdCoordinator) run(cmd *cobra.Command, args []string) (err error) { //nolint: funlen
29+
ctx, runAbort := execution.NewTestRunContext(c.gs.Ctx, c.gs.Logger)
30+
2131
test, err := loadAndConfigureLocalTest(c.gs, cmd, args, getPartialConfig)
2232
if err != nil {
2333
return err
2434
}
2535

36+
// Only consolidated options, not derived
37+
testRunState, err := test.buildTestRunState(test.consolidatedConfig.Options)
38+
if err != nil {
39+
return err
40+
}
41+
42+
metricsEngine, err := engine.NewMetricsEngine(testRunState.Registry, c.gs.Logger)
43+
if err != nil {
44+
return err
45+
}
46+
2647
coordinator, err := distributed.NewCoordinatorServer(
27-
c.instanceCount, test.initRunner.MakeArchive(), c.gs.Logger,
48+
c.instanceCount, test.initRunner.MakeArchive(), metricsEngine, c.gs.Logger,
2849
)
2950
if err != nil {
3051
return err
3152
}
3253

54+
if !testRunState.RuntimeOptions.NoSummary.Bool {
55+
defer func() {
56+
c.gs.Logger.Debug("Generating the end-of-test summary...")
57+
summaryResult, serr := test.initRunner.HandleSummary(ctx, &lib.Summary{
58+
Metrics: metricsEngine.ObservedMetrics,
59+
RootGroup: test.initRunner.GetDefaultGroup(),
60+
TestRunDuration: coordinator.GetCurrentTestRunDuration(),
61+
NoColor: c.gs.Flags.NoColor,
62+
UIState: lib.UIState{
63+
IsStdOutTTY: c.gs.Stdout.IsTTY,
64+
IsStdErrTTY: c.gs.Stderr.IsTTY,
65+
},
66+
})
67+
if serr == nil {
68+
serr = handleSummaryResult(c.gs.FS, c.gs.Stdout, c.gs.Stderr, summaryResult)
69+
}
70+
if serr != nil {
71+
c.gs.Logger.WithError(serr).Error("Failed to handle the end-of-test summary")
72+
}
73+
}()
74+
}
75+
76+
if !testRunState.RuntimeOptions.NoThresholds.Bool {
77+
getCurrentTestDuration := coordinator.GetCurrentTestRunDuration
78+
finalizeThresholds := metricsEngine.StartThresholdCalculations(nil, runAbort, getCurrentTestDuration)
79+
80+
defer func() {
81+
// This gets called after all of the outputs have stopped, so we are
82+
// sure there won't be any more metrics being sent.
83+
c.gs.Logger.Debug("Finalizing thresholds...")
84+
breachedThresholds := finalizeThresholds()
85+
if len(breachedThresholds) > 0 {
86+
tErr := errext.WithAbortReasonIfNone(
87+
errext.WithExitCodeIfNone(
88+
fmt.Errorf("thresholds on metrics '%s' have been breached", strings.Join(breachedThresholds, ", ")),
89+
exitcodes.ThresholdsHaveFailed,
90+
), errext.AbortedByThresholdsAfterTestEnd)
91+
92+
if err == nil {
93+
err = tErr
94+
} else {
95+
c.gs.Logger.WithError(tErr).Debug("Breached thresholds, but test already exited with another error")
96+
}
97+
}
98+
}()
99+
}
100+
33101
c.gs.Logger.Infof("Starting gRPC server on %s", c.gRPCAddress)
34102
listener, err := net.Listen("tcp", c.gRPCAddress)
35103
if err != nil {

cmd/run.go

+8-2
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ type cmdsRunAndAgent struct {
4343

4444
// TODO: figure out something more elegant?
4545
loadConfiguredTest func(cmd *cobra.Command, args []string) (*loadedAndConfiguredTest, execution.Controller, error)
46+
metricsEngineHook func(*engine.MetricsEngine) func()
4647
testEndHook func(err error)
4748
}
4849

@@ -179,9 +180,9 @@ func (c *cmdsRunAndAgent) run(cmd *cobra.Command, args []string) (err error) {
179180
}
180181

181182
// We'll need to pipe metrics to the MetricsEngine and process them if any
182-
// of these are enabled: thresholds, end-of-test summary
183+
// of these are enabled: thresholds, end-of-test summary, engine hook
183184
shouldProcessMetrics := (!testRunState.RuntimeOptions.NoSummary.Bool ||
184-
!testRunState.RuntimeOptions.NoThresholds.Bool)
185+
!testRunState.RuntimeOptions.NoThresholds.Bool || c.metricsEngineHook != nil)
185186
var metricsIngester *engine.OutputIngester
186187
if shouldProcessMetrics {
187188
err = metricsEngine.InitSubMetricsAndThresholds(conf.Options, testRunState.RuntimeOptions.NoThresholds.Bool)
@@ -244,6 +245,11 @@ func (c *cmdsRunAndAgent) run(cmd *cobra.Command, args []string) (err error) {
244245
stopOutputs(err)
245246
}()
246247

248+
if c.metricsEngineHook != nil {
249+
hookFinalize := c.metricsEngineHook(metricsEngine)
250+
defer hookFinalize()
251+
}
252+
247253
if !testRunState.RuntimeOptions.NoThresholds.Bool {
248254
finalizeThresholds := metricsEngine.StartThresholdCalculations(
249255
metricsIngester, runAbort, executionState.GetCurrentTestRunDuration,

execution/distributed/coordinator.go

+16-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111

1212
"github.com/sirupsen/logrus"
1313
"go.k6.io/k6/lib"
14+
"go.k6.io/k6/metrics/engine"
1415
)
1516

1617
// CoordinatorServer coordinates multiple k6 agents.
@@ -21,6 +22,7 @@ type CoordinatorServer struct {
2122
instanceCount int
2223
test *lib.Archive
2324
logger logrus.FieldLogger
25+
metricsEngine *engine.MetricsEngine
2426

2527
testStartTimeLock sync.Mutex
2628
testStartTime *time.Time
@@ -34,7 +36,7 @@ type CoordinatorServer struct {
3436

3537
// NewCoordinatorServer initializes and returns a new CoordinatorServer.
3638
func NewCoordinatorServer(
37-
instanceCount int, test *lib.Archive, logger logrus.FieldLogger,
39+
instanceCount int, test *lib.Archive, metricsEngine *engine.MetricsEngine, logger logrus.FieldLogger,
3840
) (*CoordinatorServer, error) {
3941
segments, err := test.Options.ExecutionSegment.Split(int64(instanceCount))
4042
if err != nil {
@@ -58,6 +60,7 @@ func NewCoordinatorServer(
5860
cs := &CoordinatorServer{
5961
instanceCount: instanceCount,
6062
test: test,
63+
metricsEngine: metricsEngine,
6164
logger: logger,
6265
ess: ess,
6366
cc: newCoordinatorController(instanceCount, logger),
@@ -144,6 +147,18 @@ func (cs *CoordinatorServer) CommandAndControl(stream DistributedTest_CommandAnd
144147
return cs.cc.handleInstanceStream(initInstMsg.InitInstanceID, stream)
145148
}
146149

150+
// SendMetrics accepts and imports the given metrics in the coordinator's MetricsEngine.
151+
func (cs *CoordinatorServer) SendMetrics(_ context.Context, dumpMsg *MetricsDump) (*MetricsDumpResponse, error) {
152+
// TODO: something nicer?
153+
for _, md := range dumpMsg.Metrics {
154+
if err := cs.metricsEngine.ImportMetric(md.Name, md.Data); err != nil {
155+
cs.logger.Errorf("Error merging sink for metric %s: %w", md.Name, err)
156+
// return nil, err
157+
}
158+
}
159+
return &MetricsDumpResponse{}, nil
160+
}
161+
147162
// Wait blocks until all instances have disconnected.
148163
func (cs *CoordinatorServer) Wait() {
149164
cs.wg.Wait()

0 commit comments

Comments
 (0)