Skip to content

Commit

Permalink
Merge pull request #265 from form3tech-oss/nvloff-stuck-iterations
Browse files Browse the repository at this point in the history
fix: exit after a timeout on stuck iterations
  • Loading branch information
nvloff-f3 authored Jul 12, 2024
2 parents 105ce68 + b43bfb9 commit cd26e01
Show file tree
Hide file tree
Showing 5 changed files with 166 additions and 83 deletions.
4 changes: 3 additions & 1 deletion internal/run/run_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"github.com/form3tech-oss/f1/v2/pkg/f1/scenarios"
)

const waitForCompletionTimeout = 10 * time.Second

func Cmd(
s *scenarios.Scenarios,
builders []api.Builder,
Expand Down Expand Up @@ -157,7 +159,7 @@ func runCmdExecute(
MaxFailures: maxFailures,
MaxFailuresRate: maxFailuresRate,
IgnoreDropped: ignoreDropped,
}, s, trig, settings, metricsInstance, output)
}, s, trig, waitForCompletionTimeout, settings, metricsInstance, output)
if err != nil {
return fmt.Errorf("new run: %w", err)
}
Expand Down
63 changes: 62 additions & 1 deletion internal/run/run_cmd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,68 @@ func TestInterruptedRun(t *testing.T) {
then.
setup_teardown_is_called_within(600 * time.Millisecond).and().
metrics_are_pushed_to_prometheus().and().
there_is_a_metric_called("form3_loadtest_iteration")
there_is_a_metric_called("form3_loadtest_iteration").and().
expect_the_stdout_output_to_include([]string{
"Interrupted - waiting for active tests to complete",
})
}

func TestInterruptedRun_TimesOut(t *testing.T) {
t.Parallel()

given, when, then := NewRunTestStage(t)

given.
a_timer_is_started().
a_rate_of("1/1s").and().
a_duration_of(5 * time.Second).and().
a_scenario_where_each_iteration_takes(3 * time.Second).and().
wait_for_completion_timeout_of(1 * time.Second).and().
a_distribution_type("none")

when.
the_run_command_is_executed_and_cancelled_after(500 * time.Millisecond)

then.
setup_teardown_is_called_within(600*time.Millisecond + 1*time.Second).and().
metrics_are_pushed_to_prometheus().and().
expect_the_stdout_output_to_include([]string{
"Interrupted - waiting for active tests to complete",
"Active tests not completed after 1s. Stopping...",
})

// needed for goroutine checker
// sleep is the time left in the interation duration after the timeout
time.Sleep(3 * time.Second)
}

func TestMaxDurationReached_TimesOut(t *testing.T) {
t.Parallel()

given, when, then := NewRunTestStage(t)

given.
a_timer_is_started().
a_rate_of("1/1s").and().
a_duration_of(500 * time.Millisecond).and().
a_scenario_where_each_iteration_takes(3 * time.Second).and().
wait_for_completion_timeout_of(1 * time.Second).and().
a_distribution_type("none")

when.
the_run_command_is_executed()

then.
setup_teardown_is_called_within(600*time.Millisecond + 1*time.Second).and().
metrics_are_pushed_to_prometheus().and().
expect_the_stdout_output_to_include([]string{
"Max Duration Elapsed - waiting for active tests to complete",
"Active tests not completed after 1s. Stopping...",
})

// needed for goroutine checker
// sleep is the time left in the interation duration after the timeout
time.Sleep(3 * time.Second)
}

func TestFinalRunMetrics(t *testing.T) {
Expand Down
110 changes: 60 additions & 50 deletions internal/run/run_stage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,58 +68,60 @@ type (
)

type RunTestStage struct {
startTime time.Time
metrics *metrics.Metrics
output *ui.Output
runInstance *run.Run
runResult *run.Result
t *testing.T
require *require.Assertions
metricData *MetricData
scenarioCleanup func()
assert *assert.Assertions
iterationCleanup func()
f1 *f1.F1
durations sync.Map
frequency string
rate string
stages string
distributionType string
configFile string
startRate string
endRate string
rampDuration string
scenario string
settings envsettings.Settings
maxFailures uint64
maxIterations uint64
maxFailuresRate int
duration time.Duration
concurrency int
triggerType TriggerType
iterationTeardownCount atomic.Uint32
setupTeardownCount atomic.Uint32
runCount atomic.Uint32
stdout syncWriter
stderr syncWriter
interactive bool
verbose bool
startTime time.Time
metrics *metrics.Metrics
output *ui.Output
runInstance *run.Run
runResult *run.Result
t *testing.T
require *require.Assertions
metricData *MetricData
scenarioCleanup func()
assert *assert.Assertions
iterationCleanup func()
f1 *f1.F1
durations sync.Map
frequency string
rate string
stages string
distributionType string
configFile string
startRate string
endRate string
rampDuration string
scenario string
settings envsettings.Settings
maxFailures uint64
maxIterations uint64
maxFailuresRate int
duration time.Duration
waitForCompletionTimeout time.Duration
concurrency int
triggerType TriggerType
iterationTeardownCount atomic.Uint32
setupTeardownCount atomic.Uint32
runCount atomic.Uint32
stdout syncWriter
stderr syncWriter
interactive bool
verbose bool
}

func NewRunTestStage(t *testing.T) (*RunTestStage, *RunTestStage, *RunTestStage) {
t.Helper()
stage := &RunTestStage{
t: t,
concurrency: 100,
assert: assert.New(t),
require: require.New(t),
f1: f1.New(),
settings: envsettings.Get(),
metricData: NewMetricData(),
output: ui.NewDiscardOutput(),
metrics: metrics.NewInstance(prometheus.NewRegistry(), true),
stdout: syncWriter{writer: &bytes.Buffer{}},
stderr: syncWriter{writer: &bytes.Buffer{}},
t: t,
concurrency: 100,
assert: assert.New(t),
require: require.New(t),
f1: f1.New(),
settings: envsettings.Get(),
metricData: NewMetricData(),
output: ui.NewDiscardOutput(),
metrics: metrics.NewInstance(prometheus.NewRegistry(), true),
stdout: syncWriter{writer: &bytes.Buffer{}},
stderr: syncWriter{writer: &bytes.Buffer{}},
waitForCompletionTimeout: 5 * time.Second,
}

handler := FakePrometheusHandler(t, stage.metricData)
Expand All @@ -141,6 +143,11 @@ func (s *RunTestStage) a_rate_of(rate string) *RunTestStage {
return s
}

func (s *RunTestStage) wait_for_completion_timeout_of(timeout time.Duration) *RunTestStage {
s.waitForCompletionTimeout = timeout
return s
}

func (s *RunTestStage) and() *RunTestStage {
return s
}
Expand Down Expand Up @@ -198,7 +205,7 @@ func (s *RunTestStage) setupRun() {
MaxFailures: s.maxFailures,
MaxFailuresRate: s.maxFailuresRate,
Verbose: s.verbose,
}, s.f1.GetScenarios(), s.build_trigger(), s.settings, s.metrics, outputer)
}, s.f1.GetScenarios(), s.build_trigger(), s.waitForCompletionTimeout, s.settings, s.metrics, outputer)

s.require.NoError(err)
s.runInstance = r
Expand All @@ -218,8 +225,11 @@ func (s *RunTestStage) the_run_command_is_executed_and_cancelled_after(duration
s.setupRun()

var err error
ctx, cancel := context.WithTimeout(context.TODO(), duration)
defer cancel()
ctx, cancel := context.WithCancel(context.TODO())
go func() {
<-time.After(duration)
cancel()
}()

s.runResult, err = s.runInstance.Do(ctx)
s.require.NoError(err)
Expand Down
62 changes: 40 additions & 22 deletions internal/run/test_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,24 @@ const (
)

type Run struct {
pusher *push.Pusher
progressRunner *raterun.Runner
metrics *metrics.Metrics
views *views.Views
activeScenario *workers.ActiveScenario
trigger *api.Trigger
output *ui.Output
scenarioLogger *ScenarioLogger
result *Result
options options.RunOptions
pusher *push.Pusher
progressRunner *raterun.Runner
metrics *metrics.Metrics
views *views.Views
activeScenario *workers.ActiveScenario
trigger *api.Trigger
output *ui.Output
scenarioLogger *ScenarioLogger
result *Result
options options.RunOptions
waitForCompletionTimeout time.Duration
}

func NewRun(
options options.RunOptions,
scenarios *scenarios.Scenarios,
trigger *api.Trigger,
waitForCompletionTimeout time.Duration,
settings envsettings.Settings,
metricsInstance *metrics.Metrics,
parentOutput *ui.Output,
Expand Down Expand Up @@ -91,16 +93,17 @@ func NewRun(
pusher := newMetricsPusher(settings, scenario.Name, metricsInstance)

return &Run{
options: options,
trigger: trigger,
metrics: metricsInstance,
views: viewsInstance,
result: result,
pusher: pusher,
output: outputer,
progressRunner: progressRunner,
activeScenario: activeScenario,
scenarioLogger: scenarioLogger,
options: options,
trigger: trigger,
metrics: metricsInstance,
views: viewsInstance,
result: result,
pusher: pusher,
output: outputer,
progressRunner: progressRunner,
activeScenario: activeScenario,
scenarioLogger: scenarioLogger,
waitForCompletionTimeout: waitForCompletionTimeout,
}, nil
}

Expand Down Expand Up @@ -254,12 +257,27 @@ func (r *Run) run(ctx context.Context) {
case <-ctx.Done():
r.output.Display(r.result.Interrupted())
r.progressRunner.Restart()
<-poolManager.WaitForCompletion()
select {
case <-poolManager.WaitForCompletion():
case <-time.After(r.waitForCompletionTimeout):
r.output.Display(ui.WarningMessage{
Message: fmt.Sprintf("Active tests not completed after %s. Stopping...", r.waitForCompletionTimeout.String()),
})
}

case <-triggerCtx.Done():
if triggerCtx.Err() == context.DeadlineExceeded {
r.output.Display(r.result.MaxDurationElapsed())
} else {
r.output.Display(r.result.Interrupted())
}
select {
case <-poolManager.WaitForCompletion():
case <-time.After(r.waitForCompletionTimeout):
r.output.Display(ui.WarningMessage{
Message: fmt.Sprintf("Active tests not completed after %s. Stopping...", r.waitForCompletionTimeout.String()),
})
}
<-poolManager.WaitForCompletion()
case <-poolManager.WaitForCompletion():
if poolManager.MaxIterationsReached() {
r.output.Display(r.result.MaxIterationsReached())
Expand Down
10 changes: 1 addition & 9 deletions internal/ui/printer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,6 @@ func (t *Printer) Error(a ...any) {
fmt.Fprintln(t.ErrWriter, a...)
}

func (t *Printer) Printf(format string, a ...any) {
fmt.Fprintf(t.Writer, format, a...)
}

func (t *Printer) Warn(a ...any) {
fmt.Fprint(t.ErrWriter, a...)
}

func (t *Printer) Warnf(format string, a ...any) {
fmt.Fprintf(t.ErrWriter, format, a...)
fmt.Fprintln(t.ErrWriter, a...)
}

0 comments on commit cd26e01

Please sign in to comment.