Skip to content

Commit fcf4adb

Browse files
committed
Add support for passing a context when cleaning up
This adds a second type of clean up function - one that takes a `context.Context`. This makes it easier to avoid problems of using a cancelled context when cleaning up, as the logic doesn't need to be spread about, and makes it easy to migrate a portion of log statements that aren't using `log.Ctx` to do so. Part of #2001
1 parent 2e444a7 commit fcf4adb

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+158
-238
lines changed

cmd/bacalhau/base_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,6 @@ func (s *BaseSuite) SetupTest() {
5252
func (s *BaseSuite) TearDownTest() {
5353
Fatal = FatalErrorHandler
5454
if s.node != nil {
55-
s.node.CleanupManager.Cleanup()
55+
s.node.CleanupManager.Cleanup(context.Background())
5656
}
5757
}

cmd/bacalhau/root.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ func NewRootCmd() *cobra.Command {
8080
PersistentPostRun: func(cmd *cobra.Command, args []string) {
8181
ctx := cmd.Context()
8282
ctx.Value(spanKey).(trace.Span).End()
83-
ctx.Value(systemManagerKey).(*system.CleanupManager).Cleanup()
83+
ctx.Value(systemManagerKey).(*system.CleanupManager).Cleanup(cmd.Context())
8484
},
8585
}
8686
// ====== Start a job
@@ -154,12 +154,12 @@ func Execute() {
154154

155155
err := viper.BindEnv("API_HOST")
156156
if err != nil {
157-
log.Fatal().Msgf("API_HOST was set, but could not bind.")
157+
log.Ctx(RootCmd.Context()).Fatal().Msgf("API_HOST was set, but could not bind.")
158158
}
159159

160160
err = viper.BindEnv("API_PORT")
161161
if err != nil {
162-
log.Fatal().Msgf("API_PORT was set, but could not bind.")
162+
log.Ctx(RootCmd.Context()).Fatal().Msgf("API_PORT was set, but could not bind.")
163163
}
164164

165165
viper.AutomaticEnv()
@@ -174,7 +174,7 @@ func Execute() {
174174
var parseErr error
175175
apiPort, parseErr = strconv.Atoi(envAPIPort.(string))
176176
if parseErr != nil {
177-
log.Fatal().Msgf("could not parse API_PORT into an int. %s", envAPIPort)
177+
log.Ctx(RootCmd.Context()).Fatal().Msgf("could not parse API_PORT into an int. %s", envAPIPort)
178178
}
179179
}
180180

cmd/bacalhau/serve.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -481,7 +481,7 @@ func ipfsClient(ctx context.Context, OS *ServeOptions, cm *system.CleanupManager
481481
if err != nil {
482482
return ipfs.Client{}, fmt.Errorf("error creating IPFS node: %s", err)
483483
}
484-
cm.RegisterCallback(ipfsNode.Close)
484+
cm.RegisterCallbackWithContext(ipfsNode.Close)
485485
client := ipfsNode.Client()
486486

487487
swarmAddresses, err := client.SwarmAddresses(ctx)

cmd/bacalhau/serve_test.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,9 @@ func (s *ServeSuite) SetupTest() {
4444
s.Require().NoError(system.InitConfigForTesting(s.T()))
4545

4646
cm := system.NewCleanupManager()
47-
s.T().Cleanup(cm.Cleanup)
47+
s.T().Cleanup(func() {
48+
cm.Cleanup(context.Background())
49+
})
4850

4951
node, err := ipfs.NewLocalNode(context.Background(), cm, []string{})
5052
s.NoError(err)
@@ -69,7 +71,7 @@ func (s *ServeSuite) writeToServeChannel(rootCmd *cobra.Command, port int) {
6971

7072
rootCmd.SetArgs(args)
7173

72-
log.Trace().Msgf("Command to execute: %v", rootCmd.CalledAs())
74+
log.Ctx(rootCmd.Context()).Trace().Msgf("Command to execute: %v", rootCmd.CalledAs())
7375

7476
_, err := rootCmd.ExecuteC()
7577
s.Require().NoError(err)

dashboard/api/cmd/dashboard/serve.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ func serve(cmd *cobra.Command, options *ServeOptions) error {
8787
// Cleanup manager ensures that resources are freed before exiting:
8888
cm := system.NewCleanupManager()
8989
cm.RegisterCallback(telemetry.Cleanup)
90-
defer cm.Cleanup()
90+
defer cm.Cleanup(cmd.Context())
9191
ctx := cmd.Context()
9292

9393
if options.ServerOptions.JWTSecret == "" {
@@ -121,7 +121,7 @@ func serve(cmd *cobra.Command, options *ServeOptions) error {
121121
if err != nil {
122122
return err
123123
}
124-
cm.RegisterCallback(model.Stop)
124+
cm.RegisterCallbackWithContext(model.Stop)
125125

126126
// Start transport layer
127127
err = libp2p.ConnectToPeersContinuously(ctx, cm, libp2pHost, peers)

dashboard/api/pkg/model/api.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -169,9 +169,9 @@ func (api *ModelAPI) Start(ctx context.Context) error {
169169
return nil
170170
}
171171

172-
func (api *ModelAPI) Stop() error {
172+
func (api *ModelAPI) Stop(ctx context.Context) error {
173173
if api.cleanupFunc != nil {
174-
api.cleanupFunc(context.Background())
174+
api.cleanupFunc(ctx)
175175
}
176176
return nil
177177
}

dashboard/api/pkg/server/utils.go

-15
Original file line numberDiff line numberDiff line change
@@ -34,21 +34,6 @@ func GetRequestBody[T any](w http.ResponseWriter, r *http.Request) (*T, error) {
3434
return &requestBody, nil
3535
}
3636

37-
func HTTPGet[T any](url string) (T, error) {
38-
var data T
39-
//nolint:gosec
40-
resp, err := http.Get(url)
41-
if err != nil {
42-
return data, err
43-
}
44-
err = json.NewDecoder(resp.Body).Decode(&data)
45-
if err != nil {
46-
return data, err
47-
}
48-
resp.Body.Close()
49-
return data, nil
50-
}
51-
5237
func generateJWT(
5338
secret string,
5439
username string,

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,6 @@ require (
9494
golang.org/x/crypto v0.5.0
9595
golang.org/x/exp v0.0.0-20221106115401-f9659909a136
9696
golang.org/x/mod v0.7.0
97-
golang.org/x/net v0.7.0
9897
k8s.io/apimachinery v0.26.1
9998
k8s.io/kubectl v0.26.1
10099
modernc.org/sqlite v1.20.2
@@ -333,6 +332,7 @@ require (
333332
go.uber.org/dig v1.14.1 // indirect
334333
go.uber.org/fx v1.17.1 // indirect
335334
go4.org v0.0.0-20201209231011-d4a079459e60 // indirect
335+
golang.org/x/net v0.7.0 // indirect
336336
golang.org/x/oauth2 v0.1.0 // indirect
337337
golang.org/x/sync v0.1.0 // indirect
338338
golang.org/x/sys v0.5.0 // indirect

pkg/config/config.go

+3-21
Original file line numberDiff line numberDiff line change
@@ -55,17 +55,12 @@ type contextKey int
5555

5656
const (
5757
getVolumeSizeRequestTimeoutKey contextKey = iota
58-
downloadCidRequestTimeoutKey
5958
)
6059

6160
const (
6261
// by default we wait 2 minutes for the IPFS network to resolve a CID
6362
// tests will override this using config.SetVolumeSizeRequestTimeout(2)
64-
getVolumeSizeRequestTimeout time.Duration = 2 * time.Minute
65-
66-
// by default we wait 5 minutes for the IPFS network to download a CID
67-
// tests will override this using config.SetVolumeSizeRequestTimeout(2)
68-
downloadCidRequestTimeout time.Duration = 5 * time.Minute
63+
getVolumeSizeRequestTimeout = 2 * time.Minute
6964
)
7065

7166
// how long do we wait for a volume size request to timeout
@@ -87,19 +82,6 @@ func SetVolumeSizeRequestTimeout(ctx context.Context, value time.Duration) conte
8782
return context.WithValue(ctx, getVolumeSizeRequestTimeoutKey, value)
8883
}
8984

90-
// how long do we wait for a cid to download
91-
func GetDownloadCidRequestTimeout(ctx context.Context) time.Duration {
92-
value := ctx.Value(downloadCidRequestTimeoutKey)
93-
if value == nil {
94-
value = downloadCidRequestTimeout
95-
}
96-
return value.(time.Duration)
97-
}
98-
99-
func SetDownloadCidRequestTimeout(ctx context.Context, value time.Duration) context.Context {
100-
return context.WithValue(ctx, downloadCidRequestTimeoutKey, value)
101-
}
102-
10385
// by default we wait 5 minutes for a URL to download
10486
// tests will override this using config.SetDownloadURLRequestTimeoutSeconds(2)
10587
var downloadURLRequestTimeoutSeconds int64 = 300
@@ -135,7 +117,7 @@ func GetConfigPath() string {
135117
// e.g. /home/francesca/.bacalhau
136118
dirname, err := os.UserHomeDir()
137119
if err != nil {
138-
log.Fatal().Err(err)
120+
log.Fatal().Err(err).Send()
139121
}
140122
d = filepath.Join(dirname, suffix)
141123
} else {
@@ -144,7 +126,7 @@ func GetConfigPath() string {
144126
}
145127
// create dir if not exists
146128
if err := os.MkdirAll(d, util.OS_USER_RWX); err != nil {
147-
log.Fatal().Err(err)
129+
log.Fatal().Err(err).Send()
148130
}
149131
return d
150132
}

pkg/devstack/devstack.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ func NewDevStack(
131131
return nil, err
132132
}
133133

134-
cm.RegisterCallback(lotus.Close)
134+
cm.RegisterCallbackWithContext(lotus.Close)
135135

136136
if err := lotus.start(ctx); err != nil { //nolint:govet
137137
return nil, err
@@ -323,9 +323,9 @@ func NewDevStack(
323323
}
324324

325325
// only start profiling after we've set everything up!
326-
profiler := StartProfiling(options.CPUProfilingFile, options.MemoryProfilingFile)
326+
profiler := StartProfiling(ctx, options.CPUProfilingFile, options.MemoryProfilingFile)
327327
if profiler != nil {
328-
cm.RegisterCallback(profiler.Close)
328+
cm.RegisterCallbackWithContext(profiler.Close)
329329
}
330330

331331
return &DevStack{

pkg/devstack/devstack_ipfs.go

-17
Original file line numberDiff line numberDiff line change
@@ -47,20 +47,3 @@ func NewDevStackIPFS(ctx context.Context, cm *system.CleanupManager, count int)
4747

4848
return stack, nil
4949
}
50-
51-
func (stack *DevStackIPFS) PrintNodeInfo() {
52-
logString := `
53-
-------------------------------
54-
ipfs
55-
-------------------------------
56-
57-
command="add -q testdata/grep_file.txt"
58-
`
59-
for _, node := range stack.IPFSClients {
60-
logString += fmt.Sprintf(`
61-
cid=$(ipfs --api %s ipfs $command)
62-
curl -XPOST %s`, node.APIAddress(), node.APIAddress())
63-
}
64-
65-
log.Trace().Msg(logString + "\n")
66-
}

pkg/devstack/lotus.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package devstack
22

33
import (
44
"archive/tar"
5+
"context"
56
"fmt"
67
"io"
78
"os"
@@ -18,7 +19,6 @@ import (
1819
"github.com/filecoin-project/bacalhau/pkg/util/closer"
1920
"github.com/hashicorp/go-multierror"
2021
"github.com/rs/zerolog/log"
21-
"golang.org/x/net/context"
2222
)
2323

2424
const defaultImage = "ghcr.io/bacalhau-project/lotus-filecoin-image:v0.0.2"
@@ -111,7 +111,7 @@ func (l *LotusNode) start(ctx context.Context) error {
111111
}
112112

113113
if err := l.waitForLotusToBeHealthy(ctx); err != nil {
114-
if err := l.Close(); err != nil { //nolint:govet
114+
if err := l.Close(ctx); err != nil { //nolint:govet
115115
log.Ctx(ctx).Err(err).Msgf(`Problem occurred when giving up waiting for Lotus to become healthy`)
116116
}
117117
return err
@@ -196,12 +196,12 @@ ListenAddress = "/ip4/0.0.0.0/tcp/%s/http"
196196
return nil
197197
}
198198

199-
func (l *LotusNode) Close() error {
199+
func (l *LotusNode) Close(ctx context.Context) error {
200200
var errs error
201201

202202
defer closer.CloseWithLogOnError("Docker client", l.client)
203203
if l.container != "" {
204-
if err := l.client.RemoveContainer(context.Background(), l.container); err != nil {
204+
if err := l.client.RemoveContainer(ctx, l.container); err != nil {
205205
errs = multierror.Append(errs, err)
206206
}
207207
}

pkg/devstack/profiler.go

+14-9
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package devstack
22

33
import (
4-
"io"
4+
"context"
55
"os"
66
"runtime"
77
"runtime/pprof"
@@ -15,31 +15,31 @@ type profiler struct {
1515
memoryFile string
1616
}
1717

18-
func StartProfiling(cpuFile, memoryFile string) io.Closer {
18+
func StartProfiling(ctx context.Context, cpuFile, memoryFile string) CloserWithContext {
1919
// do a GC before we start profiling
2020
runtime.GC()
2121

22-
log.Trace().Msg("============= STARTING PROFILING ============")
22+
log.Ctx(ctx).Trace().Msg("============= STARTING PROFILING ============")
2323

2424
var f *os.File
2525
if cpuFile != "" {
2626
var err error
2727
f, err = os.Create(cpuFile)
2828
if err != nil {
29-
log.Debug().Err(err).Str("Path", cpuFile).Msg("could not create CPU profile")
29+
log.Ctx(ctx).Debug().Err(err).Str("Path", cpuFile).Msg("could not create CPU profile")
3030
return nil
3131
}
3232
if err := pprof.StartCPUProfile(f); err != nil {
3333
closer.CloseWithLogOnError(cpuFile, f)
34-
log.Debug().Err(err).Msg("could not start CPU profile")
34+
log.Ctx(ctx).Debug().Err(err).Msg("could not start CPU profile")
3535
return nil
3636
}
3737
}
3838

3939
return &profiler{cpuFile: f, memoryFile: memoryFile}
4040
}
4141

42-
func (p *profiler) Close() error {
42+
func (p *profiler) Close(ctx context.Context) error {
4343
// stop profiling now, just before we clean up, if we're profiling.
4444
log.Trace().Msg("============= STOPPING PROFILING ============")
4545
if p.cpuFile != nil {
@@ -50,18 +50,23 @@ func (p *profiler) Close() error {
5050
if p.memoryFile != "" {
5151
f, err := os.Create(p.memoryFile)
5252
if err != nil {
53-
log.Debug().Err(err).Str("Path", p.memoryFile).Msg("could not create memory profile")
53+
log.Ctx(ctx).Debug().Err(err).Str("Path", p.memoryFile).Msg("could not create memory profile")
5454
return nil
5555
}
5656
defer closer.CloseWithLogOnError(p.memoryFile, f) // error handling omitted for example
5757

5858
runtime.GC() // get up-to-date statistics
5959
if err := pprof.WriteHeapProfile(f); err != nil {
60-
log.Debug().Err(err).Msg("could not write memory profile")
60+
log.Ctx(ctx).Debug().Err(err).Msg("could not write memory profile")
6161
}
6262
}
6363

6464
return nil
6565
}
6666

67-
var _ io.Closer = (*profiler)(nil)
67+
var _ CloserWithContext = (*profiler)(nil)
68+
69+
type CloserWithContext interface {
70+
// Close closes the resource.
71+
Close(context.Context) error
72+
}

pkg/downloader/download_test.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@ func (ds *DownloaderSuite) SetupSuite() {
4141
// Before each test
4242
func (ds *DownloaderSuite) SetupTest() {
4343
ds.cm = system.NewCleanupManager()
44-
ds.T().Cleanup(ds.cm.Cleanup)
44+
ds.T().Cleanup(func() {
45+
ds.cm.Cleanup(context.Background())
46+
})
4547

4648
ctx, cancel := context.WithCancel(context.Background())
4749
ds.T().Cleanup(cancel)

pkg/downloader/ipfs/downloader.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ func (ipfsDownloader *Downloader) FetchResult(ctx context.Context, result model.
4040
return err
4141
}
4242
defer func() {
43-
if closeErr := n.Close(); closeErr != nil {
43+
if closeErr := n.Close(ctx); closeErr != nil {
4444
log.Ctx(ctx).Error().Err(closeErr).Msg("Failed to close IPFS node")
4545
}
4646
}()

0 commit comments

Comments
 (0)