Skip to content

Commit

Permalink
Merge pull request #118 from skip-mev/nv/concurrent-network-init
Browse files Browse the repository at this point in the history
feat: concurrent chain initialization
  • Loading branch information
Eric-Warehime authored Nov 5, 2024
2 parents 4cc2f0c + 6ed6d8d commit b93f7f9
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 55 deletions.
3 changes: 1 addition & 2 deletions core/provider/digitalocean/digitalocean_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,10 @@ func NewDigitalOceanProvider(ctx context.Context, logger *zap.Logger, providerNa
firewall, err := digitalOceanProvider.createFirewall(ctx, userIPs)

if err != nil {
return nil, err
return nil, fmt.Errorf("failed to create firewall: %w", err)
}

digitalOceanProvider.firewallID = firewall.ID

_, err = digitalOceanProvider.createSSHKey(ctx, sshPubKey)

if err != nil {
Expand Down
7 changes: 3 additions & 4 deletions core/provider/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,10 @@ func CreateTask(ctx context.Context, logger *zap.Logger, provider Provider, defi
return task, nil
}

// Start starts the underlying task's workload including its sidecars if startSidecars is set to true
// Start starts the underlying task's workload including its sidecars if startSidecars is set to true.
// This method does not take a lock on the provider, hence 2 threads may simultaneously call Start on the same task,
// this is not thread-safe: PLEASE DON'T DO THAT.
func (t *Task) Start(ctx context.Context, startSidecars bool) error {
t.mu.Lock()
defer t.mu.Unlock()

if startSidecars {
for _, sidecar := range t.Sidecars {
err := sidecar.Start(ctx, startSidecars)
Expand Down
125 changes: 76 additions & 49 deletions cosmos/chain/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,32 +242,39 @@ func (c *Chain) Init(ctx context.Context) error {

for i := 1; i < len(c.Validators); i++ {
validatorN := c.Validators[i]
bech32, err := validatorN.KeyBech32(ctx, petritypes.ValidatorKeyName, "acc")
validatorWalletAddress := c.ValidatorWallets[i].FormattedAddress()
eg.Go(func() error {
bech32, err := validatorN.KeyBech32(ctx, petritypes.ValidatorKeyName, "acc")
if err != nil {
return err
}

if err != nil {
return err
}
c.logger.Info("setting up validator keys", zap.String("validator", validatorN.GetTask().Definition.Name), zap.String("address", bech32))
if err := firstValidator.AddGenesisAccount(ctx, bech32, genesisAmounts); err != nil {
return err
}

c.logger.Info("setting up validator keys", zap.String("validator", validatorN.GetTask().Definition.Name), zap.String("address", bech32))
if err := firstValidator.AddGenesisAccount(ctx, bech32, genesisAmounts); err != nil {
return err
}
if err := firstValidator.AddGenesisAccount(ctx, validatorWalletAddress, genesisAmounts); err != nil {
return err
}

if err := firstValidator.AddGenesisAccount(ctx, c.ValidatorWallets[i].FormattedAddress(), genesisAmounts); err != nil {
return err
}
if err := validatorN.CopyGenTx(ctx, firstValidator); err != nil {
return err
}

if err := validatorN.CopyGenTx(ctx, firstValidator); err != nil {
return err
}
return nil
})
}

if err := eg.Wait(); err != nil {
return err
}

if err := firstValidator.CollectGenTxs(ctx); err != nil {
return err
}

genbz, err := firstValidator.GenesisFileContent(ctx)

if err != nil {
return err
}
Expand All @@ -285,48 +292,68 @@ func (c *Chain) Init(ctx context.Context) error {
return err
}

if err != nil {
return err
for i := range c.Validators {
v := c.Validators[i]
eg.Go(func() error {
c.logger.Info("overwriting genesis for validator", zap.String("validator", v.GetTask().Definition.Name))
if err := v.OverwriteGenesisFile(ctx, genbz); err != nil {
return err
}
if err := v.SetDefaultConfigs(ctx); err != nil {
return err
}
if err := v.SetPersistentPeers(ctx, peers); err != nil {
return err
}
return nil
})
}

for _, v := range c.Validators {
c.logger.Info("overwriting genesis for validator", zap.String("validator", v.GetTask().Definition.Name))
if err := v.OverwriteGenesisFile(ctx, genbz); err != nil {
return err
}
if err := v.SetDefaultConfigs(ctx); err != nil {
return err
}
if err := v.SetPersistentPeers(ctx, peers); err != nil {
return err
}
for i := range c.Nodes {
n := c.Nodes[i]
eg.Go(func() error {
c.logger.Info("overwriting node genesis", zap.String("node", n.GetTask().Definition.Name))
if err := n.OverwriteGenesisFile(ctx, genbz); err != nil {
return err
}
if err := n.SetDefaultConfigs(ctx); err != nil {
return err
}
if err := n.SetPersistentPeers(ctx, peers); err != nil {
return err
}
return nil
})
}

for _, v := range c.Validators {
c.logger.Info("starting validator task", zap.String("validator", v.GetTask().Definition.Name))
if err := v.GetTask().Start(ctx, true); err != nil {
return err
}
if err := eg.Wait(); err != nil {
return err
}

for _, n := range c.Nodes {
c.logger.Info("overwriting node genesis", zap.String("node", n.GetTask().Definition.Name))
if err := n.OverwriteGenesisFile(ctx, genbz); err != nil {
return err
}
if err := n.SetDefaultConfigs(ctx); err != nil {
return err
}
if err := n.SetPersistentPeers(ctx, peers); err != nil {
return err
}
for i := range c.Validators {
v := c.Validators[i]
eg.Go(func() error {
c.logger.Info("starting validator task", zap.String("validator", v.GetTask().Definition.Name))
if err := v.GetTask().Start(ctx, true); err != nil {
return err
}
return nil
})
}

for _, n := range c.Nodes {
c.logger.Info("starting node task", zap.String("node", n.GetTask().Definition.Name))
if err := n.GetTask().Start(ctx, true); err != nil {
return err
}
for i := range c.Nodes {
n := c.Nodes[i]
eg.Go(func() error {
c.logger.Info("starting node task", zap.String("node", n.GetTask().Definition.Name))
if err := n.GetTask().Start(ctx, true); err != nil {
return err
}
return nil
})
}

if err := eg.Wait(); err != nil {
return err
}

return nil
Expand Down

0 comments on commit b93f7f9

Please sign in to comment.