Skip to content

Commit d76d648

Browse files
committed
Use context for more logging statements
Use `log.Ctx` for more logging statements. Also move the `signal.NotifyContext` calls to the root command so all commands can cleanly tidy up. Part #2001
1 parent 66e545b commit d76d648

File tree

12 files changed

+54
-73
lines changed

12 files changed

+54
-73
lines changed

cmd/bacalhau/create.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ func newCreateCmd() *cobra.Command {
8383
func create(cmd *cobra.Command, cmdArgs []string, OC *CreateOptions) error { //nolint:funlen,gocyclo
8484
ctx := cmd.Context()
8585

86-
cm := cmd.Context().Value(systemManagerKey).(*system.CleanupManager)
86+
cm := ctx.Value(systemManagerKey).(*system.CleanupManager)
8787

8888
// Custom unmarshaller
8989
// https://stackoverflow.com/questions/70635636/unmarshaling-yaml-into-different-struct-based-off-yaml-field?rq=1

cmd/bacalhau/devstack.go

-5
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package bacalhau
33
import (
44
"fmt"
55
"os"
6-
"os/signal"
76
"path/filepath"
87
"strconv"
98

@@ -149,10 +148,6 @@ func runDevstack(cmd *cobra.Command, ODs *devstack.DevStackOptions, OS *ServeOpt
149148
ODs.NumberOfBadRequesterActors, totalRequesterNodes), 1)
150149
}
151150

152-
// Context ensures main goroutine waits until killed with ctrl+c:
153-
ctx, cancel := signal.NotifyContext(ctx, ShutdownSignals...)
154-
defer cancel()
155-
156151
portFileName := filepath.Join(os.TempDir(), "bacalhau-devstack.port")
157152
pidFileName := filepath.Join(os.TempDir(), "bacalhau-devstack.pid")
158153

cmd/bacalhau/docker_run.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package bacalhau
22

33
import (
4+
"context"
45
"fmt"
56
"strings"
67

@@ -272,9 +273,9 @@ func newDockerRunCmd() *cobra.Command { //nolint:funlen
272273
func dockerRun(cmd *cobra.Command, cmdArgs []string, ODR *DockerRunOptions) error {
273274
ctx := cmd.Context()
274275

275-
cm := cmd.Context().Value(systemManagerKey).(*system.CleanupManager)
276+
cm := ctx.Value(systemManagerKey).(*system.CleanupManager)
276277

277-
j, err := CreateJob(cmdArgs, ODR)
278+
j, err := CreateJob(ctx, cmdArgs, ODR)
278279
if err != nil {
279280
Fatal(cmd, fmt.Sprintf("Error creating job: %s", err), 1)
280281
return nil
@@ -320,7 +321,7 @@ func dockerRun(cmd *cobra.Command, cmdArgs []string, ODR *DockerRunOptions) erro
320321
}
321322

322323
// CreateJob creates a job object from the given command line arguments and options.
323-
func CreateJob(cmdArgs []string, odr *DockerRunOptions) (*model.Job, error) {
324+
func CreateJob(ctx context.Context, cmdArgs []string, odr *DockerRunOptions) (*model.Job, error) {
324325
odr.Image = cmdArgs[0]
325326
odr.Entrypoint = cmdArgs[1:]
326327

@@ -370,6 +371,7 @@ func CreateJob(cmdArgs []string, odr *DockerRunOptions) (*model.Job, error) {
370371
}
371372

372373
j, err := jobutils.ConstructDockerJob(
374+
ctx,
373375
model.APIVersionLatest(),
374376
engineType,
375377
verifierType,

cmd/bacalhau/root.go

+25-23
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"os"
7+
"os/signal"
78
"strconv"
89
"strings"
910

@@ -80,7 +81,7 @@ func NewRootCmd() *cobra.Command {
8081
PersistentPostRun: func(cmd *cobra.Command, args []string) {
8182
ctx := cmd.Context()
8283
ctx.Value(spanKey).(trace.Span).End()
83-
ctx.Value(systemManagerKey).(*system.CleanupManager).Cleanup(cmd.Context())
84+
ctx.Value(systemManagerKey).(*system.CleanupManager).Cleanup(ctx)
8485
},
8586
}
8687
// ====== Start a job
@@ -138,9 +139,12 @@ Ignored if BACALHAU_API_PORT environment variable is set.`,
138139
}
139140

140141
func Execute() {
141-
RootCmd := NewRootCmd()
142-
// ANCHOR: Set global context here
143-
RootCmd.SetContext(context.Background())
142+
rootCmd := NewRootCmd()
143+
144+
// Ensure commands are able to stop cleanly if someone presses ctrl+c
145+
ctx, cancel := signal.NotifyContext(context.Background(), ShutdownSignals...)
146+
defer cancel()
147+
rootCmd.SetContext(ctx)
144148

145149
doNotTrack = false
146150
if doNotTrackValue, foundDoNotTrack := os.LookupEnv("DO_NOT_TRACK"); foundDoNotTrack {
@@ -152,40 +156,36 @@ func Execute() {
152156

153157
viper.SetEnvPrefix("BACALHAU")
154158

155-
err := viper.BindEnv("API_HOST")
156-
if err != nil {
157-
log.Ctx(RootCmd.Context()).Fatal().Msgf("API_HOST was set, but could not bind.")
159+
if err := viper.BindEnv("API_HOST"); err != nil {
160+
log.Ctx(ctx).Fatal().Msgf("API_HOST was set, but could not bind.")
158161
}
159162

160-
err = viper.BindEnv("API_PORT")
161-
if err != nil {
162-
log.Ctx(RootCmd.Context()).Fatal().Msgf("API_PORT was set, but could not bind.")
163+
if err := viper.BindEnv("API_PORT"); err != nil {
164+
log.Ctx(ctx).Fatal().Msgf("API_PORT was set, but could not bind.")
163165
}
164166

165167
viper.AutomaticEnv()
166-
envAPIHost := viper.Get("API_HOST")
167-
envAPIPort := viper.Get("API_PORT")
168168

169-
if envAPIHost != nil && envAPIHost != "" {
170-
apiHost = envAPIHost.(string)
169+
if envAPIHost := viper.GetString("API_HOST"); envAPIHost != "" {
170+
apiHost = envAPIHost
171171
}
172172

173-
if envAPIPort != nil && envAPIPort != "" {
173+
if envAPIPort := viper.GetString("API_PORT"); envAPIPort != "" {
174174
var parseErr error
175-
apiPort, parseErr = strconv.Atoi(envAPIPort.(string))
175+
apiPort, parseErr = strconv.Atoi(envAPIPort)
176176
if parseErr != nil {
177-
log.Ctx(RootCmd.Context()).Fatal().Msgf("could not parse API_PORT into an int. %s", envAPIPort)
177+
log.Ctx(ctx).Fatal().Msgf("could not parse API_PORT into an int. %s", envAPIPort)
178178
}
179179
}
180180

181181
// Use stdout, not stderr for cmd.Print output, so that
182182
// e.g. ID=$(bacalhau run) works
183-
RootCmd.SetOut(system.Stdout)
183+
rootCmd.SetOut(system.Stdout)
184184
// TODO this is from fixing a deprecation warning for SetOutput. Shouldn't this be system.Stderr?
185-
RootCmd.SetErr(system.Stdout)
185+
rootCmd.SetErr(system.Stdout)
186186

187-
if err := RootCmd.Execute(); err != nil {
188-
Fatal(RootCmd, err.Error(), 1)
187+
if err := rootCmd.Execute(); err != nil {
188+
Fatal(rootCmd, err.Error(), 1)
189189
}
190190
}
191191

@@ -197,6 +197,8 @@ var systemManagerKey = contextKey{name: "context key for storing the system mana
197197
var spanKey = contextKey{name: "context key for storing the root span"}
198198

199199
func checkVersion(cmd *cobra.Command, args []string) error {
200+
ctx := cmd.Context()
201+
200202
// corba doesn't do PersistentPreRun{,E} chaining yet
201203
// https://github.com/spf13/cobra/issues/252
202204
root := cmd
@@ -205,8 +207,8 @@ func checkVersion(cmd *cobra.Command, args []string) error {
205207
root.PersistentPreRun(cmd, args)
206208

207209
// Check that the server version is compatible with the client version
208-
serverVersion, _ := GetAPIClient().Version(cmd.Context()) // Ok if this fails, version validation will skip
209-
if err := ensureValidVersion(cmd.Context(), version.Get(), serverVersion); err != nil {
210+
serverVersion, _ := GetAPIClient().Version(ctx) // Ok if this fails, version validation will skip
211+
if err := ensureValidVersion(ctx, version.Get(), serverVersion); err != nil {
210212
Fatal(cmd, fmt.Sprintf("version validation failed: %s", err), 1)
211213
return err
212214
}

cmd/bacalhau/run_python.go

+3-4
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func NewLanguageRunOptions() *LanguageRunOptions {
7878
// TODO: move the adapter code (from wasm to docker) into a wasm executor, so
7979
// that the compute node can verify the job knowing that it was run properly,
8080
// rather than doing the translation in, and thereby trusting, the client (to
81-
// set up the wasm environment to be determinstic)
81+
// set up the wasm environment to be deterministic)
8282

8383
func newRunPythonCmd() *cobra.Command {
8484
OLR := NewLanguageRunOptions()
@@ -165,7 +165,7 @@ func newRunPythonCmd() *cobra.Command {
165165
func runPython(cmd *cobra.Command, cmdArgs []string, OLR *LanguageRunOptions) error {
166166
ctx := cmd.Context()
167167

168-
cm := cmd.Context().Value(systemManagerKey).(*system.CleanupManager)
168+
cm := ctx.Value(systemManagerKey).(*system.CleanupManager)
169169

170170
// error if determinism is false
171171
if !OLR.Deterministic {
@@ -193,10 +193,10 @@ func runPython(cmd *cobra.Command, cmdArgs []string, OLR *LanguageRunOptions) er
193193
// have ConstructLanguageJob and ConstructDockerJob as separate means
194194
// manually keeping them in sync.
195195
j, err := job.ConstructLanguageJob(
196+
ctx,
196197
OLR.InputVolumes,
197198
OLR.InputUrls,
198199
OLR.OutputVolumes,
199-
[]string{}, // no env vars (yet)
200200
OLR.Concurrency,
201201
OLR.Confidence,
202202
OLR.MinBids,
@@ -206,7 +206,6 @@ func runPython(cmd *cobra.Command, cmdArgs []string, OLR *LanguageRunOptions) er
206206
OLR.Command,
207207
programPath,
208208
OLR.RequirementsPath,
209-
OLR.ContextPath,
210209
OLR.Deterministic,
211210
OLR.Labels,
212211
doNotTrack,

cmd/bacalhau/serve.go

+2-7
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"fmt"
66
"os"
7-
"os/signal"
87
"sort"
98
"strings"
109
"time"
@@ -302,12 +301,8 @@ func newServeCmd() *cobra.Command {
302301

303302
//nolint:funlen,gocyclo
304303
func serve(cmd *cobra.Command, OS *ServeOptions) error {
305-
cm := cmd.Context().Value(systemManagerKey).(*system.CleanupManager)
306-
307-
// TODO this should be for all commands
308-
// Context ensures main goroutine waits until killed with ctrl+c:
309-
ctx, cancel := signal.NotifyContext(cmd.Context(), ShutdownSignals...)
310-
defer cancel()
304+
ctx := cmd.Context()
305+
cm := ctx.Value(systemManagerKey).(*system.CleanupManager)
311306

312307
isComputeNode, isRequesterNode := false, false
313308
for _, nodeType := range OS.NodeType {

cmd/bacalhau/version.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ func (oV *VersionOptions) Run(ctx context.Context, cmd *cobra.Command) error {
105105
if !oV.ClientOnly {
106106
serverVersion, err := GetAPIClient().Version(ctx)
107107
if err != nil {
108-
log.Ctx(cmd.Context()).Error().Err(err).Msgf("could not get server version")
108+
log.Ctx(ctx).Error().Err(err).Msgf("could not get server version")
109109
return err
110110
}
111111

cmd/bacalhau/wasm_run.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ func runWasm(
191191
time.Sleep(1 * time.Second)
192192

193193
storage := inline.NewStorage()
194-
inlineData, err := storage.Upload(cmd.Context(), info.Name())
194+
inlineData, err := storage.Upload(ctx, info.Name())
195195
if err != nil {
196196
return err
197197
}

pkg/devstack/profiler.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func StartProfiling(ctx context.Context, cpuFile, memoryFile string) CloserWithC
4141

4242
func (p *profiler) Close(ctx context.Context) error {
4343
// stop profiling now, just before we clean up, if we're profiling.
44-
log.Trace().Msg("============= STOPPING PROFILING ============")
44+
log.Ctx(ctx).Trace().Msg("============= STOPPING PROFILING ============")
4545
if p.cpuFile != nil {
4646
pprof.StopCPUProfile()
4747
closer.CloseWithLogOnError(p.cpuFile.Name(), p.cpuFile)

pkg/job/factory.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package job
22

33
import (
4+
"context"
45
"strings"
56

67
"github.com/filecoin-project/bacalhau/pkg/model"
@@ -12,6 +13,7 @@ import (
1213
// to pass in the collection of CLI args as strings
1314
// and have a Job struct returned
1415
func ConstructDockerJob( //nolint:funlen
16+
ctx context.Context,
1517
a model.APIVersion,
1618
e model.Engine,
1719
v model.Verifier,
@@ -48,7 +50,7 @@ func ConstructDockerJob( //nolint:funlen
4850
if err != nil {
4951
return &model.Job{}, err
5052
}
51-
jobOutputs, err := buildJobOutputs(outputVolumes)
53+
jobOutputs, err := buildJobOutputs(ctx, outputVolumes)
5254
if err != nil {
5355
return &model.Job{}, err
5456
}
@@ -64,7 +66,7 @@ func ConstructDockerJob( //nolint:funlen
6466
}
6567

6668
if len(unSafeAnnotations) > 0 {
67-
log.Error().Msgf("The following labels are unsafe. Labels must fit the regex '/%s/' (and all emjois): %+v",
69+
log.Ctx(ctx).Error().Msgf("The following labels are unsafe. Labels must fit the regex '/%s/' (and all emjois): %+v",
6870
RegexString,
6971
strings.Join(unSafeAnnotations, ", "))
7072
}
@@ -77,7 +79,6 @@ func ConstructDockerJob( //nolint:funlen
7779
if len(workingDir) > 0 {
7880
err = system.ValidateWorkingDir(workingDir)
7981
if err != nil {
80-
log.Error().Msg(err.Error())
8182
return &model.Job{}, err
8283
}
8384
}
@@ -136,10 +137,10 @@ func ConstructDockerJob( //nolint:funlen
136137
}
137138

138139
func ConstructLanguageJob(
140+
ctx context.Context,
139141
inputVolumes []string,
140142
inputUrls []string,
141143
outputVolumes []string,
142-
env []string,
143144
concurrency int,
144145
confidence int,
145146
minBids int,
@@ -150,7 +151,6 @@ func ConstructLanguageJob(
150151
command string,
151152
programPath string,
152153
requirementsPath string,
153-
contextPath string, // we have to tar this up and POST it to the Requester node
154154
deterministic bool,
155155
annotations []string,
156156
doNotTrack bool,
@@ -162,7 +162,7 @@ func ConstructLanguageJob(
162162
if err != nil {
163163
return &model.Job{}, err
164164
}
165-
jobOutputs, err := buildJobOutputs(outputVolumes)
165+
jobOutputs, err := buildJobOutputs(ctx, outputVolumes)
166166
if err != nil {
167167
return &model.Job{}, err
168168
}
@@ -178,7 +178,7 @@ func ConstructLanguageJob(
178178
}
179179

180180
if len(unSafeAnnotations) > 0 {
181-
log.Error().Msgf("The following labels are unsafe. Labels must fit the regex '/%s/' (and all emjois): %+v",
181+
log.Ctx(ctx).Error().Msgf("The following labels are unsafe. Labels must fit the regex '/%s/' (and all emjois): %+v",
182182
RegexString,
183183
strings.Join(unSafeAnnotations, ", "))
184184
}

pkg/job/factory_test.go

+2
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
package job
44

55
import (
6+
"context"
67
"strings"
78
"testing"
89

@@ -68,6 +69,7 @@ func (suite *JobFactorySuite) TestRun_DockerJobOutputs() {
6869
}
6970

7071
j, err := ConstructDockerJob( //nolint:funlen
72+
context.Background(),
7173
model.APIVersionLatest(),
7274
model.EngineNoop,
7375
model.VerifierNoop,

0 commit comments

Comments
 (0)