Skip to content

Fix cancel states #3849

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 11 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions agent/rpc/client_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,10 @@ func (c *client) Next(ctx context.Context, f rpc.Filter) (*rpc.Workflow, error)
}

// Wait blocks until the workflow is complete.
func (c *client) Wait(ctx context.Context, id string) (err error) {
func (c *client) Wait(ctx context.Context, workflowID string) (err error) {
retry := c.newBackOff()
req := new(proto.WaitRequest)
req.Id = id
req.Id = workflowID
for {
_, err = c.client.Wait(ctx, req)
if err == nil {
Expand Down Expand Up @@ -243,10 +243,10 @@ func (c *client) Done(ctx context.Context, workflowID string, state rpc.Workflow
}

// Extend extends the workflow deadline.
func (c *client) Extend(ctx context.Context, id string) (err error) {
func (c *client) Extend(ctx context.Context, workflowID string) (err error) {
retry := c.newBackOff()
req := new(proto.ExtendRequest)
req.Id = id
req.Id = workflowID
for {
_, err = c.client.Extend(ctx, req)
if err == nil {
Expand Down Expand Up @@ -277,10 +277,10 @@ func (c *client) Extend(ctx context.Context, id string) (err error) {
}

// Update updates the workflow state.
func (c *client) Update(ctx context.Context, id string, state rpc.StepState) (err error) {
func (c *client) Update(ctx context.Context, workflowID string, state rpc.StepState) (err error) {
retry := c.newBackOff()
req := new(proto.UpdateRequest)
req.Id = id
req.Id = workflowID
req.State = new(proto.StepState)
req.State.StepUuid = state.StepUUID
req.State.Started = state.Started
Expand Down Expand Up @@ -317,7 +317,7 @@ func (c *client) Update(ctx context.Context, id string, state rpc.StepState) (er
return nil
}

// Log writes the workflow log entry.
// Log writes the step log entry.
func (c *client) Log(ctx context.Context, logEntry *rpc.LogEntry) (err error) {
retry := c.newBackOff()
req := new(proto.LogRequest)
Expand Down
10 changes: 4 additions & 6 deletions agent/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ func (r *Runner) Run(runnerCtx context.Context) error { //nolint:contextcheck
if err := r.client.Wait(workflowCtx, workflow.ID); err != nil {
canceled = true
logger.Warn().Err(err).Msg("cancel signal received")

cancel()
} else {
logger.Debug().Msg("done listening for cancel signal")
Expand All @@ -117,11 +116,10 @@ func (r *Runner) Run(runnerCtx context.Context) error { //nolint:contextcheck
select {
case <-workflowCtx.Done():
logger.Debug().Msg("pipeline done")

return

case <-time.After(time.Minute):
logger.Debug().Msg("pipeline lease renewed")

if err := r.client.Extend(workflowCtx, workflow.ID); err != nil {
log.Error().Err(err).Msg("extending pipeline deadline failed")
}
Expand All @@ -144,7 +142,7 @@ func (r *Runner) Run(runnerCtx context.Context) error { //nolint:contextcheck
pipeline.WithContext(workflowCtx),
pipeline.WithTaskUUID(fmt.Sprint(workflow.ID)),
pipeline.WithLogger(r.createLogger(logger, &uploads, workflow)),
pipeline.WithTracer(r.createTracer(ctxMeta, logger, workflow)),
pipeline.WithTracer(r.createTracer(ctxMeta, &uploads, logger, workflow)),
pipeline.WithBackend(*r.backend),
pipeline.WithDescription(map[string]string{
"workflow_id": workflow.ID,
Expand All @@ -170,9 +168,9 @@ func (r *Runner) Run(runnerCtx context.Context) error { //nolint:contextcheck
Bool("canceled", canceled).
Msg("workflow finished")

logger.Debug().Msg("uploading logs ...")
logger.Debug().Msg("uploading logs and traces / states ...")
uploads.Wait()
logger.Debug().Msg("uploaded logs")
logger.Debug().Msg("uploaded logs and traces / states")

logger.Debug().
Str("error", state.Error).
Expand Down
93 changes: 93 additions & 0 deletions agent/runner_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package agent

import (
"context"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

"go.woodpecker-ci.org/woodpecker/v2/pipeline/backend/dummy"
"go.woodpecker-ci.org/woodpecker/v2/pipeline/backend/types"
"go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc"
"go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc/mocks"
)

type peery struct {
*mocks.Peer
}

func (p *peery) Done(ctx context.Context, id string, state rpc.WorkflowState) error {
return nil
}

func TestRunnerCanceledState(t *testing.T) {
backend := dummy.New()
_peer := mocks.NewPeer(t)

peer := &peery{_peer}

hostname := "dummy"
filter := rpc.Filter{
Labels: map[string]string{
"hostname": hostname,
"platform": "test",
"backend": backend.Name(),
"repo": "*", // allow all repos by default
},
}
state := &State{
Metadata: map[string]Info{},
Polling: 1, // max workflows to poll
Running: 0,
}
r := NewRunner(peer, filter, hostname, state, &backend)
ctx, cancel := context.WithCancel(context.Background())

workflow := &rpc.Workflow{
ID: "1",
Config: &types.Config{
Stages: []*types.Stage{
{
Steps: []*types.Step{
{

Name: "test",
Environment: map[string]string{
"SLEEP": "10s",
},
Commands: []string{
"echo 'hello world'",
},
OnSuccess: true,
},
},
},
},
},
Timeout: 1, // 1 minute
}

peer.On("Next", mock.Anything, filter).Return(workflow, nil).Once()
peer.On("Init", mock.Anything, "1", mock.MatchedBy(func(state rpc.WorkflowState) bool {
return state.Started != 0 && state.Finished == 0 && state.Error == ""
})).Return(nil)
peer.On("Done", mock.Anything, "1", mock.MatchedBy(func(state rpc.WorkflowState) bool {
return state.Started != 0 && state.Finished != 0 && state.Error == ""
})).Return(nil)
peer.On("Log", mock.Anything, mock.Anything).Return(nil)
peer.On("Wait", mock.Anything, "1").Return(nil)
peer.On("Update", mock.Anything, "1", mock.Anything).Return(nil)
peer.On("Extend", mock.Anything, "1").Return(nil).Maybe()

go func() {
time.Sleep(100 * time.Millisecond)
fmt.Println("canceling ...")
cancel()
}()

err := r.Run(ctx)
assert.NoError(t, err)
}
16 changes: 11 additions & 5 deletions agent/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"runtime"
"strconv"
"sync"
"time"

"github.com/rs/zerolog"
Expand All @@ -26,22 +27,26 @@ import (
"go.woodpecker-ci.org/woodpecker/v2/pipeline/rpc"
)

func (r *Runner) createTracer(ctxMeta context.Context, logger zerolog.Logger, workflow *rpc.Workflow) pipeline.TraceFunc {
func (r *Runner) createTracer(ctxMeta context.Context, uploads *sync.WaitGroup, logger zerolog.Logger, workflow *rpc.Workflow) pipeline.TraceFunc {
return func(state *pipeline.State) error {
uploads.Add(1)

stepLogger := logger.With().
Str("image", state.Pipeline.Step.Image).
Str("workflowID", workflow.ID).
Str("workflow_id", workflow.ID).
Err(state.Process.Error).
Int("exit_code", state.Process.ExitCode).
Bool("exited", state.Process.Exited).
Logger()

stepState := rpc.StepState{
StepUUID: state.Pipeline.Step.UUID,
Exited: state.Process.Exited,
ExitCode: state.Process.ExitCode,
Started: time.Now().Unix(), // TODO: do not do this
Finished: time.Now().Unix(),
}
if !state.Process.Exited {
stepState.Started = time.Now().Unix() // TODO: do not do this (UpdateStepStatus currently takes care that this is not overwritten)
} else {
stepState.Finished = time.Now().Unix()
}
if state.Process.Error != nil {
stepState.Error = state.Process.Error.Error()
Expand All @@ -57,6 +62,7 @@ func (r *Runner) createTracer(ctxMeta context.Context, logger zerolog.Logger, wo
}

stepLogger.Debug().Msg("update step status complete")
uploads.Done()
}()
if state.Process.Exited {
return nil
Expand Down
4 changes: 2 additions & 2 deletions pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,9 +254,9 @@ func (r *Runtime) exec(step *backend.Step) (*backend.State, error) {
return nil, nil
}

// Some pipeline backends, such as local, will close the pipe from Tail on Wait,
// so first make sure all reading has finished.
// We wait until all data was logged. (Needed for some backends like local as WaitStep kills the log stream)
wg.Wait()

waitState, err := r.engine.WaitStep(r.ctx, step, r.taskUUID)
if err != nil {
if errors.Is(err, context.Canceled) {
Expand Down
27 changes: 21 additions & 6 deletions server/grpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,14 @@ func (s *RPC) Update(_ context.Context, strWorkflowID string, state rpc.StepStat
return err
}

if err := pipeline.UpdateStepStatus(s.store, step, state); err != nil {
log.Error().Err(err).Msg("rpc.update: cannot update step")
if state.Finished == 0 {
if _, err := pipeline.UpdateStepStatusToRunning(s.store, *step, state); err != nil {
log.Error().Err(err).Msg("rpc.update: cannot update step")
}
} else {
if _, err := pipeline.UpdateStepStatusToDone(s.store, *step, state); err != nil {
log.Error().Err(err).Msg("rpc.update: cannot update step")
}
}

if currentPipeline.Workflows, err = s.store.WorkflowGetTree(currentPipeline); err != nil {
Expand Down Expand Up @@ -204,10 +210,19 @@ func (s *RPC) Init(c context.Context, strWorkflowID string, state rpc.WorkflowSt
return err
}

if currentPipeline.Status == model.StatusPending {
if currentPipeline, err = pipeline.UpdateToStatusRunning(s.store, *currentPipeline, state.Started); err != nil {
log.Error().Err(err).Msgf("init: cannot update pipeline %d state", currentPipeline.ID)
}
// Init should only be called on pending pipelines
if currentPipeline.Status != model.StatusPending {
log.Error().Msgf("pipeline %d is not pending", currentPipeline.ID)
return errors.New("pipeline is not pending")
}

if currentPipeline, err = pipeline.UpdateToStatusRunning(s.store, *currentPipeline, state.Started); err != nil {
log.Error().Err(err).Msgf("init: cannot update pipeline %d state", currentPipeline.ID)
}

workflow, err = pipeline.UpdateWorkflowToStatusRunning(s.store, *workflow, state)
if err != nil {
return err
}

s.updateForgeStatus(c, repo, currentPipeline, workflow)
Expand Down
2 changes: 1 addition & 1 deletion server/pipeline/cancel.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func Cancel(ctx context.Context, _forge forge.Forge, store store.Store, repo *mo
}
for _, step := range workflow.Children {
if step.State == model.StatusPending {
if _, err = UpdateStepToStatusSkipped(store, *step, 0); err != nil {
if _, err = UpdateStepStatusToSkipped(store, *step, 0); err != nil {
log.Error().Err(err).Msgf("cannot update workflow with id %d state", workflow.ID)
}
}
Expand Down
7 changes: 3 additions & 4 deletions server/pipeline/step_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,11 @@ func UpdateStepStatusToDone(store store.Store, step model.Step, state rpc.StepSt
step.Finished = state.Finished
step.Error = state.Error
step.ExitCode = state.ExitCode
step.State = model.StatusSuccess
if state.Started == 0 {
step.State = model.StatusSkipped
} else {
step.State = model.StatusSuccess
}
if step.ExitCode != 0 || step.Error != "" {
if state.ExitCode != 0 || state.Error != "" {
step.State = model.StatusFailure
}
return &step, store.StepUpdate(&step)
Expand All @@ -79,6 +78,6 @@ func UpdateStepToStatusKilled(store store.Store, step model.Step) (*model.Step,
if step.Started == 0 {
step.Started = step.Finished
}
step.ExitCode = pipeline.ExitCodeKilled

return &step, store.StepUpdate(&step)
}
Loading