Make sure to wait before restarting the remote builder
If the remote builder is in the 'created' or 'migrating' state, we won't be
able to restart it. By waiting for the machine to reach the 'started' or
'stopped' state before issuing the restart, we can avoid errors like:

`failed to fetch an image or build from source: failed to restart VM asfijsdoif: failed_precondition: unable to restart machine, not currently started or stopped`
billyb2 committed Jul 26, 2024
1 parent 142eea2 commit 751e979
Showing 4 changed files with 83 additions and 61 deletions.
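The gist of the change: restartBuilderMachine now waits for the builder machine to reach the 'started' or 'stopped' state before asking flaps to restart it. Below is a condensed sketch of that pattern; restartWhenReady is an illustrative name rather than a function added by this commit, and tracing is omitted (the real change is the ensure_builder.go hunk that follows).

package imgsrc

import (
	"context"
	"time"

	fly "github.com/superfly/fly-go"
	"github.com/superfly/flyctl/internal/flapsutil"
	mach "github.com/superfly/flyctl/internal/machine"
)

// restartWhenReady condenses the new behaviour: block until the builder is
// restartable, then issue the restart.
func restartWhenReady(ctx context.Context, builderMachine *fly.Machine) error {
	flapsClient := flapsutil.ClientFromContext(ctx)

	// A machine still in 'created' or 'migrating' cannot be restarted, so
	// wait (best effort, up to 60 seconds) for 'started' or 'stopped'; the
	// result is intentionally ignored, matching the diff below.
	mach.WaitForAnyMachineState(ctx, builderMachine, []string{"started", "stopped"}, 60*time.Second, nil)

	return flapsClient.Restart(ctx, fly.RestartMachineInput{
		ID: builderMachine.ID,
	}, "")
}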
3 changes: 3 additions & 0 deletions internal/build/imgsrc/ensure_builder.go
@@ -13,6 +13,7 @@ import (
"github.com/superfly/flyctl/internal/flapsutil"
"github.com/superfly/flyctl/internal/flyutil"
"github.com/superfly/flyctl/internal/haikunator"
mach "github.com/superfly/flyctl/internal/machine"
"github.com/superfly/flyctl/internal/tracing"
)

@@ -409,6 +410,8 @@ func restartBuilderMachine(ctx context.Context, builderMachine *fly.Machine) err

flapsClient := flapsutil.ClientFromContext(ctx)

mach.WaitForAnyMachineState(ctx, builderMachine, []string{"started", "stopped"}, 60*time.Second, nil)

if err := flapsClient.Restart(ctx, fly.RestartMachineInput{
ID: builderMachine.ID,
}, ""); err != nil {
9 changes: 9 additions & 0 deletions internal/build/imgsrc/ensure_builder_test.go
@@ -265,6 +265,7 @@ func TestRestartBuilderMachine(t *testing.T) {
ctx := context.Background()

couldNotReserveResources := false
waitedForStartOrStop := false
flapsClient := mock.FlapsClient{
RestartFunc: func(ctx context.Context, input fly.RestartMachineInput, nonce string) error {
if couldNotReserveResources {
@@ -275,16 +276,24 @@ func TestRestartBuilderMachine(t *testing.T) {
return nil
},
WaitFunc: func(ctx context.Context, machine *fly.Machine, state string, timeout time.Duration) (err error) {
if state == "started" || state == "stopped" {
waitedForStartOrStop = true
}

return nil
},
}

ctx = flapsutil.NewContextWithClient(ctx, &flapsClient)
err := restartBuilderMachine(ctx, &fly.Machine{ID: "bigmachine"})
assert.NoError(t, err)
assert.True(t, waitedForStartOrStop)

waitedForStartOrStop = false
couldNotReserveResources = true
err = restartBuilderMachine(ctx, &fly.Machine{ID: "bigmachine"})
assert.True(t, waitedForStartOrStop)
assert.Error(t, err)
assert.ErrorIs(t, err, ShouldReplaceBuilderMachine)

}
62 changes: 1 addition & 61 deletions internal/command/deploy/plan.go
@@ -460,7 +460,7 @@ func (md *machineDeployment) updateMachineWChecks(ctx context.Context, oldMachin

if !healthcheckResult.machineChecksPassed || !healthcheckResult.smokeChecksPassed {
sl.LogStatus(statuslogger.StatusRunning, fmt.Sprintf("Waiting for machine %s to reach a good state", oldMachine.ID))
_, err := waitForMachineState(ctx, lm, []string{"stopped", "started", "suspended"}, md.waitTimeout, sl)
_, err := mach.WaitForAnyMachineState(ctx, machine, []string{"stopped", "started", "suspended"}, md.waitTimeout, sl)
if err != nil {
span.RecordError(err)
return err
@@ -585,66 +585,6 @@ func (md *machineDeployment) clearMachineLease(ctx context.Context, machID, leas
}
}

// returns when the machine is in one of the possible states, or after passing the timeout threshold
func waitForMachineState(ctx context.Context, lm mach.LeasableMachine, possibleStates []string, timeout time.Duration, sl statuslogger.StatusLine) (string, error) {
ctx, span := tracing.GetTracer().Start(ctx, "wait_for_machine_state", trace.WithAttributes(
attribute.StringSlice("possible_states", possibleStates),
))
defer span.End()

ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

var mutex sync.Mutex

var waitErr error
numCompleted := 0
var successfulState string

for _, state := range possibleStates {
state := state
go func() {
err := lm.WaitForState(ctx, state, timeout, false)
mutex.Lock()
defer func() {
numCompleted += 1
mutex.Unlock()
}()

if successfulState != "" {
return
}
sl.LogStatus(statuslogger.StatusRunning, fmt.Sprintf("Machine %s reached %s state", lm.Machine().ID, state))

if err != nil {
waitErr = err
} else {
successfulState = state
}
}()
}

// TODO(billy): i'm sure we can use channels here
for {
mutex.Lock()
if successfulState != "" || numCompleted == len(possibleStates) {
defer mutex.Unlock()
if successfulState != "" {
span.SetAttributes(attribute.String("state", successfulState))
}

if waitErr != nil {
span.RecordError(waitErr)
}

return successfulState, waitErr
}
mutex.Unlock()

time.Sleep(1 * time.Second)
}
}

func (md *machineDeployment) acquireMachineLease(ctx context.Context, machID string) (*fly.MachineLease, error) {
leaseTimeout := int(md.leaseTimeout)
lease, err := md.flapsClient.AcquireLease(ctx, machID, &leaseTimeout)
70 changes: 70 additions & 0 deletions internal/machine/wait.go
@@ -6,13 +6,18 @@ import (
"fmt"
"net/http"
"strings"
"sync"
"time"

"github.com/jpillora/backoff"
fly "github.com/superfly/fly-go"
"github.com/superfly/fly-go/flaps"
"github.com/superfly/flyctl/internal/flapsutil"
"github.com/superfly/flyctl/internal/flyerr"
"github.com/superfly/flyctl/internal/statuslogger"
"github.com/superfly/flyctl/internal/tracing"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

func WaitForStartOrStop(ctx context.Context, machine *fly.Machine, action string, timeout time.Duration) error {
@@ -65,6 +70,71 @@ func WaitForStartOrStop(ctx context.Context, machine *fly.Machine, action string
}
}

// WaitForAnyMachineState returns when the machine reaches one of the possible states, or when the timeout elapses.
func WaitForAnyMachineState(ctx context.Context, mach *fly.Machine, possibleStates []string, timeout time.Duration, sl statuslogger.StatusLine) (string, error) {
ctx, span := tracing.GetTracer().Start(ctx, "wait_for_machine_state", trace.WithAttributes(
attribute.StringSlice("possible_states", possibleStates),
))
defer span.End()

ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

flapsClient := flapsutil.ClientFromContext(ctx)

var mutex sync.Mutex

var waitErr error
numCompleted := 0
var successfulState string

for _, state := range possibleStates {
state := state
go func() {
err := flapsClient.Wait(ctx, mach, state, timeout)
mutex.Lock()
defer func() {
numCompleted += 1
mutex.Unlock()
}()

if successfulState != "" {
return
}

if err != nil {
waitErr = err
return
}

successfulState = state
if sl != nil {
sl.LogStatus(statuslogger.StatusRunning, fmt.Sprintf("Machine %s reached %s state", mach.ID, state))
}
}()
}

// TODO(billy): i'm sure we can use channels here
for {
mutex.Lock()
if successfulState != "" || numCompleted == len(possibleStates) {
defer mutex.Unlock()
if successfulState != "" {
span.SetAttributes(attribute.String("state", successfulState))
}

if waitErr != nil {
span.RecordError(waitErr)
}

return successfulState, waitErr
}
mutex.Unlock()

time.Sleep(1 * time.Second)
}
}

type WaitTimeoutErr struct {
machineID string
timeout time.Duration
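The TODO in the new WaitForAnyMachineState notes that channels could replace the mutex-and-polling loop. A rough sketch of that alternative is below; it is not part of this commit, it reuses the flapsutil.ClientFromContext and flapsClient.Wait calls shown above, and it leaves out tracing and status logging.

package machine

import (
	"context"
	"time"

	fly "github.com/superfly/fly-go"
	"github.com/superfly/flyctl/internal/flapsutil"
)

// waitForAnyStateViaChannels is a hypothetical channel-based variant: each
// goroutine reports its result on a buffered channel, and the first wait to
// succeed wins.
func waitForAnyStateViaChannels(ctx context.Context, m *fly.Machine, possibleStates []string, timeout time.Duration) (string, error) {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	flapsClient := flapsutil.ClientFromContext(ctx)

	type result struct {
		state string
		err   error
	}
	// Buffered so every goroutine can send even after we stop reading.
	results := make(chan result, len(possibleStates))

	for _, state := range possibleStates {
		state := state
		go func() {
			results <- result{state: state, err: flapsClient.Wait(ctx, m, state, timeout)}
		}()
	}

	var lastErr error
	for range possibleStates {
		select {
		case r := <-results:
			if r.err == nil {
				return r.state, nil
			}
			lastErr = r.err
		case <-ctx.Done():
			return "", ctx.Err()
		}
	}
	return "", lastErr
}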
