Skip to content

Commit

Permalink
simplified WaitForAnyMachineState
Browse files Browse the repository at this point in the history
channels are better than that disgusting mutex garbage i was doing
  • Loading branch information
billyb2 committed Jul 26, 2024
1 parent 308ee99 commit 37f42e5
Showing 1 changed file with 21 additions and 36 deletions.
57 changes: 21 additions & 36 deletions internal/machine/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"net/http"
"strings"
"sync"
"time"

"github.com/jpillora/backoff"
Expand Down Expand Up @@ -70,6 +69,11 @@ func WaitForStartOrStop(ctx context.Context, machine *fly.Machine, action string
}
}

type waitResult struct {
state string
err error
}

// returns when the machine is in one of the possible states, or after passing the timeout threshold
func WaitForAnyMachineState(ctx context.Context, mach *fly.Machine, possibleStates []string, timeout time.Duration, sl statuslogger.StatusLine) (string, error) {
ctx, span := tracing.GetTracer().Start(ctx, "wait_for_machine_state", trace.WithAttributes(
Expand All @@ -82,56 +86,37 @@ func WaitForAnyMachineState(ctx context.Context, mach *fly.Machine, possibleStat

flapsClient := flapsutil.ClientFromContext(ctx)

var mutex sync.Mutex

var waitErr error
numCompleted := 0
var successfulState string
channel := make(chan waitResult, len(possibleStates))

for _, state := range possibleStates {
state := state
go func() {
err := flapsClient.Wait(ctx, mach, state, timeout)
mutex.Lock()
defer func() {
numCompleted += 1
mutex.Unlock()
}()

if successfulState != "" {
return
}

if sl != nil {
if sl != nil && err == nil {
sl.LogStatus(statuslogger.StatusRunning, fmt.Sprintf("Machine %s reached %s state", mach.ID, state))
}

if err != nil {
waitErr = err
} else {
successfulState = state
channel <- waitResult{
state: state,
err: err,
}
}()
}

// TODO(billy): i'm sure we can use channels here
numCompleted := 0
for {
mutex.Lock()
if successfulState != "" || numCompleted == len(possibleStates) {
defer mutex.Unlock()
if successfulState != "" {
span.SetAttributes(attribute.String("state", successfulState))
select {
case result := <-channel:
numCompleted += 1
if result.err == nil || numCompleted == len(possibleStates) {
return result.state, nil
}

if waitErr != nil {
span.RecordError(waitErr)
case <-ctx.Done():
return "", &WaitTimeoutErr{
machineID: mach.ID,
timeout: timeout,
desiredState: strings.Join(possibleStates, ", "),
}

return successfulState, waitErr
}
mutex.Unlock()

time.Sleep(1 * time.Second)
}
}

Expand Down

0 comments on commit 37f42e5

Please sign in to comment.