-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathexecutor.go
320 lines (262 loc) · 9.8 KB
/
executor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
package executor
import (
"context"
"errors"
"log/slog"
"time"
"github.com/zalgonoise/cfg"
"github.com/zalgonoise/x/errs"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"github.com/zalgonoise/micron/schedule"
)
const (
cronAndLocAlloc = 2
defaultID = "micron.executor"
bufferPeriod = 100 * time.Millisecond
errDomain = errs.Domain("micron/executor")
ErrEmpty = errs.Kind("empty")
ErrRunnerList = errs.Entity("runners list")
ErrScheduler = errs.Entity("scheduler")
)
var (
ErrEmptyRunnerList = errs.WithDomain(errDomain, ErrEmpty, ErrRunnerList)
ErrEmptyScheduler = errs.WithDomain(errDomain, ErrEmpty, ErrScheduler)
)
// Runner describes a type that executes a job or task. It contains only one method, Run, that is called with a
// context as input and returns an error.
//
// Implementations of Runner only need to comply with this method, where the logic within Run is completely up to the
// actual implementation. These implementations need to be aware of the state of the input context.Context, which may
// denote cancellation or closure (e.g. with a timeout).
//
// The returned error denotes the success state of the execution. A nil error means that the execution was successful,
// where a non-nil error must signal a failed execution.
type Runner interface {
// Run executes the job or task.
//
// This call takes in a context.Context which may be used to denote cancellation or closure (e.g. with a timeout)
//
// The returned error denotes the success state of the execution. A nil error means that the execution was successful,
// where a non-nil error must signal a failed execution.
Run(ctx context.Context) error
}
// Runnable is a custom type for any function that takes in a context.Context and returns an error. This type of
// function can be perceived as a Runner type. For that, this custom type will implement Runner by exposing a Run method
// that invokes the actual Runnable function.
type Runnable func(ctx context.Context) error
// Run executes the job or task.
//
// This call takes in a context.Context which may be used to denote cancellation or closure (e.g. with a timeout)
//
// The returned error denotes the success state of the execution. A nil error means that the execution was successful,
// where a non-nil error must signal a failed execution.
func (r Runnable) Run(ctx context.Context) error {
if r == nil {
return nil
}
return r(ctx)
}
// Executor describes the capabilities of cron job's executor component, which is based on fetching the next execution's
// time, Next; as well as running the job, Exec. It also exposes an ID method to allow access to this Executor's
// configured ID or name.
//
// Implementations of Executor must focus on the logic of the Exec method, which should contain the logic of the Next
// method as well. It should not be the responsibility of other components to wait until it is time to execute the job;
// but actually the Executor's responsibility to consider it in its Exec method. That being said, its Next method (just
// like its ID method) allows access to some of the details of the executor if the caller needs that information; as
// helpers.
//
// The logic behind Next and generally calculating the time for the next job execution should be deferred to a
// schedule.Scheduler, which should be part of the Executor.
//
// One Executor may contain multiple Runner, as a job may be composed of several (smaller) tasks. However, an Executor
// is identified by a single ID.
type Executor interface {
// Exec runs the task when on its scheduled time.
//
// For this, Exec leverages the Executor's underlying schedule.Scheduler to retrieve the job's next execution time,
// waits for it, and calls Runner.Run on each configured Runner. All raised errors are joined and returned at the end
// of this call.
Exec(ctx context.Context) error
// Next calls the Executor's underlying schedule.Scheduler Next method.
Next(ctx context.Context) time.Time
// ID returns this Executor's ID.
ID() string
}
// Metrics describes the actions that register Executor-related metrics.
type Metrics interface {
// IncExecutorExecCalls increases the count of Exec calls, by the Executor.
IncExecutorExecCalls(id string)
// IncExecutorExecErrors increases the count of Exec call errors, by the Executor.
IncExecutorExecErrors(id string)
// ObserveExecLatency registers the duration of an Exec call, by the Executor.
ObserveExecLatency(ctx context.Context, id string, dur time.Duration)
// IncExecutorNextCalls increases the count of Next calls, by the Executor.
IncExecutorNextCalls(id string)
}
// Executable is an implementation of the Executor interface. It uses a schedule.Scheduler to mark the next job's
// execution time, and supports multiple Runner.
type Executable struct {
id string
cron schedule.Scheduler
runners []Runner
logger *slog.Logger
metrics Metrics
tracer trace.Tracer
}
// Next calls the Executor's underlying schedule.Scheduler Next method.
func (e *Executable) Next(ctx context.Context) time.Time {
ctx, span := e.tracer.Start(ctx, "Executor.Next")
defer span.End()
e.metrics.IncExecutorNextCalls(e.id)
next := e.cron.Next(ctx, time.Now())
e.logger.InfoContext(ctx, "next job",
slog.String("id", e.id),
slog.Time("at", next),
)
span.SetAttributes(
attribute.String("id", e.id),
attribute.String("at", next.Format(time.RFC3339)),
)
return next
}
// Exec runs the task when on its scheduled time.
//
// For this, Exec leverages the Executor's underlying schedule.Scheduler to retrieve the job's next execution time,
// waits for it, and calls Runner.Run on each configured Runner. All raised errors are joined and returned at the end
// of this call.
func (e *Executable) Exec(ctx context.Context) error {
ctx, span := e.tracer.Start(ctx, "Executor.Exec")
defer span.End()
span.SetAttributes(attribute.String("id", e.id))
e.metrics.IncExecutorExecCalls(e.id)
e.logger.InfoContext(ctx, "executing task", slog.String("id", e.id))
execCtx, cancel := context.WithCancel(ctx)
defer cancel()
start := time.Now()
defer func() {
e.metrics.ObserveExecLatency(ctx, e.id, time.Since(start))
}()
next := e.cron.Next(execCtx, start)
timer := time.NewTimer(next.Sub(start))
defer timer.Stop()
for {
select {
case <-ctx.Done():
err := ctx.Err()
e.metrics.IncExecutorExecErrors(e.id)
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
e.logger.WarnContext(ctx, "task cancelled",
slog.String("id", e.id),
slog.String("error", err.Error()),
)
return err
case <-timer.C:
// avoid executing before it's time, as it may trigger repeated runs
if preTriggerDuration := time.Since(next); preTriggerDuration > 0 {
time.Sleep(preTriggerDuration + bufferPeriod)
}
runnerErrs := make([]error, 0, len(e.runners))
for i := range e.runners {
if err := e.runners[i].Run(ctx); err != nil {
runnerErrs = append(runnerErrs, err)
}
}
if len(runnerErrs) > 0 {
err := errors.Join(runnerErrs...)
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
e.metrics.IncExecutorExecErrors(e.id)
e.logger.ErrorContext(ctx, "task execution error(s)",
slog.String("id", e.id),
slog.Int("num_errors", len(runnerErrs)),
slog.String("errors", err.Error()),
)
return err
}
return nil
}
}
}
// ID returns this Executor's ID.
func (e *Executable) ID() string {
return e.id
}
// New creates an Executor with the input cfg.Option(s), also returning an error if raised.
//
// The minimum requirements to create an Executor is to supply at least one Runner, be it an implementation of
// this interface or as a Runnable using the WithRunners option, as well as a schedule.Scheduler using the
// WithScheduler option -- alternatively, callers can simply pass a cron string directly using the WithSchedule option.
//
// If an ID is not supplied, then the default ID of `micron.executor` is set.
func New(id string, options ...cfg.Option[*Config]) (Executor, error) {
config := cfg.Set(defaultConfig(), options...)
return newExecutable(id, config)
}
func newExecutable(id string, config *Config) (Executor, error) {
// validate input
if id == "" {
id = defaultID
}
if len(config.runners) == 0 {
return noOpExecutor{}, ErrEmptyRunnerList
}
if config.scheduler == nil && config.cron == "" {
return noOpExecutor{}, ErrEmptyScheduler
}
var sched schedule.Scheduler
switch {
case config.scheduler != nil:
// scheduler is provided, ignore cron string and location
sched = config.scheduler
default:
// create a new scheduler from config
opts := make([]cfg.Option[schedule.Config], 0, cronAndLocAlloc)
if config.cron != "" {
opts = append(opts, schedule.WithSchedule(config.cron))
}
if config.loc != nil {
opts = append(opts, schedule.WithLocation(config.loc))
}
var err error
sched, err = schedule.New(opts...)
if err != nil {
return noOpExecutor{}, err
}
}
// return the object with the provided runners
return &Executable{
id: id,
cron: sched,
runners: config.runners,
logger: slog.New(config.handler),
metrics: config.metrics,
tracer: config.tracer,
}, nil
}
// NoOp returns a no-op Executor.
func NoOp() Executor {
return noOpExecutor{}
}
type noOpExecutor struct{}
// Exec runs the task when on its scheduled time.
//
// This is a no-op call, it has no effect and the returned error is always nil.
func (e noOpExecutor) Exec(context.Context) error {
return nil
}
// Next calls the Executor's underlying schedule.Scheduler Next method.
//
// This is a no-op call, it has no effect and the returned time is always zero.
func (e noOpExecutor) Next(_ context.Context) (t time.Time) {
return t
}
// ID returns this Executor's ID.
//
// This is a no-op call, it has no effect and the returned string is always empty.
func (e noOpExecutor) ID() string {
return ""
}