Skip to content

Commit 7eab492

Browse files
Merge pull request #116 from skip-mev/nv/prometheus-container-name
fix: provide prometheus container name
2 parents 8056fc5 + 3f626b6 commit 7eab492

File tree

10 files changed

+154
-77
lines changed

10 files changed

+154
-77
lines changed

core/monitoring/prometheus.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,12 @@ func SetupPrometheusTask(ctx context.Context, logger *zap.Logger, p provider.Pro
4040
"--web.console.libraries=/usr/share/prometheus/console_libraries",
4141
"--web.console.templates=/usr/share/prometheus/consoles",
4242
},
43+
ContainerName: "prometheus",
4344
ProviderSpecificConfig: opts.ProviderSpecificConfig,
4445
})
46+
if err != nil {
47+
return nil, err
48+
}
4549

4650
parsedPrometheusConfig, err := parsePrometheusConfig(opts)
4751
if err != nil {

core/provider/digitalocean/digitalocean_provider.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ func NewDigitalOceanProvider(ctx context.Context, logger *zap.Logger, providerNa
4141
doClient := godo.NewFromToken(token)
4242

4343
sshPubKey, sshPrivKey, sshFingerprint, err := makeSSHKeyPair()
44-
4544
if err != nil {
4645
return nil, err
4746
}
@@ -77,11 +76,10 @@ func NewDigitalOceanProvider(ctx context.Context, logger *zap.Logger, providerNa
7776
firewall, err := digitalOceanProvider.createFirewall(ctx, userIPs)
7877

7978
if err != nil {
80-
return nil, err
79+
return nil, fmt.Errorf("failed to create firewall: %w", err)
8180
}
8281

8382
digitalOceanProvider.firewallID = firewall.ID
84-
8583
_, err = digitalOceanProvider.createSSHKey(ctx, sshPubKey)
8684

8785
if err != nil {

core/provider/digitalocean/droplet.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ func (p *Provider) CreateDroplet(ctx context.Context, definition provider.TaskDe
5454

5555
start := time.Now()
5656

57-
err = util.WaitForCondition(ctx, time.Second*600, time.Second*2, func() (bool, error) {
57+
err = util.WaitForCondition(ctx, time.Second*300, time.Millisecond*300, func() (bool, error) {
5858
d, _, err := p.doClient.Droplets.Get(ctx, droplet.ID)
5959

6060
if err != nil {
@@ -74,6 +74,7 @@ func (p *Provider) CreateDroplet(ctx context.Context, definition provider.TaskDe
7474
dockerClient, err := dockerclient.NewClientWithOpts(dockerclient.WithHost(fmt.Sprintf("tcp://%s:2375", ip)))
7575

7676
if err != nil {
77+
p.logger.Error("failed to create docker client", zap.Error(err))
7778
return false, err
7879
}
7980

@@ -83,6 +84,7 @@ func (p *Provider) CreateDroplet(ctx context.Context, definition provider.TaskDe
8384
return false, nil
8485
}
8586

87+
p.logger.Info("droplet is active", zap.Duration("after", time.Since(start)), zap.String("task", definition.Name))
8688
return true, nil
8789
})
8890

@@ -117,13 +119,16 @@ func (p *Provider) deleteDroplet(ctx context.Context, name string) error {
117119
return nil
118120
}
119121

120-
func (p *Provider) getDroplet(ctx context.Context, name string) (*godo.Droplet, error) {
122+
func (p *Provider) getDroplet(ctx context.Context, name string, returnOnCacheHit bool) (*godo.Droplet, error) {
121123
cachedDroplet, ok := p.droplets.Load(name)
122-
123124
if !ok {
124125
return nil, fmt.Errorf("could not find droplet %s", name)
125126
}
126127

128+
if ok && returnOnCacheHit {
129+
return cachedDroplet, nil
130+
}
131+
127132
droplet, res, err := p.doClient.Droplets.Get(ctx, cachedDroplet.ID)
128133

129134
if err != nil {

core/provider/digitalocean/task.go

Lines changed: 46 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,11 @@ import (
44
"bytes"
55
"context"
66
"fmt"
7+
"io"
8+
"net"
9+
"path"
10+
"time"
11+
712
"github.com/docker/docker/api/types"
813
"github.com/docker/docker/api/types/container"
914
"github.com/docker/docker/api/types/mount"
@@ -15,10 +20,6 @@ import (
1520
"github.com/spf13/afero"
1621
"github.com/spf13/afero/sftpfs"
1722
"go.uber.org/zap"
18-
"io"
19-
"net"
20-
"path"
21-
"time"
2223
)
2324

2425
func (p *Provider) CreateTask(ctx context.Context, logger *zap.Logger, definition provider.TaskDefinition) (string, error) {
@@ -27,7 +28,7 @@ func (p *Provider) CreateTask(ctx context.Context, logger *zap.Logger, definitio
2728
}
2829

2930
if definition.ProviderSpecificConfig == nil {
30-
return "", fmt.Errorf("digitalocean specific config is nil")
31+
return "", fmt.Errorf("digitalocean specific config is nil for %s", definition.Name)
3132
}
3233

3334
_, ok := definition.ProviderSpecificConfig.(DigitalOceanTaskConfig)
@@ -122,7 +123,7 @@ func (p *Provider) StartTask(ctx context.Context, taskName string) error {
122123
return err
123124
}
124125

125-
err = util.WaitForCondition(ctx, time.Second*300, time.Second*2, func() (bool, error) {
126+
err = util.WaitForCondition(ctx, time.Second*300, time.Millisecond*100, func() (bool, error) {
126127
status, err := p.GetTaskStatus(ctx, taskName)
127128
if err != nil {
128129
return false, err
@@ -171,7 +172,7 @@ func (p *Provider) DestroyTask(ctx context.Context, taskName string) error {
171172
}
172173

173174
func (p *Provider) GetTaskStatus(ctx context.Context, taskName string) (provider.TaskStatus, error) {
174-
droplet, err := p.getDroplet(ctx, taskName)
175+
droplet, err := p.getDroplet(ctx, taskName, false)
175176

176177
if err != nil {
177178
return provider.TASK_STATUS_UNDEFINED, err
@@ -295,7 +296,7 @@ func (p *Provider) DownloadDir(ctx context.Context, s string, s2 string, s3 stri
295296
}
296297

297298
func (p *Provider) GetIP(ctx context.Context, taskName string) (string, error) {
298-
droplet, err := p.getDroplet(ctx, taskName)
299+
droplet, err := p.getDroplet(ctx, taskName, true)
299300

300301
if err != nil {
301302
return "", err
@@ -366,6 +367,7 @@ func (p *Provider) RunCommandWhileStopped(ctx context.Context, taskName string,
366367
dockerClient, err := p.getDropletDockerClient(ctx, taskName)
367368

368369
if err != nil {
370+
p.logger.Error("failed to get docker client", zap.Error(err), zap.String("taskName", taskName))
369371
return "", "", 0, err
370372
}
371373

@@ -396,45 +398,76 @@ func (p *Provider) RunCommandWhileStopped(ctx context.Context, taskName string,
396398
}, nil, nil, definition.ContainerName)
397399

398400
if err != nil {
401+
p.logger.Error("failed to create container", zap.Error(err), zap.String("taskName", taskName))
399402
return "", "", 0, err
400403
}
401404

402405
defer dockerClient.ContainerRemove(ctx, createdContainer.ID, types.ContainerRemoveOptions{Force: true})
403406

404-
err = dockerClient.ContainerStart(ctx, createdContainer.ID, types.ContainerStartOptions{})
405-
406-
if err != nil {
407+
if err := startContainerWithBlock(ctx, dockerClient, createdContainer.ID); err != nil {
408+
p.logger.Error("failed to start container", zap.Error(err), zap.String("taskName", taskName))
407409
return "", "", 0, err
408410
}
409411

412+
// wait for container start
410413
exec, err := dockerClient.ContainerExecCreate(ctx, createdContainer.ID, types.ExecConfig{
411414
AttachStdout: true,
412415
AttachStderr: true,
413416
Cmd: command,
414417
})
415-
416418
if err != nil {
419+
p.logger.Error("failed to create exec", zap.Error(err), zap.String("taskName", taskName))
417420
return "", "", 0, err
418421
}
419422

420423
resp, err := dockerClient.ContainerExecAttach(ctx, exec.ID, types.ExecStartCheck{})
421424

422425
if err != nil {
426+
p.logger.Error("failed to attach to exec", zap.Error(err), zap.String("taskName", taskName))
423427
return "", "", 0, err
424428
}
425429

426430
defer resp.Close()
427431

428432
execInspect, err := dockerClient.ContainerExecInspect(ctx, exec.ID)
429433
if err != nil {
434+
p.logger.Error("failed to inspect exec", zap.Error(err), zap.String("taskName", taskName))
430435
return "", "", 0, err
431436
}
432437

433438
var stdout, stderr bytes.Buffer
434439

435440
_, err = stdcopy.StdCopy(&stdout, &stderr, resp.Reader)
436441

437-
return stdout.String(), stderr.String(), execInspect.ExitCode, nil
442+
return stdout.String(), stderr.String(), execInspect.ExitCode, err
443+
}
444+
445+
func startContainerWithBlock(ctx context.Context, dockerClient *dockerclient.Client, containerID string) error {
446+
// start container
447+
if err := dockerClient.ContainerStart(ctx, containerID, types.ContainerStartOptions{}); err != nil {
448+
return err
449+
}
450+
451+
// cancel container after a minute
452+
waitCtx, cancel := context.WithTimeout(ctx, 3*time.Minute)
453+
defer cancel()
454+
ticker := time.NewTicker(100 * time.Millisecond)
455+
for {
456+
select {
457+
case <-waitCtx.Done():
458+
return fmt.Errorf("error waiting for container to start: %v", waitCtx.Err())
459+
case <-ticker.C:
460+
container, err := dockerClient.ContainerInspect(ctx, containerID)
461+
if err != nil {
462+
return err
463+
}
464+
465+
// if the container is running, we're done
466+
if container.State.Running {
467+
return nil
468+
}
469+
}
470+
}
438471
}
439472

440473
func (p *Provider) pullImage(ctx context.Context, dockerClient *dockerclient.Client, image string) error {

core/provider/digitalocean/types.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package digitalocean
22

33
type DigitalOceanTaskConfig struct {
4-
Region string
5-
Size string
4+
Region string
5+
Size string
66
ImageID int
77
}

core/provider/docker/task.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ func (p *Provider) CreateTask(ctx context.Context, logger *zap.Logger, definitio
9393
listeners.CloseAll()
9494
return "", err
9595
}
96-
96+
9797
// network map is volatile, so we need to mutex update it
9898
p.networkMu.Lock()
9999
p.listeners[createdContainer.ID] = listeners
@@ -144,7 +144,7 @@ func (p *Provider) StartTask(ctx context.Context, id string) error {
144144
if status == provider.TASK_RUNNING {
145145
return nil
146146
}
147-
time.Sleep(time.Second)
147+
time.Sleep(time.Millisecond * 100)
148148
}
149149
}
150150

core/provider/task.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,10 @@ func CreateTask(ctx context.Context, logger *zap.Logger, provider Provider, defi
5757
return task, nil
5858
}
5959

60-
// Start starts the underlying task's workload including its sidecars if startSidecars is set to true
60+
// Start starts the underlying task's workload including its sidecars if startSidecars is set to true.
61+
// This method does not take a lock on the provider, hence 2 threads may simultaneously call Start on the same task,
62+
// this is not thread-safe: PLEASE DON'T DO THAT.
6163
func (t *Task) Start(ctx context.Context, startSidecars bool) error {
62-
t.mu.Lock()
63-
defer t.mu.Unlock()
64-
6564
if startSidecars {
6665
for _, sidecar := range t.Sidecars {
6766
err := sidecar.Start(ctx, startSidecars)
@@ -161,18 +160,21 @@ func (t *Task) GetExternalAddress(ctx context.Context, port string) (string, err
161160
func (t *Task) RunCommand(ctx context.Context, command []string) (string, string, int, error) {
162161
status, err := t.Provider.GetTaskStatus(ctx, t.ID)
163162
if err != nil {
163+
t.logger.Error("failed to get task status", zap.Error(err), zap.Any("definition", t.Definition))
164164
return "", "", 0, err
165165
}
166166

167167
if status == TASK_RUNNING {
168168
t.mu.Lock()
169169
defer t.mu.Unlock()
170+
t.logger.Info("running command", zap.Strings("command", command), zap.String("status", "running"))
170171
return t.Provider.RunCommand(ctx, t.ID, command)
171172
}
172173

173174
t.mu.Lock()
174175
defer t.mu.Unlock()
175176

177+
t.logger.Info("running command", zap.Strings("command", command), zap.String("status", "not running"))
176178
return t.Provider.RunCommandWhileStopped(ctx, t.ID, t.Definition, command)
177179
}
178180

core/types/chain.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ type ChainConfig struct {
7474

7575
NodeCreator NodeCreator // NodeCreator is a function that creates a node
7676
NodeDefinitionModifier NodeDefinitionModifier // NodeDefinitionModifier is a function that modifies a node's definition
77-
// number of tokens to allocate per account in the genesis state (unscaled). This value defaults to 10_000_000 if not set.
77+
// number of tokens to allocate per account in the genesis state (unscaled). This value defaults to 10_000_000 if not set.
7878
// if not set.
7979
GenesisDelegation *big.Int
8080
// number of tokens to allocate to the genesis account. This value defaults to 5_000_000 if not set.

0 commit comments

Comments
 (0)