Skip to content

Commit 2e444a7

Browse files
authored
Move SystemManager & span handling to root command (#2026)
Moving the creation & clean up of the SystemManager and root span avoids having the same code copied between all the commands. This also gives the opportunity to give the root spans a more coherent name rather than the code location. This also fixes an issue with the wasm command which didn't apply the root commands PersistentPreRun. Fixes #2023
1 parent 67dd0ee commit 2e444a7

17 files changed

+81
-152
lines changed

cmd/bacalhau/cancel.go

-9
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,6 @@ import (
44
"fmt"
55

66
"github.com/filecoin-project/bacalhau/pkg/bacerrors"
7-
"github.com/filecoin-project/bacalhau/pkg/telemetry"
8-
9-
"github.com/filecoin-project/bacalhau/pkg/system"
107
"github.com/filecoin-project/bacalhau/pkg/util/templates"
118
"github.com/spf13/cobra"
129
"k8s.io/kubectl/pkg/util/i18n"
@@ -52,14 +49,8 @@ func newCancelCmd() *cobra.Command {
5249
}
5350

5451
func cancel(cmd *cobra.Command, cmdArgs []string, options *CancelOptions) error {
55-
cm := system.NewCleanupManager()
56-
defer cm.Cleanup()
5752
ctx := cmd.Context()
5853

59-
ctx, span := system.NewRootSpan(ctx, system.GetTracer(), "cmd/bacalhau.cancel")
60-
defer span.End()
61-
cm.RegisterCallback(telemetry.Cleanup)
62-
6354
var err error
6455

6556
requestedJobID := cmdArgs[0]

cmd/bacalhau/create.go

+2-8
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,8 @@ import (
88
"strings"
99
"time"
1010

11-
"github.com/filecoin-project/bacalhau/pkg/downloader/util"
12-
"github.com/filecoin-project/bacalhau/pkg/telemetry"
13-
1411
"github.com/filecoin-project/bacalhau/pkg/bacerrors"
12+
"github.com/filecoin-project/bacalhau/pkg/downloader/util"
1513
jobutils "github.com/filecoin-project/bacalhau/pkg/job"
1614
"github.com/filecoin-project/bacalhau/pkg/model"
1715
"github.com/filecoin-project/bacalhau/pkg/system"
@@ -83,13 +81,9 @@ func newCreateCmd() *cobra.Command {
8381
}
8482

8583
func create(cmd *cobra.Command, cmdArgs []string, OC *CreateOptions) error { //nolint:funlen,gocyclo
86-
cm := system.NewCleanupManager()
87-
defer cm.Cleanup()
8884
ctx := cmd.Context()
8985

90-
ctx, rootSpan := system.NewRootSpan(ctx, system.GetTracer(), "cmd/bacalhau.create")
91-
defer rootSpan.End()
92-
cm.RegisterCallback(telemetry.Cleanup)
86+
cm := cmd.Context().Value(systemManagerKey).(*system.CleanupManager)
9387

9488
// Custom unmarshaller
9589
// https://stackoverflow.com/questions/70635636/unmarshaling-yaml-into-different-struct-based-off-yaml-field?rq=1

cmd/bacalhau/describe.go

-8
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@ import (
55
"fmt"
66

77
"github.com/filecoin-project/bacalhau/pkg/bacerrors"
8-
"github.com/filecoin-project/bacalhau/pkg/system"
9-
"github.com/filecoin-project/bacalhau/pkg/telemetry"
108
"github.com/filecoin-project/bacalhau/pkg/util/templates"
119
"github.com/spf13/cobra"
1210
"k8s.io/kubectl/pkg/util/i18n"
@@ -78,18 +76,12 @@ func newDescribeCmd() *cobra.Command {
7876
}
7977

8078
func describe(cmd *cobra.Command, cmdArgs []string, OD *DescribeOptions) error {
81-
cm := system.NewCleanupManager()
82-
defer cm.Cleanup()
8379
ctx := cmd.Context()
8480

8581
if err := cmd.ParseFlags(cmdArgs[1:]); err != nil {
8682
Fatal(cmd, fmt.Sprintf("Failed to parse flags: %v\n", err), 1)
8783
}
8884

89-
ctx, rootSpan := system.NewRootSpan(ctx, system.GetTracer(), "cmd/bacalhau.describe")
90-
defer rootSpan.End()
91-
cm.RegisterCallback(telemetry.Cleanup)
92-
9385
var err error
9486
inputJobID := cmdArgs[0]
9587
if inputJobID == "" {

cmd/bacalhau/devstack.go

+1-4
Original file line numberDiff line numberDiff line change
@@ -127,12 +127,9 @@ func newDevStackCmd() *cobra.Command {
127127
}
128128

129129
func runDevstack(cmd *cobra.Command, ODs *devstack.DevStackOptions, OS *ServeOptions, IsNoop bool) error {
130-
cm := system.NewCleanupManager()
131-
defer cm.Cleanup()
132130
ctx := cmd.Context()
133131

134-
ctx, rootSpan := system.NewRootSpan(ctx, system.GetTracer(), "cmd/bacalhau.runDevstack")
135-
defer rootSpan.End()
132+
cm := ctx.Value(systemManagerKey).(*system.CleanupManager)
136133

137134
if config.DevstackShouldWriteEnvFile() {
138135
cm.RegisterCallback(cleanupDevstackDotEnv)

cmd/bacalhau/docker_run.go

+4-18
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,7 @@ import (
99
jobutils "github.com/filecoin-project/bacalhau/pkg/job"
1010
"github.com/filecoin-project/bacalhau/pkg/model"
1111
"github.com/filecoin-project/bacalhau/pkg/system"
12-
"github.com/filecoin-project/bacalhau/pkg/telemetry"
1312
"github.com/filecoin-project/bacalhau/pkg/util/templates"
14-
"github.com/filecoin-project/bacalhau/pkg/version"
1513
"github.com/pkg/errors"
1614
"github.com/spf13/cobra"
1715
"k8s.io/kubectl/pkg/util/i18n"
@@ -123,17 +121,9 @@ func NewDockerRunOptions() *DockerRunOptions {
123121

124122
func newDockerCmd() *cobra.Command {
125123
dockerCmd := &cobra.Command{
126-
Use: "docker",
127-
Short: "Run a docker job on the network (see run subcommand)",
128-
PersistentPreRunE: func(cmd *cobra.Command, _ []string) error {
129-
// Check that the server version is compatible with the client version
130-
serverVersion, _ := GetAPIClient().Version(cmd.Context()) // Ok if this fails, version validation will skip
131-
if err := ensureValidVersion(cmd.Context(), version.Get(), serverVersion); err != nil {
132-
cmd.Println(err.Error())
133-
return err
134-
}
135-
return nil
136-
},
124+
Use: "docker",
125+
Short: "Run a docker job on the network (see run subcommand)",
126+
PersistentPreRunE: checkVersion,
137127
}
138128

139129
dockerCmd.AddCommand(newDockerRunCmd())
@@ -280,13 +270,9 @@ func newDockerRunCmd() *cobra.Command { //nolint:funlen
280270
}
281271

282272
func dockerRun(cmd *cobra.Command, cmdArgs []string, ODR *DockerRunOptions) error {
283-
cm := system.NewCleanupManager()
284-
defer cm.Cleanup()
285273
ctx := cmd.Context()
286274

287-
ctx, rootSpan := system.NewRootSpan(ctx, system.GetTracer(), "cmd/bacalhau.dockerRun")
288-
defer rootSpan.End()
289-
cm.RegisterCallback(telemetry.Cleanup)
275+
cm := cmd.Context().Value(systemManagerKey).(*system.CleanupManager)
290276

291277
j, err := CreateJob(cmdArgs, ODR)
292278
if err != nil {

cmd/bacalhau/get.go

+1-7
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@ import (
55

66
"github.com/filecoin-project/bacalhau/pkg/downloader/util"
77
"github.com/filecoin-project/bacalhau/pkg/model"
8-
"github.com/filecoin-project/bacalhau/pkg/telemetry"
9-
108
"github.com/filecoin-project/bacalhau/pkg/system"
119
"github.com/filecoin-project/bacalhau/pkg/util/templates"
1210
"github.com/pkg/errors"
@@ -60,13 +58,9 @@ func newGetCmd() *cobra.Command {
6058
}
6159

6260
func get(cmd *cobra.Command, cmdArgs []string, OG *GetOptions) error {
63-
cm := system.NewCleanupManager()
64-
defer cm.Cleanup()
6561
ctx := cmd.Context()
6662

67-
ctx, span := system.NewRootSpan(ctx, system.GetTracer(), "cmd/bacalhau.get")
68-
defer span.End()
69-
cm.RegisterCallback(telemetry.Cleanup)
63+
cm := cmd.Context().Value(systemManagerKey).(*system.CleanupManager)
7064

7165
var err error
7266

cmd/bacalhau/id.go

-7
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@ import (
55
"os"
66

77
"github.com/filecoin-project/bacalhau/pkg/libp2p"
8-
"github.com/filecoin-project/bacalhau/pkg/system"
9-
"github.com/filecoin-project/bacalhau/pkg/telemetry"
108
"github.com/spf13/cobra"
119
)
1210

@@ -31,11 +29,6 @@ func newIDCmd() *cobra.Command {
3129
}
3230

3331
func id(_ *cobra.Command, OS *ServeOptions) error {
34-
// Cleanup manager ensures that resources are freed before exiting:
35-
cm := system.NewCleanupManager()
36-
cm.RegisterCallback(telemetry.Cleanup)
37-
defer cm.Cleanup()
38-
3932
libp2pHost, err := libp2p.NewHost(OS.SwarmPort)
4033
if err != nil {
4134
return err

cmd/bacalhau/list.go

-7
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"github.com/filecoin-project/bacalhau/pkg/job"
99
"github.com/filecoin-project/bacalhau/pkg/model"
1010
"github.com/filecoin-project/bacalhau/pkg/system"
11-
"github.com/filecoin-project/bacalhau/pkg/telemetry"
1211
"github.com/filecoin-project/bacalhau/pkg/util/templates"
1312
"github.com/jedib0t/go-pretty/v6/table"
1413
"github.com/rs/zerolog/log"
@@ -148,14 +147,8 @@ func (c *ColumnEnum) Set(v string) error {
148147
}
149148

150149
func list(cmd *cobra.Command, OL *ListOptions) error {
151-
cm := system.NewCleanupManager()
152-
defer cm.Cleanup()
153150
ctx := cmd.Context()
154151

155-
ctx, rootSpan := system.NewRootSpan(ctx, system.GetTracer(), "cmd/bacalhau.list")
156-
defer rootSpan.End()
157-
cm.RegisterCallback(telemetry.Cleanup)
158-
159152
log.Ctx(ctx).Debug().Msgf("Table filter flag set to: %s", OL.IDFilter)
160153
log.Ctx(ctx).Debug().Msgf("Table limit flag set to: %d", OL.MaxJobs)
161154
log.Ctx(ctx).Debug().Msgf("Table output format flag set to: %s", OL.OutputFormat)

cmd/bacalhau/root.go

+51
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,20 @@ package bacalhau
22

33
import (
44
"context"
5+
"fmt"
56
"os"
67
"strconv"
78
"strings"
89

910
"github.com/filecoin-project/bacalhau/pkg/config"
1011
"github.com/filecoin-project/bacalhau/pkg/logger"
1112
"github.com/filecoin-project/bacalhau/pkg/system"
13+
"github.com/filecoin-project/bacalhau/pkg/telemetry"
14+
"github.com/filecoin-project/bacalhau/pkg/version"
1215
"github.com/rs/zerolog/log"
1316
"github.com/spf13/cobra"
1417
"github.com/spf13/viper"
18+
"go.opentelemetry.io/otel/trace"
1519
)
1620

1721
var apiHost string
@@ -54,7 +58,29 @@ func NewRootCmd() *cobra.Command {
5458
Short: "Compute over data",
5559
Long: `Compute over data`,
5660
PersistentPreRun: func(cmd *cobra.Command, args []string) {
61+
ctx := cmd.Context()
62+
5763
logger.ConfigureLogging(loggingMode)
64+
65+
cm := system.NewCleanupManager()
66+
cm.RegisterCallback(telemetry.Cleanup)
67+
ctx = context.WithValue(ctx, systemManagerKey, cm)
68+
69+
var names []string
70+
root := cmd
71+
for ; root.HasParent(); root = root.Parent() {
72+
names = append([]string{root.Name()}, names...)
73+
}
74+
name := fmt.Sprintf("bacalhau.%s", strings.Join(names, "."))
75+
ctx, span := system.NewRootSpan(ctx, system.GetTracer(), name)
76+
ctx = context.WithValue(ctx, spanKey, span)
77+
78+
cmd.SetContext(ctx)
79+
},
80+
PersistentPostRun: func(cmd *cobra.Command, args []string) {
81+
ctx := cmd.Context()
82+
ctx.Value(spanKey).(trace.Span).End()
83+
ctx.Value(systemManagerKey).(*system.CleanupManager).Cleanup()
5884
},
5985
}
6086
// ====== Start a job
@@ -162,3 +188,28 @@ func Execute() {
162188
Fatal(RootCmd, err.Error(), 1)
163189
}
164190
}
191+
192+
type contextKey struct {
193+
name string
194+
}
195+
196+
var systemManagerKey = contextKey{name: "context key for storing the system manager"}
197+
var spanKey = contextKey{name: "context key for storing the root span"}
198+
199+
func checkVersion(cmd *cobra.Command, args []string) error {
200+
// corba doesn't do PersistentPreRun{,E} chaining yet
201+
// https://github.com/spf13/cobra/issues/252
202+
root := cmd
203+
for ; root.HasParent(); root = root.Parent() {
204+
}
205+
root.PersistentPreRun(cmd, args)
206+
207+
// Check that the server version is compatible with the client version
208+
serverVersion, _ := GetAPIClient().Version(cmd.Context()) // Ok if this fails, version validation will skip
209+
if err := ensureValidVersion(cmd.Context(), version.Get(), serverVersion); err != nil {
210+
Fatal(cmd, fmt.Sprintf("version validation failed: %s", err), 1)
211+
return err
212+
}
213+
214+
return nil
215+
}

cmd/bacalhau/run.go

+4-16
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,15 @@
11
package bacalhau
22

33
import (
4-
"fmt"
5-
6-
"github.com/filecoin-project/bacalhau/pkg/version"
74
"github.com/spf13/cobra"
85
)
96

107
func newRunCmd() *cobra.Command {
118
runCmd := &cobra.Command{
12-
Use: "run",
13-
Short: "Run a job on the network (see subcommands for supported flavors)",
14-
PreRun: applyPorcelainLogLevel,
15-
PersistentPreRunE: func(cmd *cobra.Command, _ []string) error {
16-
// Check that the server version is compatible with the client version
17-
serverVersion, _ := GetAPIClient().Version(cmd.Context()) // Ok if this fails, version validation will skip
18-
if err := ensureValidVersion(cmd.Context(), version.Get(), serverVersion); err != nil {
19-
Fatal(cmd, fmt.Sprintf("version validation failed: %s", err), 1)
20-
return err
21-
}
22-
23-
return nil
24-
},
9+
Use: "run",
10+
Short: "Run a job on the network (see subcommands for supported flavors)",
11+
PreRun: applyPorcelainLogLevel,
12+
PersistentPreRunE: checkVersion,
2513
}
2614
runCmd.AddCommand(newRunPythonCmd())
2715
return runCmd

cmd/bacalhau/run_python.go

+2-8
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,8 @@ import (
55
"time"
66

77
"github.com/filecoin-project/bacalhau/pkg/downloader/util"
8-
"github.com/filecoin-project/bacalhau/pkg/model"
9-
"github.com/filecoin-project/bacalhau/pkg/telemetry"
10-
118
"github.com/filecoin-project/bacalhau/pkg/job"
9+
"github.com/filecoin-project/bacalhau/pkg/model"
1210
"github.com/filecoin-project/bacalhau/pkg/storage/inline"
1311
"github.com/filecoin-project/bacalhau/pkg/system"
1412
"github.com/filecoin-project/bacalhau/pkg/util/templates"
@@ -165,13 +163,9 @@ func newRunPythonCmd() *cobra.Command {
165163
}
166164

167165
func runPython(cmd *cobra.Command, cmdArgs []string, OLR *LanguageRunOptions) error {
168-
cm := system.NewCleanupManager()
169-
defer cm.Cleanup()
170166
ctx := cmd.Context()
171167

172-
ctx, rootSpan := system.NewRootSpan(ctx, system.GetTracer(), "cmd/bacalhau.runPython")
173-
defer rootSpan.End()
174-
cm.RegisterCallback(telemetry.Cleanup)
168+
cm := cmd.Context().Value(systemManagerKey).(*system.CleanupManager)
175169

176170
// error if determinism is false
177171
if !OLR.Deterministic {

cmd/bacalhau/serve.go

+4-11
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,14 @@ import (
1010
"time"
1111

1212
"github.com/filecoin-project/bacalhau/pkg/compute/capacity"
13+
"github.com/filecoin-project/bacalhau/pkg/ipfs"
1314
"github.com/filecoin-project/bacalhau/pkg/jobstore/inmemory"
1415
"github.com/filecoin-project/bacalhau/pkg/libp2p"
1516
"github.com/filecoin-project/bacalhau/pkg/libp2p/rcmgr"
1617
"github.com/filecoin-project/bacalhau/pkg/logger"
17-
filecoinlotus "github.com/filecoin-project/bacalhau/pkg/publisher/filecoin_lotus"
18-
"github.com/filecoin-project/bacalhau/pkg/telemetry"
19-
20-
"github.com/filecoin-project/bacalhau/pkg/ipfs"
2118
"github.com/filecoin-project/bacalhau/pkg/model"
2219
"github.com/filecoin-project/bacalhau/pkg/node"
20+
filecoinlotus "github.com/filecoin-project/bacalhau/pkg/publisher/filecoin_lotus"
2321
"github.com/filecoin-project/bacalhau/pkg/system"
2422
"github.com/filecoin-project/bacalhau/pkg/util/templates"
2523
"github.com/multiformats/go-multiaddr"
@@ -304,18 +302,13 @@ func newServeCmd() *cobra.Command {
304302

305303
//nolint:funlen,gocyclo
306304
func serve(cmd *cobra.Command, OS *ServeOptions) error {
307-
// Cleanup manager ensures that resources are freed before exiting:
308-
cm := system.NewCleanupManager()
309-
cm.RegisterCallback(telemetry.Cleanup)
310-
defer cm.Cleanup()
305+
cm := cmd.Context().Value(systemManagerKey).(*system.CleanupManager)
311306

307+
// TODO this should be for all commands
312308
// Context ensures main goroutine waits until killed with ctrl+c:
313309
ctx, cancel := signal.NotifyContext(cmd.Context(), ShutdownSignals...)
314310
defer cancel()
315311

316-
ctx, rootSpan := system.NewRootSpan(ctx, system.GetTracer(), "cmd/bacalhau.serve")
317-
defer rootSpan.End()
318-
319312
isComputeNode, isRequesterNode := false, false
320313
for _, nodeType := range OS.NodeType {
321314
if nodeType == "compute" {

0 commit comments

Comments
 (0)