Skip to content

improve worker shutdown logic #77

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changed

- Make WaitForOrchestrationXXX gRPC APIs resilient ([#80](https://github.com/microsoft/durabletask-go/pull/81)) - by [@famarting](https://github.com/famarting)
- Make WaitForOrchestrationXXX gRPC APIs resilient ([#80](https://github.com/microsoft/durabletask-go/pull/80)) - by [@famarting](https://github.com/famarting)
- Improve worker shutdown logic ([#77](https://github.com/microsoft/durabletask-go/pull/77)) - by [@famarting](https://github.com/famarting)

## [v0.5.0] - 2024-06-28

Expand Down
20 changes: 18 additions & 2 deletions backend/taskhub.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package backend

import (
"context"
"sync"
)

type TaskHubWorker interface {
Expand Down Expand Up @@ -50,7 +51,22 @@ func (w *taskHubWorker) Shutdown(ctx context.Context) error {
}

w.logger.Info("workers stopping and draining...")
w.orchestrationWorker.StopAndDrain()
w.activityWorker.StopAndDrain()
defer w.logger.Info("finished stopping and draining workers!")

wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
w.orchestrationWorker.StopAndDrain()
}()

wg.Add(1)
go func() {
defer wg.Done()
w.activityWorker.StopAndDrain()
}()

wg.Wait()

return nil
}
28 changes: 27 additions & 1 deletion backend/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"sync"
"sync/atomic"
"time"

"github.com/cenkalti/backoff/v4"
Expand Down Expand Up @@ -45,6 +46,7 @@ type worker struct {
cancel context.CancelFunc
processor TaskProcessor
waiting bool
stop atomic.Bool
}

type NewTaskWorkerOptions func(*WorkerOptions)
Expand Down Expand Up @@ -89,6 +91,8 @@ func (w *worker) Start(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
w.cancel = cancel

w.stop.Store(false)

go func() {
var b backoff.BackOff = &backoff.ExponentialBackOff{
InitialInterval: 50 * time.Millisecond,
Expand Down Expand Up @@ -190,6 +194,11 @@ func (w *worker) ProcessNext(ctx context.Context) (bool, error) {
}

func (w *worker) StopAndDrain() {
w.logger.Debugf("%v: stop and drain...", w.Name())
defer w.logger.Debugf("%v: finished stop and drain...", w.Name())

w.stop.Store(true)

// Cancel the background poller and dispatcher(s)
if w.cancel != nil {
w.cancel()
Expand All @@ -206,20 +215,37 @@ func (w *worker) processWorkItem(ctx context.Context, wi WorkItem) {

w.logger.Debugf("%v: processing work item: %s", w.Name(), wi)

if w.stop.Load() {
if err := w.processor.AbandonWorkItem(context.Background(), wi); err != nil {
w.logger.Errorf("%v: failed to abandon work item: %v", w.Name(), err)
}
return
}

if err := w.processor.ProcessWorkItem(ctx, wi); err != nil {
if errors.Is(err, ctx.Err()) {
w.logger.Warnf("%v: abandoning work item due to cancellation", w.Name())
} else {
w.logger.Errorf("%v: failed to process work item: %v", w.Name(), err)
}
if w.stop.Load() {
ctx = context.Background()
}
if err := w.processor.AbandonWorkItem(ctx, wi); err != nil {
w.logger.Errorf("%v: failed to abandon work item: %v", w.Name(), err)
}
return
}

if err := w.processor.CompleteWorkItem(ctx, wi); err != nil {
w.logger.Errorf("%v: failed to complete work item: %v", w.Name(), err)
if errors.Is(err, ctx.Err()) {
w.logger.Warnf("%v: failed to complete work item due to cancellation", w.Name())
} else {
w.logger.Errorf("%v: failed to complete work item: %v", w.Name(), err)
}
if w.stop.Load() {
ctx = context.Background()
}
if err := w.processor.AbandonWorkItem(ctx, wi); err != nil {
w.logger.Errorf("%v: failed to abandon work item: %v", w.Name(), err)
}
Expand Down
128 changes: 128 additions & 0 deletions tests/mocks/task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package mocks

import (
context "context"
"errors"
"sync"
"sync/atomic"
"time"

backend "github.com/microsoft/durabletask-go/backend"
)

var _ backend.TaskProcessor = &TestTaskProcessor{}

// TestTaskProcessor implements a dummy task processor useful for testing
type TestTaskProcessor struct {
name string

processingBlocked atomic.Bool

workItemMu sync.Mutex
workItems []backend.WorkItem

abandonedWorkItemMu sync.Mutex
abandonedWorkItems []backend.WorkItem

completedWorkItemMu sync.Mutex
completedWorkItems []backend.WorkItem
}

func NewTestTaskPocessor(name string) *TestTaskProcessor {
return &TestTaskProcessor{
name: name,
}
}

func (t *TestTaskProcessor) BlockProcessing() {
t.processingBlocked.Store(true)
}

func (t *TestTaskProcessor) UnblockProcessing() {
t.processingBlocked.Store(false)
}

func (t *TestTaskProcessor) PendingWorkItems() []backend.WorkItem {
t.workItemMu.Lock()
defer t.workItemMu.Unlock()

// copy array
return append([]backend.WorkItem{}, t.workItems...)
}

func (t *TestTaskProcessor) AbandonedWorkItems() []backend.WorkItem {
t.abandonedWorkItemMu.Lock()
defer t.abandonedWorkItemMu.Unlock()

// copy array
return append([]backend.WorkItem{}, t.abandonedWorkItems...)
}

func (t *TestTaskProcessor) CompletedWorkItems() []backend.WorkItem {
t.completedWorkItemMu.Lock()
defer t.completedWorkItemMu.Unlock()

// copy array
return append([]backend.WorkItem{}, t.completedWorkItems...)
}

func (t *TestTaskProcessor) AddWorkItems(wis ...backend.WorkItem) {
t.workItemMu.Lock()
defer t.workItemMu.Unlock()

t.workItems = append(t.workItems, wis...)
}

func (t *TestTaskProcessor) Name() string {
return t.name
}

func (t *TestTaskProcessor) FetchWorkItem(context.Context) (backend.WorkItem, error) {
t.workItemMu.Lock()
defer t.workItemMu.Unlock()

if len(t.workItems) == 0 {
return nil, backend.ErrNoWorkItems
}

// pop first item
i := 0
wi := t.workItems[i]
t.workItems = append(t.workItems[:i], t.workItems[i+1:]...)

return wi, nil
}

func (t *TestTaskProcessor) ProcessWorkItem(ctx context.Context, wi backend.WorkItem) error {
if !t.processingBlocked.Load() {
return nil
}
// wait for context cancellation or until processing is unblocked
for {
select {
case <-ctx.Done():
return errors.New("dummy error processing work item")
default:
if !t.processingBlocked.Load() {
return nil
}
time.Sleep(time.Millisecond)
}
}
}

func (t *TestTaskProcessor) AbandonWorkItem(ctx context.Context, wi backend.WorkItem) error {
t.abandonedWorkItemMu.Lock()
defer t.abandonedWorkItemMu.Unlock()

t.abandonedWorkItems = append(t.abandonedWorkItems, wi)
return nil
}

func (t *TestTaskProcessor) CompleteWorkItem(ctx context.Context, wi backend.WorkItem) error {
t.completedWorkItemMu.Lock()
defer t.completedWorkItemMu.Unlock()

t.completedWorkItems = append(t.completedWorkItems, wi)
return nil
}
Loading
Loading