Make sure to wait before restarting the remote builder #3785

Open
wants to merge 9 commits into base: master
47 changes: 30 additions & 17 deletions internal/build/imgsrc/ensure_builder.go
@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"net/http"
"strings"
"time"

@@ -64,7 +65,7 @@ func EnsureBuilder(ctx context.Context, org *fly.Organization, region string, re
}

if validateBuilderErr == BuilderMachineNotStarted {
err := restartBuilderMachine(ctx, builderMachine)
err := startBuilder(ctx, builderMachine)
switch {
case errors.Is(err, ShouldReplaceBuilderMachine):
span.AddEvent("recreating builder due to resource reservation error")
@@ -405,29 +406,41 @@ func createBuilder(ctx context.Context, org *fly.Organization, region, builderNa
return
}

func restartBuilderMachine(ctx context.Context, builderMachine *fly.Machine) error {
func startBuilder(ctx context.Context, builderMachine *fly.Machine) error {
ctx, span := tracing.GetTracer().Start(ctx, "restart_builder_machine")
defer span.End()

flapsClient := flapsutil.ClientFromContext(ctx)

if err := flapsClient.Restart(ctx, fly.RestartMachineInput{
ID: builderMachine.ID,
}, ""); err != nil {
if strings.Contains(err.Error(), "could not reserve resource for machine") ||
strings.Contains(err.Error(), "deploys to this host are temporarily disabled") {
var retries int

for {
var flapsErr *flaps.FlapsError
_, err := flapsClient.Start(ctx, builderMachine.ID, "")
switch {
case err == nil:
return nil
case retries >= 5:
span.RecordError(err)
return err
case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded):
span.RecordError(err)
return err
case strings.Contains(err.Error(), "could not reserve resource for machine"),
strings.Contains(err.Error(), "deploys to this host are temporarily disabled"):
span.RecordError(err)
return ShouldReplaceBuilderMachine
case !errors.As(err, &flapsErr):
span.RecordError(err)
case flapsErr.ResponseStatusCode == http.StatusPreconditionFailed,
flapsErr.ResponseStatusCode >= 500:
span.AddEvent(fmt.Sprintf("non-server error %v", flapsErr.Error()))
default:
// we only retry server 500s
span.RecordError(err)
return err
}

tracing.RecordError(span, err, "error restarting builder machine")
return err
retries++
time.Sleep(1 * time.Second)
}

if err := flapsClient.Wait(ctx, builderMachine, "started", time.Second*60); err != nil {
tracing.RecordError(span, err, "error waiting for builder machine to start")
return err
}

return nil
}
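
The start loop above retries transient failures with a one-second pause, bails out on context cancellation, and maps host-capacity errors to ShouldReplaceBuilderMachine so the caller can recreate the builder. A minimal, self-contained Go sketch of that classification pattern, where startOnce and errShouldReplace are hypothetical stand-ins for flapsClient.Start and the package's ShouldReplaceBuilderMachine sentinel:

package main

import (
	"context"
	"errors"
	"fmt"
	"net/http"
	"strings"
	"time"
)

var errShouldReplace = errors.New("should replace builder machine")

// startWithRetries mirrors the loop above: retry up to maxRetries on 412s and
// 5xx responses, give up on cancellation, and signal replacement on capacity errors.
func startWithRetries(ctx context.Context, startOnce func(context.Context) (int, error), maxRetries int) error {
	for retries := 0; ; retries++ {
		status, err := startOnce(ctx)
		switch {
		case err == nil:
			return nil
		case retries >= maxRetries:
			return err
		case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded):
			return err
		case strings.Contains(err.Error(), "could not reserve resource for machine"):
			return errShouldReplace
		case status == http.StatusPreconditionFailed, status >= 500:
			// transient: fall through to the sleep and try again
		default:
			return err
		}
		time.Sleep(1 * time.Second)
	}
}

func main() {
	attempts := 0
	err := startWithRetries(context.Background(), func(ctx context.Context) (int, error) {
		attempts++
		if attempts < 3 {
			return http.StatusServiceUnavailable, fmt.Errorf("host busy")
		}
		return http.StatusOK, nil
	}, 5)
	fmt.Println(err, attempts) // <nil> 3
}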
14 changes: 6 additions & 8 deletions internal/build/imgsrc/ensure_builder_test.go
@@ -266,25 +266,23 @@ func TestRestartBuilderMachine(t *testing.T) {

couldNotReserveResources := false
flapsClient := mock.FlapsClient{
RestartFunc: func(ctx context.Context, input fly.RestartMachineInput, nonce string) error {
StartFunc: func(ctx context.Context, machineID string, nonce string) (*fly.MachineStartResponse, error) {
if couldNotReserveResources {
return &flaps.FlapsError{
return nil, &flaps.FlapsError{
OriginalError: fmt.Errorf("failed to restart VM xyzabc: unknown: could not reserve resource for machine: insufficient memory available to fulfill request"),
}
}
return nil
},
WaitFunc: func(ctx context.Context, machine *fly.Machine, state string, timeout time.Duration) (err error) {
return nil
return nil, nil
},
}

ctx = flapsutil.NewContextWithClient(ctx, &flapsClient)
err := restartBuilderMachine(ctx, &fly.Machine{ID: "bigmachine"})
err := startBuilder(ctx, &fly.Machine{ID: "bigmachine"})
assert.NoError(t, err)

couldNotReserveResources = true
err = restartBuilderMachine(ctx, &fly.Machine{ID: "bigmachine"})
err = startBuilder(ctx, &fly.Machine{ID: "bigmachine"})
assert.Error(t, err)
assert.ErrorIs(t, err, ShouldReplaceBuilderMachine)

}
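
A possible extra case for TestRestartBuilderMachine, not part of this diff: simulate transient 500s and check that startBuilder keeps retrying until Start succeeds. It assumes the same mock.FlapsClient and flaps.FlapsError shapes used above, plus a net/http import in the test file.

// Sketch: transient 500s are retried until Start eventually succeeds.
transientFailures := 2
flapsClient.StartFunc = func(ctx context.Context, machineID string, nonce string) (*fly.MachineStartResponse, error) {
	if transientFailures > 0 {
		transientFailures--
		return nil, &flaps.FlapsError{
			OriginalError:      fmt.Errorf("internal server error"),
			ResponseStatusCode: http.StatusInternalServerError,
		}
	}
	return nil, nil
}
err = startBuilder(ctx, &fly.Machine{ID: "bigmachine"})
assert.NoError(t, err) // succeeds after two retried attempts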
72 changes: 9 additions & 63 deletions internal/command/deploy/plan.go
@@ -460,7 +460,7 @@ func (md *machineDeployment) updateMachineWChecks(ctx context.Context, oldMachin

if !healthcheckResult.machineChecksPassed || !healthcheckResult.smokeChecksPassed {
sl.LogStatus(statuslogger.StatusRunning, fmt.Sprintf("Waiting for machine %s to reach a good state", oldMachine.ID))
_, err := waitForMachineState(ctx, lm, []string{"stopped", "started", "suspended"}, md.waitTimeout, sl)
_, err := mach.WaitForAnyMachineState(ctx, machine, []string{"stopped", "started", "suspended"}, md.waitTimeout, sl)
if err != nil {
span.RecordError(err)
return err
@@ -473,8 +473,9 @@
sl.LogStatus(statuslogger.StatusRunning, fmt.Sprintf("Running smoke checks on machine %s", machine.ID))
err = md.doSmokeChecks(ctx, lm, false)
if err != nil {
err := &unrecoverableError{err: err}
span.RecordError(err)
return &unrecoverableError{err: err}
return err
}
healthcheckResult.smokeChecksPassed = true
}
@@ -493,11 +494,16 @@ func (md *machineDeployment) updateMachineWChecks(ctx context.Context, oldMachin
if !healthcheckResult.regularChecksPassed {
sl.LogStatus(statuslogger.StatusRunning, fmt.Sprintf("Checking health of machine %s", machine.ID))
err = lm.WaitForHealthchecksToPass(ctx, md.waitTimeout)
if err != nil {
switch {
case errors.Is(err, context.DeadlineExceeded):
span.RecordError(err)
return err
case err != nil:
err := &unrecoverableError{err: err}
span.RecordError(err)
return err
}

healthcheckResult.regularChecksPassed = true
}

@@ -585,66 +591,6 @@ func (md *machineDeployment) clearMachineLease(ctx context.Context, machID, leas
}
}

// returns when the machine is in one of the possible states, or after passing the timeout threshold
func waitForMachineState(ctx context.Context, lm mach.LeasableMachine, possibleStates []string, timeout time.Duration, sl statuslogger.StatusLine) (string, error) {
ctx, span := tracing.GetTracer().Start(ctx, "wait_for_machine_state", trace.WithAttributes(
attribute.StringSlice("possible_states", possibleStates),
))
defer span.End()

ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

var mutex sync.Mutex

var waitErr error
numCompleted := 0
var successfulState string

for _, state := range possibleStates {
state := state
go func() {
err := lm.WaitForState(ctx, state, timeout, false)
mutex.Lock()
defer func() {
numCompleted += 1
mutex.Unlock()
}()

if successfulState != "" {
return
}
sl.LogStatus(statuslogger.StatusRunning, fmt.Sprintf("Machine %s reached %s state", lm.Machine().ID, state))

if err != nil {
waitErr = err
} else {
successfulState = state
}
}()
}

// TODO(billy): i'm sure we can use channels here
for {
mutex.Lock()
if successfulState != "" || numCompleted == len(possibleStates) {
defer mutex.Unlock()
if successfulState != "" {
span.SetAttributes(attribute.String("state", successfulState))
}

if waitErr != nil {
span.RecordError(waitErr)
}

return successfulState, waitErr
}
mutex.Unlock()

time.Sleep(1 * time.Second)
}
}

func (md *machineDeployment) acquireMachineLease(ctx context.Context, machID string) (*fly.MachineLease, error) {
leaseTimeout := int(md.leaseTimeout)
lease, err := md.flapsClient.AcquireLease(ctx, machID, &leaseTimeout)
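The healthcheck branch in plan.go now returns context.DeadlineExceeded as-is (recoverable) and wraps every other failure as unrecoverable. A small self-contained sketch of that classification, with a hypothetical unrecoverable type standing in for plan.go's unrecoverableError:

package main

import (
	"context"
	"errors"
	"fmt"
)

// unrecoverable marks errors that should stop the deployment instead of being retried.
// It is a hypothetical stand-in for plan.go's unrecoverableError.
type unrecoverable struct{ err error }

func (u *unrecoverable) Error() string { return u.err.Error() }
func (u *unrecoverable) Unwrap() error { return u.err }

func classifyHealthcheckErr(err error) error {
	switch {
	case err == nil:
		return nil
	case errors.Is(err, context.DeadlineExceeded):
		return err // recoverable: the caller may wait for a good state and retry
	default:
		return &unrecoverable{err: err} // anything else aborts the update
	}
}

func main() {
	var u *unrecoverable
	fmt.Println(errors.As(classifyHealthcheckErr(fmt.Errorf("checks failed")), &u))                    // true
	fmt.Println(errors.Is(classifyHealthcheckErr(context.DeadlineExceeded), context.DeadlineExceeded)) // true
}
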
2 changes: 1 addition & 1 deletion internal/command/deploy/plan_test.go
@@ -239,7 +239,7 @@ func TestUpdateMachines(t *testing.T) {
machine.State = "started"
return nil
} else {
return assert.AnError
return nil
}
},
ListFunc: func(ctx context.Context, state string) ([]*fly.Machine, error) {
70 changes: 70 additions & 0 deletions internal/machine/wait.go
@@ -13,6 +13,10 @@ import (
"github.com/superfly/fly-go/flaps"
"github.com/superfly/flyctl/internal/flapsutil"
"github.com/superfly/flyctl/internal/flyerr"
"github.com/superfly/flyctl/internal/statuslogger"
"github.com/superfly/flyctl/internal/tracing"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

func WaitForStartOrStop(ctx context.Context, machine *fly.Machine, action string, timeout time.Duration) error {
@@ -65,6 +69,72 @@ func WaitForStartOrStop(ctx context.Context, machine *fly.Machine, action string
}
}

type waitResult struct {
state string
err error
}

// returns when the machine is in one of the possible states, or after passing the timeout threshold
func WaitForAnyMachineState(ctx context.Context, mach *fly.Machine, possibleStates []string, timeout time.Duration, sl statuslogger.StatusLine) (string, error) {
ctx, span := tracing.GetTracer().Start(ctx, "wait_for_machine_state", trace.WithAttributes(
attribute.StringSlice("possible_states", possibleStates),
))
defer span.End()

ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

flapsClient := flapsutil.ClientFromContext(ctx)

channel := make(chan waitResult, len(possibleStates))

for _, state := range possibleStates {
state := state
go func() {
err := flapsClient.Wait(ctx, mach, state, timeout)
if sl != nil && err == nil {
sl.LogStatus(statuslogger.StatusRunning, fmt.Sprintf("Machine %s reached %s state", mach.ID, state))
}
channel <- waitResult{
state: state,
err: err,
}
}()
}

numCompleted := 0
for {
select {
case result := <-channel:
span.AddEvent("machine_state_change", trace.WithAttributes(
attribute.String("state", result.state),
attribute.String("machine_id", mach.ID),
attribute.String("err", fmt.Sprintf("%v", result.err)),
))
numCompleted += 1
if result.err == nil {
return result.state, nil
}
if numCompleted == len(possibleStates) {
err := &WaitTimeoutErr{
machineID: mach.ID,
timeout: timeout,
desiredState: strings.Join(possibleStates, ", "),
}
return "", err
}
case <-ctx.Done():
err := &WaitTimeoutErr{
machineID: mach.ID,
timeout: timeout,
desiredState: strings.Join(possibleStates, ", "),
}
span.RecordError(err)
return "", err
}
}
}

type WaitTimeoutErr struct {
machineID string
timeout time.Duration
Expand Down