Skip to content

Commit

Permalink
Merge pull request #117 from skip-mev/nv/start-droplet-with-block
Browse files Browse the repository at this point in the history
feat: fixes to digital ocean droplet creation, logging, timeout finetuning
  • Loading branch information
Eric-Warehime authored Nov 5, 2024
2 parents 4b75ab1 + 8dd3c81 commit 3f626b6
Show file tree
Hide file tree
Showing 10 changed files with 151 additions and 78 deletions.
2 changes: 1 addition & 1 deletion core/monitoring/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func SetupPrometheusTask(ctx context.Context, logger *zap.Logger, p provider.Pro
"--web.console.libraries=/usr/share/prometheus/console_libraries",
"--web.console.templates=/usr/share/prometheus/consoles",
},
ContainerName: "prometheus",
ContainerName: "prometheus",
ProviderSpecificConfig: opts.ProviderSpecificConfig,
})
if err != nil {
Expand Down
4 changes: 1 addition & 3 deletions core/provider/digitalocean/digitalocean_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ func NewDigitalOceanProvider(ctx context.Context, logger *zap.Logger, providerNa
doClient := godo.NewFromToken(token)

sshPubKey, sshPrivKey, sshFingerprint, err := makeSSHKeyPair()

if err != nil {
return nil, err
}
Expand Down Expand Up @@ -77,11 +76,10 @@ func NewDigitalOceanProvider(ctx context.Context, logger *zap.Logger, providerNa
firewall, err := digitalOceanProvider.createFirewall(ctx, userIPs)

if err != nil {
return nil, err
return nil, fmt.Errorf("failed to create firewall: %w", err)
}

digitalOceanProvider.firewallID = firewall.ID

_, err = digitalOceanProvider.createSSHKey(ctx, sshPubKey)

if err != nil {
Expand Down
11 changes: 8 additions & 3 deletions core/provider/digitalocean/droplet.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (p *Provider) CreateDroplet(ctx context.Context, definition provider.TaskDe

start := time.Now()

err = util.WaitForCondition(ctx, time.Second*600, time.Second*2, func() (bool, error) {
err = util.WaitForCondition(ctx, time.Second*300, time.Millisecond*300, func() (bool, error) {
d, _, err := p.doClient.Droplets.Get(ctx, droplet.ID)

if err != nil {
Expand All @@ -74,6 +74,7 @@ func (p *Provider) CreateDroplet(ctx context.Context, definition provider.TaskDe
dockerClient, err := dockerclient.NewClientWithOpts(dockerclient.WithHost(fmt.Sprintf("tcp://%s:2375", ip)))

if err != nil {
p.logger.Error("failed to create docker client", zap.Error(err))
return false, err
}

Expand All @@ -83,6 +84,7 @@ func (p *Provider) CreateDroplet(ctx context.Context, definition provider.TaskDe
return false, nil
}

p.logger.Info("droplet is active", zap.Duration("after", time.Since(start)), zap.String("task", definition.Name))
return true, nil
})

Expand Down Expand Up @@ -117,13 +119,16 @@ func (p *Provider) deleteDroplet(ctx context.Context, name string) error {
return nil
}

func (p *Provider) getDroplet(ctx context.Context, name string) (*godo.Droplet, error) {
func (p *Provider) getDroplet(ctx context.Context, name string, returnOnCacheHit bool) (*godo.Droplet, error) {
cachedDroplet, ok := p.droplets.Load(name)

if !ok {
return nil, fmt.Errorf("could not find droplet %s", name)
}

if ok && returnOnCacheHit {
return cachedDroplet, nil
}

droplet, res, err := p.doClient.Droplets.Get(ctx, cachedDroplet.ID)

if err != nil {
Expand Down
59 changes: 46 additions & 13 deletions core/provider/digitalocean/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ import (
"bytes"
"context"
"fmt"
"io"
"net"
"path"
"time"

"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/mount"
Expand All @@ -15,10 +20,6 @@ import (
"github.com/spf13/afero"
"github.com/spf13/afero/sftpfs"
"go.uber.org/zap"
"io"
"net"
"path"
"time"
)

func (p *Provider) CreateTask(ctx context.Context, logger *zap.Logger, definition provider.TaskDefinition) (string, error) {
Expand All @@ -27,7 +28,7 @@ func (p *Provider) CreateTask(ctx context.Context, logger *zap.Logger, definitio
}

if definition.ProviderSpecificConfig == nil {
return "", fmt.Errorf("digitalocean specific config is nil")
return "", fmt.Errorf("digitalocean specific config is nil for %s", definition.Name)
}

_, ok := definition.ProviderSpecificConfig.(DigitalOceanTaskConfig)
Expand Down Expand Up @@ -122,7 +123,7 @@ func (p *Provider) StartTask(ctx context.Context, taskName string) error {
return err
}

err = util.WaitForCondition(ctx, time.Second*300, time.Second*2, func() (bool, error) {
err = util.WaitForCondition(ctx, time.Second*300, time.Millisecond*100, func() (bool, error) {
status, err := p.GetTaskStatus(ctx, taskName)
if err != nil {
return false, err
Expand Down Expand Up @@ -171,7 +172,7 @@ func (p *Provider) DestroyTask(ctx context.Context, taskName string) error {
}

func (p *Provider) GetTaskStatus(ctx context.Context, taskName string) (provider.TaskStatus, error) {
droplet, err := p.getDroplet(ctx, taskName)
droplet, err := p.getDroplet(ctx, taskName, false)

if err != nil {
return provider.TASK_STATUS_UNDEFINED, err
Expand Down Expand Up @@ -295,7 +296,7 @@ func (p *Provider) DownloadDir(ctx context.Context, s string, s2 string, s3 stri
}

func (p *Provider) GetIP(ctx context.Context, taskName string) (string, error) {
droplet, err := p.getDroplet(ctx, taskName)
droplet, err := p.getDroplet(ctx, taskName, true)

if err != nil {
return "", err
Expand Down Expand Up @@ -366,6 +367,7 @@ func (p *Provider) RunCommandWhileStopped(ctx context.Context, taskName string,
dockerClient, err := p.getDropletDockerClient(ctx, taskName)

if err != nil {
p.logger.Error("failed to get docker client", zap.Error(err), zap.String("taskName", taskName))
return "", "", 0, err
}

Expand Down Expand Up @@ -396,45 +398,76 @@ func (p *Provider) RunCommandWhileStopped(ctx context.Context, taskName string,
}, nil, nil, definition.ContainerName)

if err != nil {
p.logger.Error("failed to create container", zap.Error(err), zap.String("taskName", taskName))
return "", "", 0, err
}

defer dockerClient.ContainerRemove(ctx, createdContainer.ID, types.ContainerRemoveOptions{Force: true})

err = dockerClient.ContainerStart(ctx, createdContainer.ID, types.ContainerStartOptions{})

if err != nil {
if err := startContainerWithBlock(ctx, dockerClient, createdContainer.ID); err != nil {
p.logger.Error("failed to start container", zap.Error(err), zap.String("taskName", taskName))
return "", "", 0, err
}

// wait for container start
exec, err := dockerClient.ContainerExecCreate(ctx, createdContainer.ID, types.ExecConfig{
AttachStdout: true,
AttachStderr: true,
Cmd: command,
})

if err != nil {
p.logger.Error("failed to create exec", zap.Error(err), zap.String("taskName", taskName))
return "", "", 0, err
}

resp, err := dockerClient.ContainerExecAttach(ctx, exec.ID, types.ExecStartCheck{})

if err != nil {
p.logger.Error("failed to attach to exec", zap.Error(err), zap.String("taskName", taskName))
return "", "", 0, err
}

defer resp.Close()

execInspect, err := dockerClient.ContainerExecInspect(ctx, exec.ID)
if err != nil {
p.logger.Error("failed to inspect exec", zap.Error(err), zap.String("taskName", taskName))
return "", "", 0, err
}

var stdout, stderr bytes.Buffer

_, err = stdcopy.StdCopy(&stdout, &stderr, resp.Reader)

return stdout.String(), stderr.String(), execInspect.ExitCode, nil
return stdout.String(), stderr.String(), execInspect.ExitCode, err
}

func startContainerWithBlock(ctx context.Context, dockerClient *dockerclient.Client, containerID string) error {
// start container
if err := dockerClient.ContainerStart(ctx, containerID, types.ContainerStartOptions{}); err != nil {
return err
}

// cancel container after a minute
waitCtx, cancel := context.WithTimeout(ctx, 3*time.Minute)
defer cancel()
ticker := time.NewTicker(100 * time.Millisecond)
for {
select {
case <-waitCtx.Done():
return fmt.Errorf("error waiting for container to start: %v", waitCtx.Err())
case <-ticker.C:
container, err := dockerClient.ContainerInspect(ctx, containerID)
if err != nil {
return err
}

// if the container is running, we're done
if container.State.Running {
return nil
}
}
}
}

func (p *Provider) pullImage(ctx context.Context, dockerClient *dockerclient.Client, image string) error {
Expand Down
4 changes: 2 additions & 2 deletions core/provider/digitalocean/types.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package digitalocean

type DigitalOceanTaskConfig struct {
Region string
Size string
Region string
Size string
ImageID int
}
4 changes: 2 additions & 2 deletions core/provider/docker/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (p *Provider) CreateTask(ctx context.Context, logger *zap.Logger, definitio
listeners.CloseAll()
return "", err
}

// network map is volatile, so we need to mutex update it
p.networkMu.Lock()
p.listeners[createdContainer.ID] = listeners
Expand Down Expand Up @@ -144,7 +144,7 @@ func (p *Provider) StartTask(ctx context.Context, id string) error {
if status == provider.TASK_RUNNING {
return nil
}
time.Sleep(time.Second)
time.Sleep(time.Millisecond * 100)
}
}

Expand Down
10 changes: 6 additions & 4 deletions core/provider/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,10 @@ func CreateTask(ctx context.Context, logger *zap.Logger, provider Provider, defi
return task, nil
}

// Start starts the underlying task's workload including its sidecars if startSidecars is set to true
// Start starts the underlying task's workload, including its sidecars when startSidecars is true.
// Start does not take a lock on the provider, so it is not thread-safe: two goroutines must never
// call Start concurrently on the same task.
func (t *Task) Start(ctx context.Context, startSidecars bool) error {
t.mu.Lock()
defer t.mu.Unlock()

if startSidecars {
for _, sidecar := range t.Sidecars {
err := sidecar.Start(ctx, startSidecars)
Expand Down Expand Up @@ -161,18 +160,21 @@ func (t *Task) GetExternalAddress(ctx context.Context, port string) (string, err
func (t *Task) RunCommand(ctx context.Context, command []string) (string, string, int, error) {
status, err := t.Provider.GetTaskStatus(ctx, t.ID)
if err != nil {
t.logger.Error("failed to get task status", zap.Error(err), zap.Any("definition", t.Definition))
return "", "", 0, err
}

if status == TASK_RUNNING {
t.mu.Lock()
defer t.mu.Unlock()
t.logger.Info("running command", zap.Strings("command", command), zap.String("status", "running"))
return t.Provider.RunCommand(ctx, t.ID, command)
}

t.mu.Lock()
defer t.mu.Unlock()

t.logger.Info("running command", zap.Strings("command", command), zap.String("status", "not running"))
return t.Provider.RunCommandWhileStopped(ctx, t.ID, t.Definition, command)
}

Expand Down
2 changes: 1 addition & 1 deletion core/types/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ type ChainConfig struct {

NodeCreator NodeCreator // NodeCreator is a function that creates a node
NodeDefinitionModifier NodeDefinitionModifier // NodeDefinitionModifier is a function that modifies a node's definition
// number of tokens to allocate per account in the genesis state (unscaled). This value defaults to 10_000_000 if not set.
// number of tokens to allocate per account in the genesis state (unscaled). This value defaults to 10_000_000 if not set.
// if not set.
GenesisDelegation *big.Int
// number of tokens to allocate to the genesis account. This value defaults to 5_000_000 if not set.
Expand Down
Loading

0 comments on commit 3f626b6

Please sign in to comment.