diff --git a/core/monitoring/prometheus.go b/core/monitoring/prometheus.go index 71f1a52..809b5ec 100644 --- a/core/monitoring/prometheus.go +++ b/core/monitoring/prometheus.go @@ -40,7 +40,7 @@ func SetupPrometheusTask(ctx context.Context, logger *zap.Logger, p provider.Pro "--web.console.libraries=/usr/share/prometheus/console_libraries", "--web.console.templates=/usr/share/prometheus/consoles", }, - ContainerName: "prometheus", + ContainerName: "prometheus", ProviderSpecificConfig: opts.ProviderSpecificConfig, }) if err != nil { diff --git a/core/provider/digitalocean/digitalocean_provider.go b/core/provider/digitalocean/digitalocean_provider.go index 912938f..f06687e 100644 --- a/core/provider/digitalocean/digitalocean_provider.go +++ b/core/provider/digitalocean/digitalocean_provider.go @@ -41,7 +41,6 @@ func NewDigitalOceanProvider(ctx context.Context, logger *zap.Logger, providerNa doClient := godo.NewFromToken(token) sshPubKey, sshPrivKey, sshFingerprint, err := makeSSHKeyPair() - if err != nil { return nil, err } @@ -77,11 +76,10 @@ func NewDigitalOceanProvider(ctx context.Context, logger *zap.Logger, providerNa firewall, err := digitalOceanProvider.createFirewall(ctx, userIPs) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to create firewall: %w", err) } digitalOceanProvider.firewallID = firewall.ID - _, err = digitalOceanProvider.createSSHKey(ctx, sshPubKey) if err != nil { diff --git a/core/provider/digitalocean/droplet.go b/core/provider/digitalocean/droplet.go index 1fa94bf..b4d54df 100644 --- a/core/provider/digitalocean/droplet.go +++ b/core/provider/digitalocean/droplet.go @@ -54,7 +54,7 @@ func (p *Provider) CreateDroplet(ctx context.Context, definition provider.TaskDe start := time.Now() - err = util.WaitForCondition(ctx, time.Second*600, time.Second*2, func() (bool, error) { + err = util.WaitForCondition(ctx, time.Second*300, time.Millisecond*300, func() (bool, error) { d, _, err := p.doClient.Droplets.Get(ctx, droplet.ID) if err != nil { @@ -74,6 +74,7 @@ func (p *Provider) CreateDroplet(ctx context.Context, definition provider.TaskDe dockerClient, err := dockerclient.NewClientWithOpts(dockerclient.WithHost(fmt.Sprintf("tcp://%s:2375", ip))) if err != nil { + p.logger.Error("failed to create docker client", zap.Error(err)) return false, err } @@ -83,6 +84,7 @@ func (p *Provider) CreateDroplet(ctx context.Context, definition provider.TaskDe return false, nil } + p.logger.Info("droplet is active", zap.Duration("after", time.Since(start)), zap.String("task", definition.Name)) return true, nil }) @@ -117,13 +119,16 @@ func (p *Provider) deleteDroplet(ctx context.Context, name string) error { return nil } -func (p *Provider) getDroplet(ctx context.Context, name string) (*godo.Droplet, error) { +func (p *Provider) getDroplet(ctx context.Context, name string, returnOnCacheHit bool) (*godo.Droplet, error) { cachedDroplet, ok := p.droplets.Load(name) - if !ok { return nil, fmt.Errorf("could not find droplet %s", name) } + if ok && returnOnCacheHit { + return cachedDroplet, nil + } + droplet, res, err := p.doClient.Droplets.Get(ctx, cachedDroplet.ID) if err != nil { diff --git a/core/provider/digitalocean/task.go b/core/provider/digitalocean/task.go index 42b8777..826cdcf 100644 --- a/core/provider/digitalocean/task.go +++ b/core/provider/digitalocean/task.go @@ -4,6 +4,11 @@ import ( "bytes" "context" "fmt" + "io" + "net" + "path" + "time" + "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/mount" @@ -15,10 +20,6 @@ import ( "github.com/spf13/afero" "github.com/spf13/afero/sftpfs" "go.uber.org/zap" - "io" - "net" - "path" - "time" ) func (p *Provider) CreateTask(ctx context.Context, logger *zap.Logger, definition provider.TaskDefinition) (string, error) { @@ -27,7 +28,7 @@ func (p *Provider) CreateTask(ctx context.Context, logger *zap.Logger, definitio } if definition.ProviderSpecificConfig == nil { - return "", fmt.Errorf("digitalocean specific config is nil") + return "", fmt.Errorf("digitalocean specific config is nil for %s", definition.Name) } _, ok := definition.ProviderSpecificConfig.(DigitalOceanTaskConfig) @@ -122,7 +123,7 @@ func (p *Provider) StartTask(ctx context.Context, taskName string) error { return err } - err = util.WaitForCondition(ctx, time.Second*300, time.Second*2, func() (bool, error) { + err = util.WaitForCondition(ctx, time.Second*300, time.Millisecond*100, func() (bool, error) { status, err := p.GetTaskStatus(ctx, taskName) if err != nil { return false, err @@ -171,7 +172,7 @@ func (p *Provider) DestroyTask(ctx context.Context, taskName string) error { } func (p *Provider) GetTaskStatus(ctx context.Context, taskName string) (provider.TaskStatus, error) { - droplet, err := p.getDroplet(ctx, taskName) + droplet, err := p.getDroplet(ctx, taskName, false) if err != nil { return provider.TASK_STATUS_UNDEFINED, err @@ -295,7 +296,7 @@ func (p *Provider) DownloadDir(ctx context.Context, s string, s2 string, s3 stri } func (p *Provider) GetIP(ctx context.Context, taskName string) (string, error) { - droplet, err := p.getDroplet(ctx, taskName) + droplet, err := p.getDroplet(ctx, taskName, true) if err != nil { return "", err @@ -366,6 +367,7 @@ func (p *Provider) RunCommandWhileStopped(ctx context.Context, taskName string, dockerClient, err := p.getDropletDockerClient(ctx, taskName) if err != nil { + p.logger.Error("failed to get docker client", zap.Error(err), zap.String("taskName", taskName)) return "", "", 0, err } @@ -396,30 +398,32 @@ func (p *Provider) RunCommandWhileStopped(ctx context.Context, taskName string, }, nil, nil, definition.ContainerName) if err != nil { + p.logger.Error("failed to create container", zap.Error(err), zap.String("taskName", taskName)) return "", "", 0, err } defer dockerClient.ContainerRemove(ctx, createdContainer.ID, types.ContainerRemoveOptions{Force: true}) - err = dockerClient.ContainerStart(ctx, createdContainer.ID, types.ContainerStartOptions{}) - - if err != nil { + if err := startContainerWithBlock(ctx, dockerClient, createdContainer.ID); err != nil { + p.logger.Error("failed to start container", zap.Error(err), zap.String("taskName", taskName)) return "", "", 0, err } + // wait for container start exec, err := dockerClient.ContainerExecCreate(ctx, createdContainer.ID, types.ExecConfig{ AttachStdout: true, AttachStderr: true, Cmd: command, }) - if err != nil { + p.logger.Error("failed to create exec", zap.Error(err), zap.String("taskName", taskName)) return "", "", 0, err } resp, err := dockerClient.ContainerExecAttach(ctx, exec.ID, types.ExecStartCheck{}) if err != nil { + p.logger.Error("failed to attach to exec", zap.Error(err), zap.String("taskName", taskName)) return "", "", 0, err } @@ -427,6 +431,7 @@ func (p *Provider) RunCommandWhileStopped(ctx context.Context, taskName string, execInspect, err := dockerClient.ContainerExecInspect(ctx, exec.ID) if err != nil { + p.logger.Error("failed to inspect exec", zap.Error(err), zap.String("taskName", taskName)) return "", "", 0, err } @@ -434,7 +439,35 @@ func (p *Provider) RunCommandWhileStopped(ctx context.Context, taskName string, _, err = stdcopy.StdCopy(&stdout, &stderr, resp.Reader) - return stdout.String(), stderr.String(), execInspect.ExitCode, nil + return stdout.String(), stderr.String(), execInspect.ExitCode, err +} + +func startContainerWithBlock(ctx context.Context, dockerClient *dockerclient.Client, containerID string) error { + // start container + if err := dockerClient.ContainerStart(ctx, containerID, types.ContainerStartOptions{}); err != nil { + return err + } + + // cancel container after a minute + waitCtx, cancel := context.WithTimeout(ctx, 3*time.Minute) + defer cancel() + ticker := time.NewTicker(100 * time.Millisecond) + for { + select { + case <-waitCtx.Done(): + return fmt.Errorf("error waiting for container to start: %v", waitCtx.Err()) + case <-ticker.C: + container, err := dockerClient.ContainerInspect(ctx, containerID) + if err != nil { + return err + } + + // if the container is running, we're done + if container.State.Running { + return nil + } + } + } } func (p *Provider) pullImage(ctx context.Context, dockerClient *dockerclient.Client, image string) error { diff --git a/core/provider/digitalocean/types.go b/core/provider/digitalocean/types.go index 8806f8b..29ad0b1 100644 --- a/core/provider/digitalocean/types.go +++ b/core/provider/digitalocean/types.go @@ -1,7 +1,7 @@ package digitalocean type DigitalOceanTaskConfig struct { - Region string - Size string + Region string + Size string ImageID int } diff --git a/core/provider/docker/task.go b/core/provider/docker/task.go index 7685536..335ac33 100644 --- a/core/provider/docker/task.go +++ b/core/provider/docker/task.go @@ -93,7 +93,7 @@ func (p *Provider) CreateTask(ctx context.Context, logger *zap.Logger, definitio listeners.CloseAll() return "", err } - + // network map is volatile, so we need to mutex update it p.networkMu.Lock() p.listeners[createdContainer.ID] = listeners @@ -144,7 +144,7 @@ func (p *Provider) StartTask(ctx context.Context, id string) error { if status == provider.TASK_RUNNING { return nil } - time.Sleep(time.Second) + time.Sleep(time.Millisecond * 100) } } diff --git a/core/provider/task.go b/core/provider/task.go index 61103b7..797c81b 100644 --- a/core/provider/task.go +++ b/core/provider/task.go @@ -57,11 +57,10 @@ func CreateTask(ctx context.Context, logger *zap.Logger, provider Provider, defi return task, nil } -// Start starts the underlying task's workload including its sidecars if startSidecars is set to true +// Start starts the underlying task's workload including its sidecars if startSidecars is set to true. +// This method does not take a lock on the provider, hence 2 threads may simultaneously call Start on the same task, +// this is not thread-safe: PLEASE DON'T DO THAT. func (t *Task) Start(ctx context.Context, startSidecars bool) error { - t.mu.Lock() - defer t.mu.Unlock() - if startSidecars { for _, sidecar := range t.Sidecars { err := sidecar.Start(ctx, startSidecars) @@ -161,18 +160,21 @@ func (t *Task) GetExternalAddress(ctx context.Context, port string) (string, err func (t *Task) RunCommand(ctx context.Context, command []string) (string, string, int, error) { status, err := t.Provider.GetTaskStatus(ctx, t.ID) if err != nil { + t.logger.Error("failed to get task status", zap.Error(err), zap.Any("definition", t.Definition)) return "", "", 0, err } if status == TASK_RUNNING { t.mu.Lock() defer t.mu.Unlock() + t.logger.Info("running command", zap.Strings("command", command), zap.String("status", "running")) return t.Provider.RunCommand(ctx, t.ID, command) } t.mu.Lock() defer t.mu.Unlock() + t.logger.Info("running command", zap.Strings("command", command), zap.String("status", "not running")) return t.Provider.RunCommandWhileStopped(ctx, t.ID, t.Definition, command) } diff --git a/core/types/chain.go b/core/types/chain.go index 17113fb..b30c60d 100644 --- a/core/types/chain.go +++ b/core/types/chain.go @@ -74,7 +74,7 @@ type ChainConfig struct { NodeCreator NodeCreator // NodeCreator is a function that creates a node NodeDefinitionModifier NodeDefinitionModifier // NodeDefinitionModifier is a function that modifies a node's definition - // number of tokens to allocate per account in the genesis state (unscaled). This value defaults to 10_000_000 if not set. + // number of tokens to allocate per account in the genesis state (unscaled). This value defaults to 10_000_000 if not set. // if not set. GenesisDelegation *big.Int // number of tokens to allocate to the genesis account. This value defaults to 5_000_000 if not set. diff --git a/cosmos/chain/chain.go b/cosmos/chain/chain.go index 05f984a..f225a88 100644 --- a/cosmos/chain/chain.go +++ b/cosmos/chain/chain.go @@ -72,7 +72,6 @@ func CreateChain(ctx context.Context, logger *zap.Logger, infraProvider provider Provider: infraProvider, Chain: &chain, }) - if err != nil { return err } @@ -178,8 +177,9 @@ func (c *Chain) Init(ctx context.Context) error { v := v idx := idx eg.Go(func() error { + c.logger.Info("setting up validator home dir", zap.String("validator", v.GetTask().Definition.Name)) if err := v.InitHome(ctx); err != nil { - return err + return fmt.Errorf("error initializing home dir: %v", err) } validatorWallet, err := v.CreateWallet(ctx, petritypes.ValidatorKeyName, c.Config.WalletConfig) @@ -212,6 +212,7 @@ func (c *Chain) Init(ctx context.Context) error { n := n eg.Go(func() error { + c.logger.Info("setting up node home dir", zap.String("node", n.GetTask().Definition.Name)) if err := n.InitHome(ctx); err != nil { return err } @@ -224,6 +225,7 @@ func (c *Chain) Init(ctx context.Context) error { return err } + c.logger.Info("adding faucet genesis") faucetWallet, err := c.BuildWallet(ctx, petritypes.FaucetAccountKeyName, "", c.Config.WalletConfig) if err != nil { @@ -240,23 +242,32 @@ func (c *Chain) Init(ctx context.Context) error { for i := 1; i < len(c.Validators); i++ { validatorN := c.Validators[i] - bech32, err := validatorN.KeyBech32(ctx, petritypes.ValidatorKeyName, "acc") + validatorWalletAddress := c.ValidatorWallets[i].FormattedAddress() + eg.Go(func() error { + bech32, err := validatorN.KeyBech32(ctx, petritypes.ValidatorKeyName, "acc") + if err != nil { + return err + } - if err != nil { - return err - } + c.logger.Info("setting up validator keys", zap.String("validator", validatorN.GetTask().Definition.Name), zap.String("address", bech32)) + if err := firstValidator.AddGenesisAccount(ctx, bech32, genesisAmounts); err != nil { + return err + } - if err := firstValidator.AddGenesisAccount(ctx, bech32, genesisAmounts); err != nil { - return err - } + if err := firstValidator.AddGenesisAccount(ctx, validatorWalletAddress, genesisAmounts); err != nil { + return err + } - if err := firstValidator.AddGenesisAccount(ctx, c.ValidatorWallets[i].FormattedAddress(), genesisAmounts); err != nil { - return err - } + if err := validatorN.CopyGenTx(ctx, firstValidator); err != nil { + return err + } - if err := validatorN.CopyGenTx(ctx, firstValidator); err != nil { - return err - } + return nil + }) + } + + if err := eg.Wait(); err != nil { + return err } if err := firstValidator.CollectGenTxs(ctx); err != nil { @@ -264,7 +275,6 @@ func (c *Chain) Init(ctx context.Context) error { } genbz, err := firstValidator.GenesisFileContent(ctx) - if err != nil { return err } @@ -282,44 +292,68 @@ func (c *Chain) Init(ctx context.Context) error { return err } - if err != nil { - return err + for i := range c.Validators { + v := c.Validators[i] + eg.Go(func() error { + c.logger.Info("overwriting genesis for validator", zap.String("validator", v.GetTask().Definition.Name)) + if err := v.OverwriteGenesisFile(ctx, genbz); err != nil { + return err + } + if err := v.SetDefaultConfigs(ctx); err != nil { + return err + } + if err := v.SetPersistentPeers(ctx, peers); err != nil { + return err + } + return nil + }) } - for _, v := range c.Validators { - if err := v.OverwriteGenesisFile(ctx, genbz); err != nil { - return err - } - if err := v.SetDefaultConfigs(ctx); err != nil { - return err - } - if err := v.SetPersistentPeers(ctx, peers); err != nil { - return err - } + for i := range c.Nodes { + n := c.Nodes[i] + eg.Go(func() error { + c.logger.Info("overwriting node genesis", zap.String("node", n.GetTask().Definition.Name)) + if err := n.OverwriteGenesisFile(ctx, genbz); err != nil { + return err + } + if err := n.SetDefaultConfigs(ctx); err != nil { + return err + } + if err := n.SetPersistentPeers(ctx, peers); err != nil { + return err + } + return nil + }) } - for _, v := range c.Validators { - if err := v.GetTask().Start(ctx, true); err != nil { - return err - } + if err := eg.Wait(); err != nil { + return err } - for _, n := range c.Nodes { - if err := n.OverwriteGenesisFile(ctx, genbz); err != nil { - return err - } - if err := n.SetDefaultConfigs(ctx); err != nil { - return err - } - if err := n.SetPersistentPeers(ctx, peers); err != nil { - return err - } + for i := range c.Validators { + v := c.Validators[i] + eg.Go(func() error { + c.logger.Info("starting validator task", zap.String("validator", v.GetTask().Definition.Name)) + if err := v.GetTask().Start(ctx, true); err != nil { + return err + } + return nil + }) } - for _, n := range c.Nodes { - if err := n.GetTask().Start(ctx, true); err != nil { - return err - } + for i := range c.Nodes { + n := c.Nodes[i] + eg.Go(func() error { + c.logger.Info("starting node task", zap.String("node", n.GetTask().Definition.Name)) + if err := n.GetTask().Start(ctx, true); err != nil { + return err + } + return nil + }) + } + + if err := eg.Wait(); err != nil { + return err } return nil @@ -429,9 +463,10 @@ func (c *Chain) WaitForHeight(ctx context.Context, desiredHeight uint64) error { c.logger.Info("waiting for height", zap.Uint64("desired_height", desiredHeight)) for { c.logger.Debug("waiting for height", zap.Uint64("desired_height", desiredHeight)) - + height, err := c.Height(ctx) if err != nil { + c.logger.Error("failed to get height", zap.Error(err)) time.Sleep(2 * time.Second) continue } @@ -439,7 +474,7 @@ func (c *Chain) WaitForHeight(ctx context.Context, desiredHeight uint64) error { if height >= desiredHeight { break } - + // We assume the chain will eventually return a non-zero height, otherwise // this may block indefinitely. if height == 0 { diff --git a/cosmos/node/genesis.go b/cosmos/node/genesis.go index 8101ace..fb79ec9 100644 --- a/cosmos/node/genesis.go +++ b/cosmos/node/genesis.go @@ -60,7 +60,7 @@ func (n *Node) AddGenesisAccount(ctx context.Context, address string, genesisAmo amount += fmt.Sprintf("%s%s", coin.Amount.String(), coin.Denom) } - ctx, cancel := context.WithTimeout(ctx, time.Minute) + ctx, cancel := context.WithTimeout(ctx, 3*time.Minute) defer cancel() var command []string