Skip to content

Commit 2443ac6

Browse files
committed
Add support for native distributed execution
This adds the `k6 agent` and `k6 coordinator` sub-commands and adds a very simple way to do distributed execution, including packaging and sending the script to agents, and setup() and teardown() handling. However, it doesn't include automatic metric handling (e.g. thresholds and the end-of-test summary).
1 parent 19a30e2 commit 2443ac6

10 files changed

+1529
-6
lines changed

cmd/agent.go

+125
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
package cmd
2+
3+
import (
4+
"bytes"
5+
"encoding/json"
6+
7+
"github.com/spf13/afero"
8+
"github.com/spf13/cobra"
9+
"go.k6.io/k6/cmd/state"
10+
"go.k6.io/k6/execution"
11+
"go.k6.io/k6/execution/distributed"
12+
"go.k6.io/k6/js"
13+
"go.k6.io/k6/lib"
14+
"go.k6.io/k6/loader"
15+
"go.k6.io/k6/metrics"
16+
"google.golang.org/grpc"
17+
"google.golang.org/grpc/credentials/insecure"
18+
"gopkg.in/guregu/null.v3"
19+
)
20+
21+
// TODO: a whole lot of cleanup, refactoring, error handling and hardening
22+
func getCmdAgent(gs *state.GlobalState) *cobra.Command { //nolint: funlen
23+
c := &cmdsRunAndAgent{gs: gs}
24+
25+
c.loadConfiguredTest = func(cmd *cobra.Command, args []string) (
26+
*loadedAndConfiguredTest, execution.Controller, error,
27+
) {
28+
// TODO: add some gRPC authentication
29+
conn, err := grpc.Dial(args[0], grpc.WithTransportCredentials(insecure.NewCredentials()))
30+
if err != nil {
31+
return nil, nil, err
32+
}
33+
c.testEndHook = func(err error) {
34+
gs.Logger.Debugf("k6 agent run ended with err=%s", err)
35+
_ = conn.Close()
36+
}
37+
38+
client := distributed.NewDistributedTestClient(conn)
39+
40+
resp, err := client.Register(gs.Ctx, &distributed.RegisterRequest{})
41+
if err != nil {
42+
return nil, nil, err
43+
}
44+
45+
controller, err := distributed.NewAgentController(gs.Ctx, resp.InstanceID, client, gs.Logger)
46+
if err != nil {
47+
return nil, nil, err
48+
}
49+
50+
var options lib.Options
51+
if err = json.Unmarshal(resp.Options, &options); err != nil {
52+
return nil, nil, err
53+
}
54+
55+
arc, err := lib.ReadArchive(bytes.NewReader(resp.Archive))
56+
if err != nil {
57+
return nil, nil, err
58+
}
59+
60+
registry := metrics.NewRegistry()
61+
piState := &lib.TestPreInitState{
62+
Logger: gs.Logger,
63+
RuntimeOptions: lib.RuntimeOptions{
64+
NoThresholds: null.BoolFrom(true),
65+
NoSummary: null.BoolFrom(true),
66+
Env: arc.Env,
67+
CompatibilityMode: null.StringFrom(arc.CompatibilityMode),
68+
},
69+
Registry: registry,
70+
BuiltinMetrics: metrics.RegisterBuiltinMetrics(registry),
71+
}
72+
73+
initRunner, err := js.NewFromArchive(piState, arc)
74+
if err != nil {
75+
return nil, nil, err
76+
}
77+
78+
test := &loadedTest{
79+
pwd: arc.Pwd,
80+
sourceRootPath: arc.Filename,
81+
source: &loader.SourceData{
82+
Data: resp.Archive,
83+
URL: arc.FilenameURL,
84+
},
85+
fs: afero.NewMemMapFs(), // TODO: figure out what should be here
86+
fileSystems: arc.Filesystems,
87+
preInitState: piState,
88+
initRunner: initRunner,
89+
}
90+
91+
pseudoConsoldatedConfig := applyDefault(Config{Options: options})
92+
for _, thresholds := range pseudoConsoldatedConfig.Thresholds {
93+
if err = thresholds.Parse(); err != nil {
94+
return nil, nil, err
95+
}
96+
}
97+
derivedConfig, err := deriveAndValidateConfig(pseudoConsoldatedConfig, initRunner.IsExecutable, gs.Logger)
98+
if err != nil {
99+
return nil, nil, err
100+
}
101+
102+
configuredTest := &loadedAndConfiguredTest{
103+
loadedTest: test,
104+
consolidatedConfig: pseudoConsoldatedConfig,
105+
derivedConfig: derivedConfig,
106+
}
107+
108+
gs.Flags.Address = "" // TODO: fix, this is a hack so agents don't start an API server
109+
110+
return configuredTest, controller, nil // TODO
111+
}
112+
113+
agentCmd := &cobra.Command{
114+
Use: "agent",
115+
Short: "Join a distributed load test",
116+
Long: `TODO`,
117+
Args: exactArgsWithMsg(1, "arg should either the IP and port of the controller k6 instance"),
118+
RunE: c.run,
119+
Hidden: true, // TODO: remove when officially released
120+
}
121+
122+
// TODO: add flags
123+
124+
return agentCmd
125+
}

cmd/coordinator.go

+85
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package cmd
2+
3+
import (
4+
"net"
5+
6+
"github.com/spf13/cobra"
7+
"github.com/spf13/pflag"
8+
"go.k6.io/k6/cmd/state"
9+
"go.k6.io/k6/execution/distributed"
10+
"google.golang.org/grpc"
11+
)
12+
13+
// cmdCoordinator handles the `k6 coordinator` sub-command
14+
type cmdCoordinator struct {
15+
gs *state.GlobalState
16+
gRPCAddress string
17+
instanceCount int
18+
}
19+
20+
func (c *cmdCoordinator) run(cmd *cobra.Command, args []string) (err error) {
21+
test, err := loadAndConfigureLocalTest(c.gs, cmd, args, getPartialConfig)
22+
if err != nil {
23+
return err
24+
}
25+
26+
coordinator, err := distributed.NewCoordinatorServer(
27+
c.instanceCount, test.initRunner.MakeArchive(), c.gs.Logger,
28+
)
29+
if err != nil {
30+
return err
31+
}
32+
33+
c.gs.Logger.Infof("Starting gRPC server on %s", c.gRPCAddress)
34+
listener, err := net.Listen("tcp", c.gRPCAddress)
35+
if err != nil {
36+
return err
37+
}
38+
39+
grpcServer := grpc.NewServer() // TODO: add auth and a whole bunch of other options
40+
distributed.RegisterDistributedTestServer(grpcServer, coordinator)
41+
42+
go func() {
43+
err := grpcServer.Serve(listener)
44+
c.gs.Logger.Debugf("gRPC server end: %s", err)
45+
}()
46+
coordinator.Wait()
47+
c.gs.Logger.Infof("All done!")
48+
return nil
49+
}
50+
51+
func (c *cmdCoordinator) flagSet() *pflag.FlagSet {
52+
flags := pflag.NewFlagSet("", pflag.ContinueOnError)
53+
flags.SortFlags = false
54+
flags.AddFlagSet(optionFlagSet())
55+
flags.AddFlagSet(runtimeOptionFlagSet(false))
56+
57+
// TODO: add support bi-directional gRPC authentication and authorization
58+
flags.StringVar(&c.gRPCAddress, "grpc-addr", "localhost:6566", "address on which to bind the gRPC server")
59+
60+
// TODO: add some better way to specify the test, e.g. an execution segment
61+
// sequence + some sort of a way to map instances with specific segments
62+
// (e.g. key-value tags that can be matched to every execution segment, with
63+
// each instance advertising its own tags when it connects).
64+
flags.IntVar(&c.instanceCount, "instance-count", 1, "number of distributed instances")
65+
return flags
66+
}
67+
68+
func getCmdCoordnator(gs *state.GlobalState) *cobra.Command {
69+
c := &cmdCoordinator{
70+
gs: gs,
71+
}
72+
73+
coordinatorCmd := &cobra.Command{
74+
Use: "coordinator",
75+
Short: "Start a distributed load test",
76+
Long: `TODO`,
77+
RunE: c.run,
78+
Hidden: true, // TODO: remove when officially released
79+
}
80+
81+
coordinatorCmd.Flags().SortFlags = false
82+
coordinatorCmd.Flags().AddFlagSet(c.flagSet())
83+
84+
return coordinatorCmd
85+
}

cmd/root.go

+1
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ func newRootCommand(gs *state.GlobalState) *rootCommand {
6464
getCmdArchive, getCmdCloud, getCmdNewScript, getCmdInspect,
6565
getCmdLogin, getCmdPause, getCmdResume, getCmdScale, getCmdRun,
6666
getCmdStats, getCmdStatus, getCmdVersion,
67+
getCmdAgent, getCmdCoordnator,
6768
}
6869

6970
for _, sc := range subCommands {

cmd/run.go

+10-6
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,13 @@ import (
3737
"go.k6.io/k6/ui/pb"
3838
)
3939

40-
// cmdRun handles the `k6 run` sub-command
41-
type cmdRun struct {
40+
// cmdsRunAndAgent handles the `k6 run` and `k6 agent` sub-commands
41+
type cmdsRunAndAgent struct {
4242
gs *state.GlobalState
4343

4444
// TODO: figure out something more elegant?
4545
loadConfiguredTest func(cmd *cobra.Command, args []string) (*loadedAndConfiguredTest, execution.Controller, error)
46+
testEndHook func(err error)
4647
}
4748

4849
const (
@@ -60,14 +61,17 @@ const (
6061
// TODO: split apart some more
6162
//
6263
//nolint:funlen,gocognit,gocyclo,cyclop
63-
func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) {
64+
func (c *cmdsRunAndAgent) run(cmd *cobra.Command, args []string) (err error) {
6465
var logger logrus.FieldLogger = c.gs.Logger
6566
defer func() {
6667
if err == nil {
6768
logger.Debug("Everything has finished, exiting k6 normally!")
6869
} else {
6970
logger.WithError(err).Debug("Everything has finished, exiting k6 with an error!")
7071
}
72+
if c.testEndHook != nil {
73+
c.testEndHook(err)
74+
}
7175
}()
7276
printBanner(c.gs)
7377

@@ -435,7 +439,7 @@ func (c *cmdRun) run(cmd *cobra.Command, args []string) (err error) {
435439
return nil
436440
}
437441

438-
func (c *cmdRun) flagSet() *pflag.FlagSet {
442+
func (c *cmdsRunAndAgent) flagSet() *pflag.FlagSet {
439443
flags := pflag.NewFlagSet("", pflag.ContinueOnError)
440444
flags.SortFlags = false
441445
flags.AddFlagSet(optionFlagSet())
@@ -444,7 +448,7 @@ func (c *cmdRun) flagSet() *pflag.FlagSet {
444448
return flags
445449
}
446450

447-
func (c *cmdRun) setupTracerProvider(ctx context.Context, test *loadedAndConfiguredTest) error {
451+
func (c *cmdsRunAndAgent) setupTracerProvider(ctx context.Context, test *loadedAndConfiguredTest) error {
448452
ro := test.preInitState.RuntimeOptions
449453
if ro.TracesOutput.String == "none" {
450454
test.preInitState.TracerProvider = trace.NewNoopTracerProvider()
@@ -461,7 +465,7 @@ func (c *cmdRun) setupTracerProvider(ctx context.Context, test *loadedAndConfigu
461465
}
462466

463467
func getCmdRun(gs *state.GlobalState) *cobra.Command {
464-
c := &cmdRun{
468+
c := &cmdsRunAndAgent{
465469
gs: gs,
466470
loadConfiguredTest: func(cmd *cobra.Command, args []string) (*loadedAndConfiguredTest, execution.Controller, error) {
467471
test, err := loadAndConfigureLocalTest(gs, cmd, args, getConfig)

0 commit comments

Comments
 (0)