diff --git a/adapter.go b/adapter.go new file mode 100644 index 0000000..e7e8523 --- /dev/null +++ b/adapter.go @@ -0,0 +1,30 @@ +// Copyright 2024 Outreach Corporation. All Rights Reserved. + +// Description: This file contains a compatibility layer with https://github.com/getoutreach/gobox/blob/main/pkg/async/async.go +package plumber + +import ( + "context" + "io" +) + +// AsyncRunner provides a compatibility adapter with async.Runner interface +func AsyncRunner(runner interface { + Run(ctx context.Context) error +}) RunnerCloser { + type Closer interface { + Close(ctx context.Context) error + } + return GracefulRunner(func(ctx context.Context, ready ReadyFunc) error { + go ready() + return runner.Run(ctx) + }, func(ctx context.Context) error { + switch r := runner.(type) { + case Closer: + return r.Close(ctx) + case io.Closer: + return r.Close() + } + return nil + }) +} diff --git a/looper.go b/looper.go new file mode 100644 index 0000000..ab81c9f --- /dev/null +++ b/looper.go @@ -0,0 +1,40 @@ +// Copyright 2024 Outreach Corporation. All Rights Reserved. + +// Description: This file contains looper structs +package plumber + +import "context" + +// BaseLooper is a looper struct that can be used in composition as following example: +// +// type Looper struct { +// *plumber.BaseLooper +// } +// +// s := &Looper{} +// s.BaseLooper = plumber.NewBaseLooper(s.loop) +// +// func (s *Looper) loop(ctx context.Context, l *plumber.Loop) error { +// .... +// } +type BaseLooper struct { + runner RunnerCloser +} + +// Run executes runners workload. Pipelines are starting Run method in separated goroutine. +// Runner must report its readiness using given callback +func (l *BaseLooper) Run(ctx context.Context, ready ReadyFunc) error { + return l.runner.Run(ctx, ready) +} + +// Close method triggers graceful shutdown on the task. It should block till task is properly closed. +// When Close timeout is exceeded then given context is canceled. +func (l *BaseLooper) Close(ctx context.Context) error { + return l.runner.Close(ctx) +} + +func NewBaseLooper(looper func(ctx context.Context, loop *Loop) error) *BaseLooper { + return &BaseLooper{ + runner: Looper(looper), + } +} diff --git a/orchestration.go b/orchestration.go index 9b9b539..2a47090 100644 --- a/orchestration.go +++ b/orchestration.go @@ -158,18 +158,23 @@ func (l *Loop) Closing() <-chan DoneFunc { // }) func Looper(run func(ctx context.Context, loop *Loop) error) RunnerCloser { var ( - once sync.Once - l = &Loop{ + runOnce sync.Once + closeOnce sync.Once + returnedCh = make(chan struct{}, 1) + l = &Loop{ closeCh: make(chan DoneFunc, 1), } ) return &gracefulRunner{ run: func(ctx context.Context, ready ReadyFunc) error { var err error - once.Do(func() { + runOnce.Do(func() { l.ready = ready - defer close(l.closeCh) + defer closeOnce.Do(func() { + close(l.closeCh) + }) err = run(ctx, l) + close(returnedCh) }) return err }, @@ -181,13 +186,14 @@ func Looper(run func(ctx context.Context, loop *Loop) error) RunnerCloser { close(errCh) } ) - l.closeCh <- canceled // if hasn't been started, lets close it - once.Do(func() { - close(errCh) + closeOnce.Do(func() { + l.closeCh <- canceled close(l.closeCh) }) select { + case <-returnedCh: + return nil case <-ctx.Done(): return ctx.Err() case err := <-errCh: @@ -286,15 +292,14 @@ func (r *ParallelPipeline) Run(ctx context.Context, ready ReadyFunc) error { for _, runner := range r.runners { go func(runner RunnerCloser) { defer r.wg.Done() - if err := runner.Run(ctx, func() { + err := runner.Run(ctx, func() { // Signal that runner is ready readyCh <- struct{}{} - }); err != nil { - if r.options.ErrorSignaler != nil && !r.closing.Load() { - r.options.ErrorSignaler(err) - } - errs <- err + }) + if r.options.ErrorSignaler != nil && !r.closing.Load() { + r.options.ErrorSignaler(err) } + errs <- err }(runner) } @@ -357,55 +362,53 @@ func (r *SerialPipeline) Run(ctx context.Context, ready ReadyFunc) error { wg sync.WaitGroup errs = make(ErrorCh, len(r.runners)) readyCh = make(chan struct{}, 1) - closeCh = make(chan struct{}) ) - wg.Add(len(r.runners)) - - drain := func(index int) { - for i := index; i < len(r.runners); i++ { - wg.Done() - } - } // started go routine + wg.Add(1) go func() { + defer wg.Done() var index = 0 + var errored atomic.Bool for { select { - case <-closeCh: - drain(index) - return - case <-readyCh: + case _, ok := <-readyCh: + // We are closed + if !ok { + return + } // when all runners are running we cal report that pipeline is ready if index == len(r.runners) { ready() return } // when we are closing we need to mark remaining workers as finished - if r.closing.Load() { - drain(index) + if r.closing.Load() || errored.Load() { return } runner := r.runners[index] index++ // runner go routine + wg.Add(1) go func() { var once sync.Once defer wg.Done() err := runner.Run(ctx, func() { // worker is ready we can start with next one once.Do(func() { - readyCh <- struct{}{} + if !r.closing.Load() && !errored.Load() { + readyCh <- struct{}{} + } }) }) if r.options.ErrorSignaler != nil && !r.closing.Load() { r.options.ErrorSignaler(err) } if err != nil { - r.closing.Store(true) - close(closeCh) + errored.Store(true) + close(readyCh) + errs <- err } - errs <- err }() case <-ctx.Done(): return @@ -414,7 +417,6 @@ func (r *SerialPipeline) Run(ctx context.Context, ready ReadyFunc) error { }() // Lets start first worker readyCh <- struct{}{} - wg.Wait() close(errs)