diff --git a/internal/build/imgsrc/ensure_builder.go b/internal/build/imgsrc/ensure_builder.go
index 1d85dfde32..d993a78b39 100644
--- a/internal/build/imgsrc/ensure_builder.go
+++ b/internal/build/imgsrc/ensure_builder.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"net/http"
 	"strings"
 	"time"
 
@@ -64,7 +65,7 @@ func EnsureBuilder(ctx context.Context, org *fly.Organization, region string, re
 	}
 
 	if validateBuilderErr == BuilderMachineNotStarted {
-		err := restartBuilderMachine(ctx, builderMachine)
+		err := startBuilder(ctx, builderMachine)
 		switch {
 		case errors.Is(err, ShouldReplaceBuilderMachine):
 			span.AddEvent("recreating builder due to resource reservation error")
@@ -405,29 +406,41 @@ func createBuilder(ctx context.Context, org *fly.Organization, region, builderNa
 	return
 }
 
-func restartBuilderMachine(ctx context.Context, builderMachine *fly.Machine) error {
+func startBuilder(ctx context.Context, builderMachine *fly.Machine) error {
 	ctx, span := tracing.GetTracer().Start(ctx, "restart_builder_machine")
 	defer span.End()
 
 	flapsClient := flapsutil.ClientFromContext(ctx)
-	if err := flapsClient.Restart(ctx, fly.RestartMachineInput{
-		ID: builderMachine.ID,
-	}, ""); err != nil {
-		if strings.Contains(err.Error(), "could not reserve resource for machine") ||
-			strings.Contains(err.Error(), "deploys to this host are temporarily disabled") {
+	var retries int
+
+	for {
+		var flapsErr *flaps.FlapsError
+		_, err := flapsClient.Start(ctx, builderMachine.ID, "")
+		switch {
+		case err == nil:
+			return nil
+		case retries >= 5:
+			span.RecordError(err)
+			return err
+		case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded):
+			span.RecordError(err)
+			return err
+		case strings.Contains(err.Error(), "could not reserve resource for machine"),
+			strings.Contains(err.Error(), "deploys to this host are temporarily disabled"):
 			span.RecordError(err)
 			return ShouldReplaceBuilderMachine
+		case !errors.As(err, &flapsErr):
+			span.RecordError(err)
+		case flapsErr.ResponseStatusCode == http.StatusPreconditionFailed,
+			flapsErr.ResponseStatusCode >= 500:
+			span.AddEvent(fmt.Sprintf("non-server error %v", flapsErr.Error()))
+		default:
+			// we only retry server 500s
+			span.RecordError(err)
+			return err
 		}
-
-		tracing.RecordError(span, err, "error restarting builder machine")
-		return err
+		retries++
+		time.Sleep(1 * time.Second)
 	}
-
-	if err := flapsClient.Wait(ctx, builderMachine, "started", time.Second*60); err != nil {
-		tracing.RecordError(span, err, "error waiting for builder machine to start")
-		return err
-	}
-
-	return nil
 }
 
diff --git a/internal/build/imgsrc/ensure_builder_test.go b/internal/build/imgsrc/ensure_builder_test.go
index b829086b25..c07e96d943 100644
--- a/internal/build/imgsrc/ensure_builder_test.go
+++ b/internal/build/imgsrc/ensure_builder_test.go
@@ -266,25 +266,23 @@ func TestRestartBuilderMachine(t *testing.T) {
 
 	couldNotReserveResources := false
 	flapsClient := mock.FlapsClient{
-		RestartFunc: func(ctx context.Context, input fly.RestartMachineInput, nonce string) error {
+		StartFunc: func(ctx context.Context, machineID string, nonce string) (*fly.MachineStartResponse, error) {
 			if couldNotReserveResources {
-				return &flaps.FlapsError{
+				return nil, &flaps.FlapsError{
 					OriginalError: fmt.Errorf("failed to restart VM xyzabc: unknown: could not reserve resource for machine: insufficient memory available to fulfill request"),
 				}
 			}
-			return nil
-		},
-		WaitFunc: func(ctx context.Context, machine *fly.Machine, state string, timeout time.Duration) (err error) {
-			return nil
+			return nil, nil
 		},
 	}
 	ctx = flapsutil.NewContextWithClient(ctx, &flapsClient)
 
-	err := restartBuilderMachine(ctx, &fly.Machine{ID: "bigmachine"})
+	err := startBuilder(ctx, &fly.Machine{ID: "bigmachine"})
 	assert.NoError(t, err)
 
 	couldNotReserveResources = true
-	err = restartBuilderMachine(ctx, &fly.Machine{ID: "bigmachine"})
+	err = startBuilder(ctx, &fly.Machine{ID: "bigmachine"})
 	assert.Error(t, err)
 	assert.ErrorIs(t, err, ShouldReplaceBuilderMachine)
+
 }
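The updated test only covers the immediate-success and capacity-error paths. As a reviewer's sketch (not part of this patch), the new retry branch could be exercised in the same ensure_builder_test.go, reusing that file's existing imports plus "net/http"; it assumes flaps.FlapsError's ResponseStatusCode field can be set directly, since that is what startBuilder's switch reads.

```go
// Sketch only: lives alongside TestRestartBuilderMachine and reuses its
// imports (plus "net/http"). It walks startBuilder through one retryable
// 5xx failure before letting the start succeed.
func TestStartBuilderRetriesOnServerError(t *testing.T) {
	ctx := context.Background()

	var calls int
	flapsClient := mock.FlapsClient{
		StartFunc: func(ctx context.Context, machineID string, nonce string) (*fly.MachineStartResponse, error) {
			calls++
			if calls == 1 {
				// ResponseStatusCode is assumed settable here; startBuilder
				// checks it to decide whether the error is retryable.
				return nil, &flaps.FlapsError{
					OriginalError:      fmt.Errorf("internal server error"),
					ResponseStatusCode: http.StatusInternalServerError,
				}
			}
			return nil, nil
		},
	}
	ctx = flapsutil.NewContextWithClient(ctx, &flapsClient)

	// startBuilder sleeps one second between attempts, so this takes ~1s.
	err := startBuilder(ctx, &fly.Machine{ID: "bigmachine"})
	assert.NoError(t, err)
	assert.Equal(t, 2, calls) // one failed attempt, one successful retry
}
```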
diff --git a/internal/command/deploy/plan.go b/internal/command/deploy/plan.go
index aa6eb75f5a..69e655bddb 100644
--- a/internal/command/deploy/plan.go
+++ b/internal/command/deploy/plan.go
@@ -460,7 +460,7 @@ func (md *machineDeployment) updateMachineWChecks(ctx context.Context, oldMachin
 
 	if !healthcheckResult.machineChecksPassed || !healthcheckResult.smokeChecksPassed {
 		sl.LogStatus(statuslogger.StatusRunning, fmt.Sprintf("Waiting for machine %s to reach a good state", oldMachine.ID))
-		_, err := waitForMachineState(ctx, lm, []string{"stopped", "started", "suspended"}, md.waitTimeout, sl)
+		_, err := mach.WaitForAnyMachineState(ctx, machine, []string{"stopped", "started", "suspended"}, md.waitTimeout, sl)
 		if err != nil {
 			span.RecordError(err)
 			return err
@@ -473,8 +473,9 @@ func (md *machineDeployment) updateMachineWChecks(ctx context.Context, oldMachin
 		sl.LogStatus(statuslogger.StatusRunning, fmt.Sprintf("Running smoke checks on machine %s", machine.ID))
 		err = md.doSmokeChecks(ctx, lm, false)
 		if err != nil {
+			err := &unrecoverableError{err: err}
 			span.RecordError(err)
-			return &unrecoverableError{err: err}
+			return err
 		}
 		healthcheckResult.smokeChecksPassed = true
 	}
@@ -493,11 +494,16 @@ func (md *machineDeployment) updateMachineWChecks(ctx context.Context, oldMachin
 
 	if !healthcheckResult.regularChecksPassed {
 		sl.LogStatus(statuslogger.StatusRunning, fmt.Sprintf("Checking health of machine %s", machine.ID))
 		err = lm.WaitForHealthchecksToPass(ctx, md.waitTimeout)
-		if err != nil {
+		switch {
+		case errors.Is(err, context.DeadlineExceeded):
+			span.RecordError(err)
+			return err
+		case err != nil:
 			err := &unrecoverableError{err: err}
 			span.RecordError(err)
 			return err
 		}
+
 		healthcheckResult.regularChecksPassed = true
 	}
@@ -585,66 +591,6 @@ func (md *machineDeployment) clearMachineLease(ctx context.Context, machID, leas
 	}
 }
 
-// returns when the machine is in one of the possible states, or after passing the timeout threshold
-func waitForMachineState(ctx context.Context, lm mach.LeasableMachine, possibleStates []string, timeout time.Duration, sl statuslogger.StatusLine) (string, error) {
-	ctx, span := tracing.GetTracer().Start(ctx, "wait_for_machine_state", trace.WithAttributes(
-		attribute.StringSlice("possible_states", possibleStates),
-	))
-	defer span.End()
-
-	ctx, cancel := context.WithTimeout(ctx, timeout)
-	defer cancel()
-
-	var mutex sync.Mutex
-
-	var waitErr error
-	numCompleted := 0
-	var successfulState string
-
-	for _, state := range possibleStates {
-		state := state
-		go func() {
-			err := lm.WaitForState(ctx, state, timeout, false)
-			mutex.Lock()
-			defer func() {
-				numCompleted += 1
-				mutex.Unlock()
-			}()
-
-			if successfulState != "" {
-				return
-			}
-			sl.LogStatus(statuslogger.StatusRunning, fmt.Sprintf("Machine %s reached %s state", lm.Machine().ID, state))
-
-			if err != nil {
-				waitErr = err
-			} else {
-				successfulState = state
-			}
-		}()
-	}
-
-	// TODO(billy): i'm sure we can use channels here
-	for {
-		mutex.Lock()
-		if successfulState != "" || numCompleted == len(possibleStates) {
-			defer mutex.Unlock()
-			if successfulState != "" {
-				span.SetAttributes(attribute.String("state", successfulState))
-			}
-
-			if waitErr != nil {
-				span.RecordError(waitErr)
-			}
-
-			return successfulState, waitErr
-		}
-		mutex.Unlock()
-
-		time.Sleep(1 * time.Second)
-	}
-}
-
 func (md *machineDeployment) acquireMachineLease(ctx context.Context, machID string) (*fly.MachineLease, error) {
 	leaseTimeout := int(md.leaseTimeout)
 	lease, err := md.flapsClient.AcquireLease(ctx, machID, &leaseTimeout)
diff --git a/internal/command/deploy/plan_test.go b/internal/command/deploy/plan_test.go
index 1341d119a9..f3a597d4a6 100644
--- a/internal/command/deploy/plan_test.go
+++ b/internal/command/deploy/plan_test.go
@@ -239,7 +239,7 @@ func TestUpdateMachines(t *testing.T) {
 				machine.State = "started"
 				return nil
 			} else {
-				return assert.AnError
+				return nil
 			}
 		},
 		ListFunc: func(ctx context.Context, state string) ([]*fly.Machine, error) {
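A note on the health-check hunk above: timeouts from WaitForHealthchecksToPass (context.DeadlineExceeded) are now returned unwrapped, while any other failure is wrapped as unrecoverable before the span records it. Below is a minimal sketch of that classification using a hypothetical helper name; the unrecoverableError type itself is defined elsewhere in the package and is not part of this diff.

```go
// classifyHealthcheckErr is a hypothetical name; it mirrors the switch added
// to updateMachineWChecks above.
func classifyHealthcheckErr(err error) error {
	switch {
	case err == nil:
		return nil
	case errors.Is(err, context.DeadlineExceeded):
		// The checks simply ran out of time; hand the timeout back unwrapped
		// so the caller can treat it as a plain wait failure.
		return err
	default:
		// The checks ran and failed; wrapping marks the result as not worth
		// retrying.
		return &unrecoverableError{err: err}
	}
}
```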
diff --git a/internal/machine/wait.go b/internal/machine/wait.go
index 07416c3b33..280cc58a20 100644
--- a/internal/machine/wait.go
+++ b/internal/machine/wait.go
@@ -13,6 +13,10 @@ import (
 	"github.com/superfly/fly-go/flaps"
 	"github.com/superfly/flyctl/internal/flapsutil"
 	"github.com/superfly/flyctl/internal/flyerr"
+	"github.com/superfly/flyctl/internal/statuslogger"
+	"github.com/superfly/flyctl/internal/tracing"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/trace"
 )
 
 func WaitForStartOrStop(ctx context.Context, machine *fly.Machine, action string, timeout time.Duration) error {
@@ -65,6 +69,72 @@ func WaitForStartOrStop(ctx context.Context, machine *fly.Machine, action string
 	}
 }
 
+type waitResult struct {
+	state string
+	err   error
+}
+
+// returns when the machine is in one of the possible states, or after passing the timeout threshold
+func WaitForAnyMachineState(ctx context.Context, mach *fly.Machine, possibleStates []string, timeout time.Duration, sl statuslogger.StatusLine) (string, error) {
+	ctx, span := tracing.GetTracer().Start(ctx, "wait_for_machine_state", trace.WithAttributes(
+		attribute.StringSlice("possible_states", possibleStates),
+	))
+	defer span.End()
+
+	ctx, cancel := context.WithTimeout(ctx, timeout)
+	defer cancel()
+
+	flapsClient := flapsutil.ClientFromContext(ctx)
+
+	channel := make(chan waitResult, len(possibleStates))
+
+	for _, state := range possibleStates {
+		state := state
+		go func() {
+			err := flapsClient.Wait(ctx, mach, state, timeout)
+			if sl != nil && err == nil {
+				sl.LogStatus(statuslogger.StatusRunning, fmt.Sprintf("Machine %s reached %s state", mach.ID, state))
+			}
+			channel <- waitResult{
+				state: state,
+				err:   err,
+			}
+		}()
+	}
+
+	numCompleted := 0
+	for {
+		select {
+		case result := <-channel:
+			span.AddEvent("machine_state_change", trace.WithAttributes(
+				attribute.String("state", result.state),
+				attribute.String("machine_id", mach.ID),
+				attribute.String("err", fmt.Sprintf("%v", result.err)),
+			))
+			numCompleted += 1
+			if result.err == nil {
+				return result.state, nil
+			}
+			if numCompleted == len(possibleStates) {
+				err := &WaitTimeoutErr{
+					machineID:    mach.ID,
+					timeout:      timeout,
+					desiredState: strings.Join(possibleStates, ", "),
+				}
+				return "", err
+			}
+		case <-ctx.Done():
+			err := &WaitTimeoutErr{
+				machineID:    mach.ID,
+				timeout:      timeout,
+				desiredState: strings.Join(possibleStates, ", "),
+			}
+			span.RecordError(err)
+			return "", err
+		}
+	}
+}
+
 type WaitTimeoutErr struct {
 	machineID string
 	timeout   time.Duration