diff --git a/agent/rpc/client_grpc.go b/agent/rpc/client_grpc.go index 4c42224b4bc..5fb747ac0c8 100644 --- a/agent/rpc/client_grpc.go +++ b/agent/rpc/client_grpc.go @@ -133,10 +133,10 @@ func (c *client) Next(ctx context.Context, f rpc.Filter) (*rpc.Workflow, error) } // Wait blocks until the workflow is complete. -func (c *client) Wait(ctx context.Context, id string) (err error) { +func (c *client) Wait(ctx context.Context, workflowID string) (err error) { retry := c.newBackOff() req := new(proto.WaitRequest) - req.Id = id + req.Id = workflowID for { _, err = c.client.Wait(ctx, req) if err == nil { @@ -243,10 +243,10 @@ func (c *client) Done(ctx context.Context, workflowID string, state rpc.Workflow } // Extend extends the workflow deadline. -func (c *client) Extend(ctx context.Context, id string) (err error) { +func (c *client) Extend(ctx context.Context, workflowID string) (err error) { retry := c.newBackOff() req := new(proto.ExtendRequest) - req.Id = id + req.Id = workflowID for { _, err = c.client.Extend(ctx, req) if err == nil { @@ -277,10 +277,10 @@ func (c *client) Extend(ctx context.Context, id string) (err error) { } // Update updates the workflow state. -func (c *client) Update(ctx context.Context, id string, state rpc.StepState) (err error) { +func (c *client) Update(ctx context.Context, workflowID string, state rpc.StepState) (err error) { retry := c.newBackOff() req := new(proto.UpdateRequest) - req.Id = id + req.Id = workflowID req.State = new(proto.StepState) req.State.StepUuid = state.StepUUID req.State.Started = state.Started @@ -317,7 +317,7 @@ func (c *client) Update(ctx context.Context, id string, state rpc.StepState) (er return nil } -// Log writes the workflow log entry. +// Log writes the step log entry. func (c *client) Log(ctx context.Context, logEntry *rpc.LogEntry) (err error) { retry := c.newBackOff() req := new(proto.LogRequest) diff --git a/agent/runner.go b/agent/runner.go index 5cf8be64d48..6608bd02538 100644 --- a/agent/runner.go +++ b/agent/runner.go @@ -105,7 +105,6 @@ func (r *Runner) Run(runnerCtx context.Context) error { //nolint:contextcheck if err := r.client.Wait(workflowCtx, workflow.ID); err != nil { canceled = true logger.Warn().Err(err).Msg("cancel signal received") - cancel() } else { logger.Debug().Msg("done listening for cancel signal") @@ -117,11 +116,10 @@ func (r *Runner) Run(runnerCtx context.Context) error { //nolint:contextcheck select { case <-workflowCtx.Done(): logger.Debug().Msg("pipeline done") - return + case <-time.After(time.Minute): logger.Debug().Msg("pipeline lease renewed") - if err := r.client.Extend(workflowCtx, workflow.ID); err != nil { log.Error().Err(err).Msg("extending pipeline deadline failed") } @@ -144,7 +142,7 @@ func (r *Runner) Run(runnerCtx context.Context) error { //nolint:contextcheck pipeline.WithContext(workflowCtx), pipeline.WithTaskUUID(fmt.Sprint(workflow.ID)), pipeline.WithLogger(r.createLogger(logger, &uploads, workflow)), - pipeline.WithTracer(r.createTracer(ctxMeta, logger, workflow)), + pipeline.WithTracer(r.createTracer(ctxMeta, &uploads, logger, workflow)), pipeline.WithBackend(*r.backend), pipeline.WithDescription(map[string]string{ "workflow_id": workflow.ID, @@ -170,9 +168,9 @@ func (r *Runner) Run(runnerCtx context.Context) error { //nolint:contextcheck Bool("canceled", canceled). Msg("workflow finished") - logger.Debug().Msg("uploading logs ...") + logger.Debug().Msg("uploading logs and traces / states ...") uploads.Wait() - logger.Debug().Msg("uploaded logs") + logger.Debug().Msg("uploaded logs and traces / states") logger.Debug(). Str("error", state.Error). diff --git a/agent/runner_test.go b/agent/runner_test.go new file mode 100644 index 00000000000..020e80804c9 --- /dev/null +++ b/agent/runner_test.go @@ -0,0 +1,93 @@ +package agent + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + "go.woodpecker-ci.org/woodpecker/v2/pipeline/backend/dummy" + "go.woodpecker-ci.org/woodpecker/v2/pipeline/backend/types" + "go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc" + "go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc/mocks" +) + +type peery struct { + *mocks.Peer +} + +func (p *peery) Done(ctx context.Context, id string, state rpc.WorkflowState) error { + return nil +} + +func TestRunnerCanceledState(t *testing.T) { + backend := dummy.New() + _peer := mocks.NewPeer(t) + + peer := &peery{_peer} + + hostname := "dummy" + filter := rpc.Filter{ + Labels: map[string]string{ + "hostname": hostname, + "platform": "test", + "backend": backend.Name(), + "repo": "*", // allow all repos by default + }, + } + state := &State{ + Metadata: map[string]Info{}, + Polling: 1, // max workflows to poll + Running: 0, + } + r := NewRunner(peer, filter, hostname, state, &backend) + ctx, cancel := context.WithCancel(context.Background()) + + workflow := &rpc.Workflow{ + ID: "1", + Config: &types.Config{ + Stages: []*types.Stage{ + { + Steps: []*types.Step{ + { + + Name: "test", + Environment: map[string]string{ + "SLEEP": "10s", + }, + Commands: []string{ + "echo 'hello world'", + }, + OnSuccess: true, + }, + }, + }, + }, + }, + Timeout: 1, // 1 minute + } + + peer.On("Next", mock.Anything, filter).Return(workflow, nil).Once() + peer.On("Init", mock.Anything, "1", mock.MatchedBy(func(state rpc.WorkflowState) bool { + return state.Started != 0 && state.Finished == 0 && state.Error == "" + })).Return(nil) + peer.On("Done", mock.Anything, "1", mock.MatchedBy(func(state rpc.WorkflowState) bool { + return state.Started != 0 && state.Finished != 0 && state.Error == "" + })).Return(nil) + peer.On("Log", mock.Anything, mock.Anything).Return(nil) + peer.On("Wait", mock.Anything, "1").Return(nil) + peer.On("Update", mock.Anything, "1", mock.Anything).Return(nil) + peer.On("Extend", mock.Anything, "1").Return(nil).Maybe() + + go func() { + time.Sleep(100 * time.Millisecond) + fmt.Println("canceling ...") + cancel() + }() + + err := r.Run(ctx) + assert.NoError(t, err) +} diff --git a/agent/tracer.go b/agent/tracer.go index 9f3588126c8..7bc97c3392f 100644 --- a/agent/tracer.go +++ b/agent/tracer.go @@ -18,6 +18,7 @@ import ( "context" "runtime" "strconv" + "sync" "time" "github.com/rs/zerolog" @@ -26,11 +27,13 @@ import ( "go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc" ) -func (r *Runner) createTracer(ctxMeta context.Context, logger zerolog.Logger, workflow *rpc.Workflow) pipeline.TraceFunc { +func (r *Runner) createTracer(ctxMeta context.Context, uploads *sync.WaitGroup, logger zerolog.Logger, workflow *rpc.Workflow) pipeline.TraceFunc { return func(state *pipeline.State) error { + uploads.Add(1) + stepLogger := logger.With(). Str("image", state.Pipeline.Step.Image). - Str("workflowID", workflow.ID). + Str("workflow_id", workflow.ID). Err(state.Process.Error). Int("exit_code", state.Process.ExitCode). Bool("exited", state.Process.Exited). @@ -38,10 +41,12 @@ func (r *Runner) createTracer(ctxMeta context.Context, logger zerolog.Logger, wo stepState := rpc.StepState{ StepUUID: state.Pipeline.Step.UUID, - Exited: state.Process.Exited, ExitCode: state.Process.ExitCode, - Started: time.Now().Unix(), // TODO: do not do this - Finished: time.Now().Unix(), + } + if !state.Process.Exited { + stepState.Started = time.Now().Unix() // TODO: do not do this (UpdateStepStatus currently takes care that this is not overwritten) + } else { + stepState.Finished = time.Now().Unix() } if state.Process.Error != nil { stepState.Error = state.Process.Error.Error() @@ -57,6 +62,7 @@ func (r *Runner) createTracer(ctxMeta context.Context, logger zerolog.Logger, wo } stepLogger.Debug().Msg("update step status complete") + uploads.Done() }() if state.Process.Exited { return nil diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go index 1c23061da01..ada817b02b1 100644 --- a/pipeline/pipeline.go +++ b/pipeline/pipeline.go @@ -254,9 +254,9 @@ func (r *Runtime) exec(step *backend.Step) (*backend.State, error) { return nil, nil } - // Some pipeline backends, such as local, will close the pipe from Tail on Wait, - // so first make sure all reading has finished. + // We wait until all data was logged. (Needed for some backends like local as WaitStep kills the log stream) wg.Wait() + waitState, err := r.engine.WaitStep(r.ctx, step, r.taskUUID) if err != nil { if errors.Is(err, context.Canceled) { diff --git a/server/grpc/rpc.go b/server/grpc/rpc.go index 83d366c770f..7a193674f08 100644 --- a/server/grpc/rpc.go +++ b/server/grpc/rpc.go @@ -147,8 +147,14 @@ func (s *RPC) Update(_ context.Context, strWorkflowID string, state rpc.StepStat return err } - if err := pipeline.UpdateStepStatus(s.store, step, state); err != nil { - log.Error().Err(err).Msg("rpc.update: cannot update step") + if state.Finished == 0 { + if _, err := pipeline.UpdateStepStatusToRunning(s.store, *step, state); err != nil { + log.Error().Err(err).Msg("rpc.update: cannot update step") + } + } else { + if _, err := pipeline.UpdateStepStatusToDone(s.store, *step, state); err != nil { + log.Error().Err(err).Msg("rpc.update: cannot update step") + } } if currentPipeline.Workflows, err = s.store.WorkflowGetTree(currentPipeline); err != nil { @@ -204,10 +210,19 @@ func (s *RPC) Init(c context.Context, strWorkflowID string, state rpc.WorkflowSt return err } - if currentPipeline.Status == model.StatusPending { - if currentPipeline, err = pipeline.UpdateToStatusRunning(s.store, *currentPipeline, state.Started); err != nil { - log.Error().Err(err).Msgf("init: cannot update pipeline %d state", currentPipeline.ID) - } + // Init should only be called on pending pipelines + if currentPipeline.Status != model.StatusPending { + log.Error().Msgf("pipeline %d is not pending", currentPipeline.ID) + return errors.New("pipeline is not pending") + } + + if currentPipeline, err = pipeline.UpdateToStatusRunning(s.store, *currentPipeline, state.Started); err != nil { + log.Error().Err(err).Msgf("init: cannot update pipeline %d state", currentPipeline.ID) + } + + workflow, err = pipeline.UpdateWorkflowToStatusRunning(s.store, *workflow, state) + if err != nil { + return err } s.updateForgeStatus(c, repo, currentPipeline, workflow) diff --git a/server/pipeline/cancel.go b/server/pipeline/cancel.go index 253e1d58670..b2c2c0e03b2 100644 --- a/server/pipeline/cancel.go +++ b/server/pipeline/cancel.go @@ -76,7 +76,7 @@ func Cancel(ctx context.Context, _forge forge.Forge, store store.Store, repo *mo } for _, step := range workflow.Children { if step.State == model.StatusPending { - if _, err = UpdateStepToStatusSkipped(store, *step, 0); err != nil { + if _, err = UpdateStepStatusToSkipped(store, *step, 0); err != nil { log.Error().Err(err).Msgf("cannot update workflow with id %d state", workflow.ID) } } diff --git a/server/pipeline/step_status.go b/server/pipeline/step_status.go index 8ee5b83bafc..d8f40fce260 100644 --- a/server/pipeline/step_status.go +++ b/server/pipeline/step_status.go @@ -62,12 +62,11 @@ func UpdateStepStatusToDone(store store.Store, step model.Step, state rpc.StepSt step.Finished = state.Finished step.Error = state.Error step.ExitCode = state.ExitCode + step.State = model.StatusSuccess if state.Started == 0 { step.State = model.StatusSkipped - } else { - step.State = model.StatusSuccess } - if step.ExitCode != 0 || step.Error != "" { + if state.ExitCode != 0 || state.Error != "" { step.State = model.StatusFailure } return &step, store.StepUpdate(&step) @@ -79,6 +78,6 @@ func UpdateStepToStatusKilled(store store.Store, step model.Step) (*model.Step, if step.Started == 0 { step.Started = step.Finished } - step.ExitCode = pipeline.ExitCodeKilled + return &step, store.StepUpdate(&step) } diff --git a/server/pipeline/step_status_test.go b/server/pipeline/step_status_test.go index c2e9564cfd6..2f9dab685c1 100644 --- a/server/pipeline/step_status_test.go +++ b/server/pipeline/step_status_test.go @@ -43,14 +43,13 @@ func TestUpdateStepStatusNotExited(t *testing.T) { // advertised step status state := rpc.StepState{ Started: int64(42), - Exited: false, // Dummy data Finished: int64(1), ExitCode: pipeline.ExitCodeKilled, Error: "not an error", } - err := UpdateStepStatus(mockStoreStep(t), step, state) + step, err := UpdateStepStatusToDone(mockStoreStep(t), *step, state) assert.NoError(t, err) assert.EqualValues(t, model.StatusRunning, step.State) assert.EqualValues(t, 42, step.Started) @@ -74,7 +73,7 @@ func TestUpdateStepStatusNotExitedButStopped(t *testing.T) { Error: "not an error", } - err := UpdateStepStatus(mockStoreStep(t), step, state) + step, err := UpdateStepStatusToDone(mockStoreStep(t), *step, state) assert.NoError(t, err) assert.EqualValues(t, model.StatusKilled, step.State) assert.EqualValues(t, 42, step.Started) @@ -92,13 +91,12 @@ func TestUpdateStepStatusExited(t *testing.T) { // advertised step status state := rpc.StepState{ Started: int64(42), - Exited: true, Finished: int64(34), ExitCode: pipeline.ExitCodeKilled, Error: "an error", } - err := UpdateStepStatus(mockStoreStep(t), step, state) + step, err := UpdateStepStatusToDone(mockStoreStep(t), *step, state) assert.NoError(t, err) assert.EqualValues(t, model.StatusKilled, step.State) assert.EqualValues(t, 42, step.Started) @@ -116,12 +114,11 @@ func TestUpdateStepStatusExitedButNot137(t *testing.T) { // advertised step status state := rpc.StepState{ Started: int64(42), - Exited: true, Finished: int64(34), Error: "an error", } - err := UpdateStepStatus(mockStoreStep(t), step, state) + step, err := UpdateStepStatusToDone(mockStoreStep(t), *step, state) assert.NoError(t, err) assert.EqualValues(t, model.StatusFailure, step.State) assert.EqualValues(t, 42, step.Started) @@ -136,13 +133,12 @@ func TestUpdateStepStatusExitedWithCode(t *testing.T) { // advertised step status state := rpc.StepState{ Started: int64(42), - Exited: true, Finished: int64(34), ExitCode: 1, Error: "an error", } step := &model.Step{} - err := UpdateStepStatus(mockStoreStep(t), step, state) + step, err := UpdateStepStatusToDone(mockStoreStep(t), *step, state) assert.NoError(t, err) assert.Equal(t, model.StatusFailure, step.State) @@ -162,7 +158,7 @@ func TestUpdateStepToStatusStarted(t *testing.T) { func TestUpdateStepToStatusSkipped(t *testing.T) { t.Parallel() - step, _ := UpdateStepToStatusSkipped(mockStoreStep(t), model.Step{}, int64(1)) + step, _ := UpdateStepStatusToSkipped(mockStoreStep(t), model.Step{}, int64(1)) assert.Equal(t, model.StatusSkipped, step.State) assert.EqualValues(t, 0, step.Finished) @@ -175,7 +171,7 @@ func TestUpdateStepToStatusSkippedButStarted(t *testing.T) { Started: int64(42), } - step, _ = UpdateStepToStatusSkipped(mockStoreStep(t), *step, int64(1)) + step, _ = UpdateStepStatusToSkipped(mockStoreStep(t), *step, int64(1)) assert.Equal(t, model.StatusSuccess, step.State) assert.EqualValues(t, 1, step.Finished)