diff --git a/internal/run/run_cmd.go b/internal/run/run_cmd.go
index 596d7ae8..9aadd689 100644
--- a/internal/run/run_cmd.go
+++ b/internal/run/run_cmd.go
@@ -16,6 +16,8 @@ import (
 	"github.com/form3tech-oss/f1/v2/pkg/f1/scenarios"
 )
 
+const waitForCompletionTimeout = 10 * time.Second
+
 func Cmd(
 	s *scenarios.Scenarios,
 	builders []api.Builder,
@@ -157,7 +159,7 @@ func runCmdExecute(
 		MaxFailures:     maxFailures,
 		MaxFailuresRate: maxFailuresRate,
 		IgnoreDropped:   ignoreDropped,
-	}, s, trig, settings, metricsInstance, output)
+	}, s, trig, waitForCompletionTimeout, settings, metricsInstance, output)
 	if err != nil {
 		return fmt.Errorf("new run: %w", err)
 	}
diff --git a/internal/run/run_cmd_test.go b/internal/run/run_cmd_test.go
index 34f5ff43..a83176d4 100644
--- a/internal/run/run_cmd_test.go
+++ b/internal/run/run_cmd_test.go
@@ -563,7 +563,68 @@ func TestInterruptedRun(t *testing.T) {
 	then.
 		setup_teardown_is_called_within(600 * time.Millisecond).and().
 		metrics_are_pushed_to_prometheus().and().
-		there_is_a_metric_called("form3_loadtest_iteration")
+		there_is_a_metric_called("form3_loadtest_iteration").and().
+		expect_the_stdout_output_to_include([]string{
+			"Interrupted - waiting for active tests to complete",
+		})
+}
+
+func TestInterruptedRun_TimesOut(t *testing.T) {
+	t.Parallel()
+
+	given, when, then := NewRunTestStage(t)
+
+	given.
+		a_timer_is_started().
+		a_rate_of("1/1s").and().
+		a_duration_of(5 * time.Second).and().
+		a_scenario_where_each_iteration_takes(3 * time.Second).and().
+		wait_for_completion_timeout_of(1 * time.Second).and().
+		a_distribution_type("none")
+
+	when.
+		the_run_command_is_executed_and_cancelled_after(500 * time.Millisecond)
+
+	then.
+		setup_teardown_is_called_within(600*time.Millisecond + 1*time.Second).and().
+		metrics_are_pushed_to_prometheus().and().
+		expect_the_stdout_output_to_include([]string{
+			"Interrupted - waiting for active tests to complete",
+			"Active tests not completed after 1s. Stopping...",
+		})
+
+	// needed for goroutine checker
+	// sleep is the time left in the iteration duration after the timeout
+	time.Sleep(3 * time.Second)
+}
+
+func TestMaxDurationReached_TimesOut(t *testing.T) {
+	t.Parallel()
+
+	given, when, then := NewRunTestStage(t)
+
+	given.
+		a_timer_is_started().
+		a_rate_of("1/1s").and().
+		a_duration_of(500 * time.Millisecond).and().
+		a_scenario_where_each_iteration_takes(3 * time.Second).and().
+		wait_for_completion_timeout_of(1 * time.Second).and().
+		a_distribution_type("none")
+
+	when.
+		the_run_command_is_executed()
+
+	then.
+		setup_teardown_is_called_within(600*time.Millisecond + 1*time.Second).and().
+		metrics_are_pushed_to_prometheus().and().
+		expect_the_stdout_output_to_include([]string{
+			"Max Duration Elapsed - waiting for active tests to complete",
+			"Active tests not completed after 1s. Stopping...",
+		})
+
+	// needed for goroutine checker
+	// sleep is the time left in the iteration duration after the timeout
+	time.Sleep(3 * time.Second)
 }
 
 func TestFinalRunMetrics(t *testing.T) {
diff --git a/internal/run/run_stage_test.go b/internal/run/run_stage_test.go
index bd4b2cdd..13b5d3aa 100644
--- a/internal/run/run_stage_test.go
+++ b/internal/run/run_stage_test.go
@@ -68,58 +68,60 @@ type (
 )
 
 type RunTestStage struct {
-	startTime              time.Time
-	metrics                *metrics.Metrics
-	output                 *ui.Output
-	runInstance            *run.Run
-	runResult              *run.Result
-	t                      *testing.T
-	require                *require.Assertions
-	metricData             *MetricData
-	scenarioCleanup        func()
-	assert                 *assert.Assertions
-	iterationCleanup       func()
-	f1                     *f1.F1
-	durations              sync.Map
-	frequency              string
-	rate                   string
-	stages                 string
-	distributionType       string
-	configFile             string
-	startRate              string
-	endRate                string
-	rampDuration           string
-	scenario               string
-	settings               envsettings.Settings
-	maxFailures            uint64
-	maxIterations          uint64
-	maxFailuresRate        int
-	duration               time.Duration
-	concurrency            int
-	triggerType            TriggerType
-	iterationTeardownCount atomic.Uint32
-	setupTeardownCount     atomic.Uint32
-	runCount               atomic.Uint32
-	stdout                 syncWriter
-	stderr                 syncWriter
-	interactive            bool
-	verbose                bool
+	startTime                time.Time
+	metrics                  *metrics.Metrics
+	output                   *ui.Output
+	runInstance              *run.Run
+	runResult                *run.Result
+	t                        *testing.T
+	require                  *require.Assertions
+	metricData               *MetricData
+	scenarioCleanup          func()
+	assert                   *assert.Assertions
+	iterationCleanup         func()
+	f1                       *f1.F1
+	durations                sync.Map
+	frequency                string
+	rate                     string
+	stages                   string
+	distributionType         string
+	configFile               string
+	startRate                string
+	endRate                  string
+	rampDuration             string
+	scenario                 string
+	settings                 envsettings.Settings
+	maxFailures              uint64
+	maxIterations            uint64
+	maxFailuresRate          int
+	duration                 time.Duration
+	waitForCompletionTimeout time.Duration
+	concurrency              int
+	triggerType              TriggerType
+	iterationTeardownCount   atomic.Uint32
+	setupTeardownCount       atomic.Uint32
+	runCount                 atomic.Uint32
+	stdout                   syncWriter
+	stderr                   syncWriter
+	interactive              bool
+	verbose                  bool
 }
 
 func NewRunTestStage(t *testing.T) (*RunTestStage, *RunTestStage, *RunTestStage) {
 	t.Helper()
 	stage := &RunTestStage{
-		t:           t,
-		concurrency: 100,
-		assert:      assert.New(t),
-		require:     require.New(t),
-		f1:          f1.New(),
-		settings:    envsettings.Get(),
-		metricData:  NewMetricData(),
-		output:      ui.NewDiscardOutput(),
-		metrics:     metrics.NewInstance(prometheus.NewRegistry(), true),
-		stdout:      syncWriter{writer: &bytes.Buffer{}},
-		stderr:      syncWriter{writer: &bytes.Buffer{}},
+		t:                        t,
+		concurrency:              100,
+		assert:                   assert.New(t),
+		require:                  require.New(t),
+		f1:                       f1.New(),
+		settings:                 envsettings.Get(),
+		metricData:               NewMetricData(),
+		output:                   ui.NewDiscardOutput(),
+		metrics:                  metrics.NewInstance(prometheus.NewRegistry(), true),
+		stdout:                   syncWriter{writer: &bytes.Buffer{}},
+		stderr:                   syncWriter{writer: &bytes.Buffer{}},
+		waitForCompletionTimeout: 5 * time.Second,
 	}
 
 	handler := FakePrometheusHandler(t, stage.metricData)
@@ -141,6 +143,11 @@ func (s *RunTestStage) a_rate_of(rate string) *RunTestStage {
 	return s
 }
 
+func (s *RunTestStage) wait_for_completion_timeout_of(timeout time.Duration) *RunTestStage {
+	s.waitForCompletionTimeout = timeout
+	return s
+}
+
 func (s *RunTestStage) and() *RunTestStage {
 	return s
 }
@@ -198,7 +205,7 @@ func (s *RunTestStage) setupRun() {
 		MaxFailures:     s.maxFailures,
 		MaxFailuresRate: s.maxFailuresRate,
 		Verbose:         s.verbose,
-	}, s.f1.GetScenarios(), s.build_trigger(), s.settings, s.metrics, outputer)
+	}, s.f1.GetScenarios(), s.build_trigger(), s.waitForCompletionTimeout, s.settings, s.metrics, outputer)
 	s.require.NoError(err)
 
 	s.runInstance = r
@@ -218,8 +225,11 @@ func (s *RunTestStage) the_run_command_is_executed_and_cancelled_after(duration
 	s.setupRun()
 
 	var err error
-	ctx, cancel := context.WithTimeout(context.TODO(), duration)
-	defer cancel()
+	ctx, cancel := context.WithCancel(context.TODO())
+	go func() {
+		<-time.After(duration)
+		cancel()
+	}()
 
 	s.runResult, err = s.runInstance.Do(ctx)
 	s.require.NoError(err)
diff --git a/internal/run/test_runner.go b/internal/run/test_runner.go
index d72c3131..e8a9b9ea 100644
--- a/internal/run/test_runner.go
+++ b/internal/run/test_runner.go
@@ -30,22 +30,24 @@ const (
 )
 
 type Run struct {
-	pusher         *push.Pusher
-	progressRunner *raterun.Runner
-	metrics        *metrics.Metrics
-	views          *views.Views
-	activeScenario *workers.ActiveScenario
-	trigger        *api.Trigger
-	output         *ui.Output
-	scenarioLogger *ScenarioLogger
-	result         *Result
-	options        options.RunOptions
+	pusher                   *push.Pusher
+	progressRunner           *raterun.Runner
+	metrics                  *metrics.Metrics
+	views                    *views.Views
+	activeScenario           *workers.ActiveScenario
+	trigger                  *api.Trigger
+	output                   *ui.Output
+	scenarioLogger           *ScenarioLogger
+	result                   *Result
+	options                  options.RunOptions
+	waitForCompletionTimeout time.Duration
 }
 
 func NewRun(
 	options options.RunOptions,
 	scenarios *scenarios.Scenarios,
 	trigger *api.Trigger,
+	waitForCompletionTimeout time.Duration,
 	settings envsettings.Settings,
 	metricsInstance *metrics.Metrics,
 	parentOutput *ui.Output,
@@ -91,16 +93,17 @@ func NewRun(
 	pusher := newMetricsPusher(settings, scenario.Name, metricsInstance)
 
 	return &Run{
-		options:        options,
-		trigger:        trigger,
-		metrics:        metricsInstance,
-		views:          viewsInstance,
-		result:         result,
-		pusher:         pusher,
-		output:         outputer,
-		progressRunner: progressRunner,
-		activeScenario: activeScenario,
-		scenarioLogger: scenarioLogger,
+		options:                  options,
+		trigger:                  trigger,
+		metrics:                  metricsInstance,
+		views:                    viewsInstance,
+		result:                   result,
+		pusher:                   pusher,
+		output:                   outputer,
+		progressRunner:           progressRunner,
+		activeScenario:           activeScenario,
+		scenarioLogger:           scenarioLogger,
+		waitForCompletionTimeout: waitForCompletionTimeout,
 	}, nil
 }
 
@@ -254,12 +257,27 @@ func (r *Run) run(ctx context.Context) {
 	case <-ctx.Done():
 		r.output.Display(r.result.Interrupted())
 		r.progressRunner.Restart()
-		<-poolManager.WaitForCompletion()
+		select {
+		case <-poolManager.WaitForCompletion():
+		case <-time.After(r.waitForCompletionTimeout):
+			r.output.Display(ui.WarningMessage{
+				Message: fmt.Sprintf("Active tests not completed after %s. Stopping...", r.waitForCompletionTimeout.String()),
+			})
+		}
+
 	case <-triggerCtx.Done():
 		if triggerCtx.Err() == context.DeadlineExceeded {
 			r.output.Display(r.result.MaxDurationElapsed())
+		} else {
+			r.output.Display(r.result.Interrupted())
+		}
+		select {
+		case <-poolManager.WaitForCompletion():
+		case <-time.After(r.waitForCompletionTimeout):
+			r.output.Display(ui.WarningMessage{
+				Message: fmt.Sprintf("Active tests not completed after %s. Stopping...", r.waitForCompletionTimeout.String()),
+			})
 		}
-		<-poolManager.WaitForCompletion()
 	case <-poolManager.WaitForCompletion():
 		if poolManager.MaxIterationsReached() {
 			r.output.Display(r.result.MaxIterationsReached())
diff --git a/internal/ui/printer.go b/internal/ui/printer.go
index 0cf8f76a..d819e133 100644
--- a/internal/ui/printer.go
+++ b/internal/ui/printer.go
@@ -34,14 +34,6 @@ func (t *Printer) Error(a ...any) {
 	fmt.Fprintln(t.ErrWriter, a...)
 }
 
-func (t *Printer) Printf(format string, a ...any) {
-	fmt.Fprintf(t.Writer, format, a...)
-}
-
 func (t *Printer) Warn(a ...any) {
-	fmt.Fprint(t.ErrWriter, a...)
-}
-
-func (t *Printer) Warnf(format string, a ...any) {
-	fmt.Fprintf(t.ErrWriter, format, a...)
+	fmt.Fprintln(t.ErrWriter, a...)
 }