Skip to content

Commit

Permalink
Update machine concurrently when using immediate strategy (#2845)
Browse files Browse the repository at this point in the history
* Update machine concurrently when using immediate strategy

Previously, we ran the machine update API calls sequentially when using `fly deploy --strategy immediate`. That can be slow when there are lots of machines to update, say 30 or 100 or even more.

Now, we default to running the machine updates concurrently when using `fly deploy --strategy immediate`. The default is to kick off 16 machine update calls, wait for those to finish, then do another 16, wait, and so on until all machines are updated.

The new `--immediate-max-concurrent` flag may be used to set a different size. For example, `fly deploy --strategy immediate --immediate-max-concurrent 64` will run 64 machine updates concurrently, wait, and so on.

I tested this on a modest 35-machine cluster. This resulted in a pretty decent speedup. The results were:

| fly version | fly deploy flags | Time |
|---|---|---|
| flyctl v0.1.97 | --strategy immediate | 30-40 seconds |
| this pr | --strategy immediate | 8-15 seconds |
| this pr | --strategy immediate --immediate-max-concurrent 35 | 5-6 seconds |

* Use chan slice so we can continuously have up to --immediate-max-concurrent machine updates going

* oops still need the wait group so they all finish, but now we only wait once at the very end
  • Loading branch information
tvdfly authored Sep 22, 2023
1 parent cbbebd8 commit f1a45f1
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 93 deletions.
44 changes: 25 additions & 19 deletions internal/command/deploy/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@ var CommonFlags = flag.Set{
Name: "label",
Description: "Add custom metadata to an image via docker labels",
},
flag.Int{
Name: "immediate-max-concurrent",
Description: "Maximum number of machines to update concurrently when using the immediate deployment strategy.",
Default: 16,
},
flag.VMSizeFlags,
}

Expand Down Expand Up @@ -306,25 +311,26 @@ func deployToMachines(
}

md, err := NewMachineDeployment(ctx, MachineDeploymentArgs{
AppCompact: appCompact,
DeploymentImage: img.Tag,
Strategy: flag.GetString(ctx, "strategy"),
EnvFromFlags: flag.GetStringArray(ctx, "env"),
PrimaryRegionFlag: appConfig.PrimaryRegion,
SkipSmokeChecks: flag.GetDetach(ctx) || !flag.GetBool(ctx, "smoke-checks"),
SkipHealthChecks: flag.GetDetach(ctx),
WaitTimeout: time.Duration(flag.GetInt(ctx, "wait-timeout")) * time.Second,
LeaseTimeout: time.Duration(flag.GetInt(ctx, "lease-timeout")) * time.Second,
MaxUnavailable: flag.GetFloat64(ctx, "max-unavailable"),
ReleaseCmdTimeout: releaseCmdTimeout,
Guest: guest,
IncreasedAvailability: flag.GetBool(ctx, "ha"),
AllocPublicIP: !flag.GetBool(ctx, "no-public-ips"),
UpdateOnly: flag.GetBool(ctx, "update-only"),
Files: files,
ExcludeRegions: excludeRegions,
NoExtensions: flag.GetBool(ctx, "no-extensions"),
OnlyRegions: onlyRegions,
AppCompact: appCompact,
DeploymentImage: img.Tag,
Strategy: flag.GetString(ctx, "strategy"),
EnvFromFlags: flag.GetStringArray(ctx, "env"),
PrimaryRegionFlag: appConfig.PrimaryRegion,
SkipSmokeChecks: flag.GetDetach(ctx) || !flag.GetBool(ctx, "smoke-checks"),
SkipHealthChecks: flag.GetDetach(ctx),
WaitTimeout: time.Duration(flag.GetInt(ctx, "wait-timeout")) * time.Second,
LeaseTimeout: time.Duration(flag.GetInt(ctx, "lease-timeout")) * time.Second,
MaxUnavailable: flag.GetFloat64(ctx, "max-unavailable"),
ReleaseCmdTimeout: releaseCmdTimeout,
Guest: guest,
IncreasedAvailability: flag.GetBool(ctx, "ha"),
AllocPublicIP: !flag.GetBool(ctx, "no-public-ips"),
UpdateOnly: flag.GetBool(ctx, "update-only"),
Files: files,
ExcludeRegions: excludeRegions,
NoExtensions: flag.GetBool(ctx, "no-extensions"),
OnlyRegions: onlyRegions,
ImmediateMaxConcurrent: flag.GetInt(ctx, "immediate-max-concurrent"),
})
if err != nil {
sentry.CaptureExceptionWithAppInfo(err, "deploy", appCompact)
Expand Down
156 changes: 82 additions & 74 deletions internal/command/deploy/machines.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,60 +34,62 @@ type MachineDeployment interface {
}

// MachineDeploymentArgs is the set of caller-supplied options passed to
// NewMachineDeployment.
//
// NOTE(review): the field list appears twice in this view, which looks like
// the before/after sides of a diff with the +/- markers stripped. The only
// field that is not duplicated is ImmediateMaxConcurrent — confirm against
// the real file before relying on this listing.
type MachineDeploymentArgs struct {
AppCompact *api.AppCompact
DeploymentImage string
Strategy string
EnvFromFlags []string
PrimaryRegionFlag string
SkipSmokeChecks bool
SkipHealthChecks bool
MaxUnavailable float64
RestartOnly bool
WaitTimeout time.Duration
LeaseTimeout time.Duration
ReleaseCmdTimeout time.Duration
Guest *api.MachineGuest
IncreasedAvailability bool
AllocPublicIP bool
UpdateOnly bool
Files []*api.File
ProvisionExtensions bool
NoExtensions bool
ExcludeRegions map[string]interface{}
OnlyRegions map[string]interface{}
AppCompact *api.AppCompact
DeploymentImage string
Strategy string
EnvFromFlags []string
PrimaryRegionFlag string
SkipSmokeChecks bool
SkipHealthChecks bool
MaxUnavailable float64
RestartOnly bool
WaitTimeout time.Duration
LeaseTimeout time.Duration
ReleaseCmdTimeout time.Duration
Guest *api.MachineGuest
IncreasedAvailability bool
AllocPublicIP bool
UpdateOnly bool
Files []*api.File
ProvisionExtensions bool
NoExtensions bool
ExcludeRegions map[string]interface{}
OnlyRegions map[string]interface{}
// ImmediateMaxConcurrent is filled from the "immediate-max-concurrent"
// flag by deployToMachines. NewMachineDeployment raises any value below 1
// to 1 before storing it.
ImmediateMaxConcurrent int
}

// machineDeployment is the state assembled by NewMachineDeployment and used
// by the deployment methods (for example updateExistingMachines).
//
// NOTE(review): the field list appears twice in this view, which looks like
// the before/after sides of a diff with the +/- markers stripped. The only
// field that is not duplicated is immediateMaxConcurrent — confirm against
// the real file before relying on this listing.
type machineDeployment struct {
apiClient *api.Client
gqlClient graphql.Client
flapsClient *flaps.Client
io *iostreams.IOStreams
colorize *iostreams.ColorScheme
app *api.AppCompact
appConfig *appconfig.Config
img string
machineSet machine.MachineSet
releaseCommandMachine machine.MachineSet
volumes map[string][]api.Volume
strategy string
releaseId string
releaseVersion int
skipSmokeChecks bool
skipHealthChecks bool
maxUnavailable float64
restartOnly bool
waitTimeout time.Duration
leaseTimeout time.Duration
leaseDelayBetween time.Duration
releaseCmdTimeout time.Duration
isFirstDeploy bool
machineGuest *api.MachineGuest
increasedAvailability bool
listenAddressChecked map[string]struct{}
updateOnly bool
noExtensions bool
excludeRegions map[string]interface{}
onlyRegions map[string]interface{}
apiClient *api.Client
gqlClient graphql.Client
flapsClient *flaps.Client
io *iostreams.IOStreams
colorize *iostreams.ColorScheme
app *api.AppCompact
appConfig *appconfig.Config
img string
machineSet machine.MachineSet
releaseCommandMachine machine.MachineSet
volumes map[string][]api.Volume
strategy string
releaseId string
releaseVersion int
skipSmokeChecks bool
skipHealthChecks bool
maxUnavailable float64
restartOnly bool
waitTimeout time.Duration
leaseTimeout time.Duration
leaseDelayBetween time.Duration
releaseCmdTimeout time.Duration
isFirstDeploy bool
machineGuest *api.MachineGuest
increasedAvailability bool
listenAddressChecked map[string]struct{}
updateOnly bool
noExtensions bool
excludeRegions map[string]interface{}
onlyRegions map[string]interface{}
// immediateMaxConcurrent is used as the capacity of the buffered channel
// that bounds how many updateMachine goroutines run at once under the
// "immediate" strategy. NewMachineDeployment sets it to at least 1.
immediateMaxConcurrent int
}

func NewMachineDeployment(ctx context.Context, args MachineDeploymentArgs) (MachineDeployment, error) {
Expand Down Expand Up @@ -144,30 +146,36 @@ func NewMachineDeployment(ctx context.Context, args MachineDeploymentArgs) (Mach
maxUnavailable = mu
}

immedateMaxConcurrent := args.ImmediateMaxConcurrent
if immedateMaxConcurrent < 1 {
immedateMaxConcurrent = 1
}

md := &machineDeployment{
apiClient: apiClient,
gqlClient: apiClient.GenqClient,
flapsClient: flapsClient,
io: io,
colorize: io.ColorScheme(),
app: args.AppCompact,
appConfig: appConfig,
img: args.DeploymentImage,
skipSmokeChecks: args.SkipSmokeChecks,
skipHealthChecks: args.SkipHealthChecks,
restartOnly: args.RestartOnly,
maxUnavailable: maxUnavailable,
waitTimeout: waitTimeout,
leaseTimeout: leaseTimeout,
leaseDelayBetween: leaseDelayBetween,
releaseCmdTimeout: args.ReleaseCmdTimeout,
increasedAvailability: args.IncreasedAvailability,
listenAddressChecked: make(map[string]struct{}),
updateOnly: args.UpdateOnly,
machineGuest: args.Guest,
noExtensions: args.NoExtensions,
excludeRegions: args.ExcludeRegions,
onlyRegions: args.OnlyRegions,
apiClient: apiClient,
gqlClient: apiClient.GenqClient,
flapsClient: flapsClient,
io: io,
colorize: io.ColorScheme(),
app: args.AppCompact,
appConfig: appConfig,
img: args.DeploymentImage,
skipSmokeChecks: args.SkipSmokeChecks,
skipHealthChecks: args.SkipHealthChecks,
restartOnly: args.RestartOnly,
maxUnavailable: maxUnavailable,
waitTimeout: waitTimeout,
leaseTimeout: leaseTimeout,
leaseDelayBetween: leaseDelayBetween,
releaseCmdTimeout: args.ReleaseCmdTimeout,
increasedAvailability: args.IncreasedAvailability,
listenAddressChecked: make(map[string]struct{}),
updateOnly: args.UpdateOnly,
machineGuest: args.Guest,
noExtensions: args.NoExtensions,
excludeRegions: args.ExcludeRegions,
onlyRegions: args.OnlyRegions,
immediateMaxConcurrent: immedateMaxConcurrent,
}
if err := md.setStrategy(); err != nil {
return nil, err
Expand Down
23 changes: 23 additions & 0 deletions internal/command/deploy/machines_deploymachinesapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"slices"
"strconv"
"strings"
"sync"
"time"

"github.com/samber/lo"
Expand Down Expand Up @@ -351,6 +352,28 @@ func (md *machineDeployment) updateExistingMachines(ctx context.Context, updateE
return md.updateUsingBlueGreenStrategy(ctx, updateEntries)
}

if md.strategy == "immediate" {
var wg sync.WaitGroup
pool := make(chan struct{}, md.immediateMaxConcurrent)
for i, updateEntry := range updateEntries {
e := updateEntry
indexStr := formatIndex(i, len(updateEntries))
wg.Add(1)
pool <- struct{}{}
go func() {
defer wg.Done()
if err := md.updateMachine(ctx, e, indexStr); err != nil {
if md.strategy == "immediate" {
fmt.Fprintf(md.io.ErrOut, "Continuing after error: %s\n", err)
}
}
<-pool
}()
}
wg.Wait()
return nil
}

var groupCount int
if mu := md.maxUnavailable; mu > 0 && mu < 1 {
groupCount = int(math.Floor(1.0 / mu))
Expand Down

0 comments on commit f1a45f1

Please sign in to comment.