-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlooper.go
142 lines (129 loc) · 3.28 KB
/
looper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
// Copyright 2024 Outreach Corporation. All Rights Reserved.
// Description: This file contains looper structs
package plumber
import (
"context"
"sync"
)
// BaseLooper is a looper struct meant to be embedded in other types, as in the following example:
//
//	type Looper struct {
//		*plumber.BaseLooper
//	}
//
//	s := &Looper{}
//	s.BaseLooper = plumber.NewBaseLooper(s.loop)
//
//	func (s *Looper) loop(ctx context.Context, l *plumber.Loop) error {
//		....
//	}
type BaseLooper struct {
// runner is the Runner that Run, Close, and Ready delegate to; NewBaseLooper sets it.
runner Runner
}
// Run executes the runner's workload by calling Run on the underlying runner
// with ctx and returning its error. Pipelines start Run in a separate
// goroutine. Readiness is reported separately, through Ready.
func (l *BaseLooper) Run(ctx context.Context) error {
	err := l.runner.Run(ctx)
	return err
}
// Close triggers a graceful shutdown of the task by handing the underlying
// runner and ctx to RunnerClose, and returns the resulting error. It should
// block until the task is properly closed. When the close timeout is exceeded,
// the given context is canceled.
func (l *BaseLooper) Close(ctx context.Context) error {
	closeErr := RunnerClose(ctx, l.runner)
	return closeErr
}
// Ready exposes the readiness of the underlying runner: it returns whatever
// channel and error RunnerReady reports for it.
func (l *BaseLooper) Ready() (<-chan struct{}, error) {
	readyCh, err := RunnerReady(l.runner)
	return readyCh, err
}
// NewBaseLooper returns a BaseLooper whose runner is created by passing the
// given loop function to Looper.
func NewBaseLooper(looper func(ctx context.Context, loop *Loop) error) *BaseLooper {
	base := new(BaseLooper)
	base.runner = Looper(looper)
	return base
}
// Loop is a looper controlling struct. It is handed to the loop function so
// the loop can report readiness (Ready) and observe close requests (Closing).
type Loop struct {
// closeCh is the channel exposed to the loop through Closing.
closeCh chan DoneFunc
// ready is the callback invoked by Ready; when it is nil, Ready does nothing.
ready ReadyFunc
}
// Ready reports the runner's readiness by invoking the configured ready
// callback. It is a no-op when no callback has been set.
func (l *Loop) Ready() {
	if l.ready == nil {
		return
	}
	l.ready()
}
// Closing returns the receive-only channel on which a close request arrives.
// For runners built by Looper, Close sends a DoneFunc on this channel and then
// closes it; the channel is also closed, without a value, once the loop
// function has returned.
func (l *Loop) Closing() <-chan DoneFunc {
	var closing <-chan DoneFunc = l.closeCh
	return closing
}
// Looper creates a Runner around a loop function that is expected to run
// until it is asked to stop. The provided Loop lets the function report
// readiness and detect a close request so it can terminate gracefully.
//
// Run invokes run at most once, on the first call; later calls return nil
// without invoking it. Close hands a DoneFunc to the loop through
// Loop.Closing and then waits until the loop reports the outcome through that
// DoneFunc, the loop function returns, or ctx is done, whichever comes first.
// An error reported through the DoneFunc is returned by Close even when the
// loop function has already returned by the time Close observes it.
//
// Example:
//
//	plumber.Looper(func(ctx context.Context, l *plumber.Loop) error {
//		l.Ready()
//		tick := time.Tick(500 * time.Millisecond)
//		for {
//			select {
//			case <-tick:
//				// Work
//				fmt.Println("Work")
//			case closeDone := <-l.Closing():
//				fmt.Println("Close is requested")
//				closeDone.Success()
//				// Graceful shutdown
//				return nil
//			case <-ctx.Done():
//				// Cancel / Timeout
//				return ctx.Err()
//			}
//		}
//	})
func Looper(run func(ctx context.Context, loop *Loop) error) Runner {
	var (
		runOnce   sync.Once
		closeOnce sync.Once
		// returnedCh is only ever closed (never sent on), so it needs no buffer.
		returnedCh = make(chan struct{})
		l          = &Loop{
			// Buffer of one lets Close deposit the DoneFunc without blocking,
			// even if the loop is not currently receiving.
			closeCh: make(chan DoneFunc, 1),
		}
	)
	signal := NewSignal()
	return NewRunner(
		func(ctx context.Context) error {
			var err error
			runOnce.Do(func() {
				l.ready = func() {
					signal.Notify()
				}
				// Close the closing channel once the loop is done, unless
				// Close has already sent on it and closed it.
				defer closeOnce.Do(func() {
					close(l.closeCh)
				})
				// Deferred so that a waiting Close is released even if run
				// panics; runs before the closeOnce defer above (LIFO).
				defer close(returnedCh)
				err = run(ctx, l)
			})
			return err
		},
		WithClose(func(ctx context.Context) error {
			var (
				errCh             = make(chan error, 1)
				canceled DoneFunc = func(err error) {
					errCh <- err
					close(errCh)
				}
			)
			// Deliver the close request exactly once. If the loop has not
			// been started yet, the request waits in the channel buffer.
			closeOnce.Do(func() {
				l.closeCh <- canceled
				close(l.closeCh)
			})
			select {
			case err := <-errCh:
				return err
			case <-ctx.Done():
				return ctx.Err()
			case <-returnedCh:
				// The loop may have reported an error through the DoneFunc
				// right before returning. select picks among ready cases at
				// random, so check errCh explicitly to avoid losing that error.
				select {
				case err := <-errCh:
					return err
				default:
					return nil
				}
			}
		}),
		WithReady(signal),
	)
}