-
Notifications
You must be signed in to change notification settings - Fork 8
/
run.go
111 lines (96 loc) · 2.42 KB
/
run.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package reflex
import (
"context"
"io"
"time"
"github.com/luno/jettison/errors"
"github.com/luno/jettison/j"
"github.com/luno/jettison/log"
)
// Run executes the spec by streaming events from the current cursor,
// feeding each into the consumer and updating the cursor on success.
// It always returns a non-nil error. Cancel the context to return early.
func Run(in context.Context, s Spec) error {
ctx, cancel := context.WithCancel(in)
defer cancel()
defer func() { _ = s.cStore.Flush(context.Background()) }() // best effort flush with new context
cursor, err := s.cStore.GetCursor(ctx, s.consumer.Name())
if err != nil {
return errors.Wrap(err, "get cursor error")
}
// Check if the consumer requires to be reset.
switch r := s.consumer.(type) {
case ResetterCtx:
err = r.Reset(ctx)
if err != nil {
return errors.Wrap(err, "reset error")
}
case resetter:
err = r.Reset()
if err != nil {
return errors.Wrap(err, "reset error")
}
}
// Filter out stream lag option since we implement lag here not at server.
var (
lag time.Duration
opts []StreamOption
)
for _, opt := range s.opts {
var temp StreamOptions
opt(&temp)
if temp.Lag > 0 {
lag = temp.Lag
} else {
opts = append(opts, opt)
}
}
// Start stream
sc, err := s.stream(ctx, cursor, opts...)
if err != nil {
return err
}
// Check if the stream client is a closer.
if closer, ok := sc.(io.Closer); ok {
defer func() { _ = closer.Close() }()
}
for {
e, err := sc.Recv()
if err != nil {
return errors.Wrap(err, "recv error")
}
ctx := log.ContextWith(ctx, j.MKS{
"consumer": s.consumer.Name(),
"event_id": e.ID,
"event_fid": e.ForeignID,
})
// Delay events if lag specified.
if delay := lag - since(e.Timestamp); lag > 0 && delay > 0 {
t := newTimer(delay)
select {
case <-ctx.Done():
t.Stop()
return ctx.Err()
case <-t.C:
}
}
if err := s.consumer.Consume(ctx, e); err != nil {
return errors.Wrap(err, "consume error", j.MKS{
"consumer": s.consumer.Name(),
"event_id": e.ID,
"event_fid": e.ForeignID,
})
}
if err := s.cStore.SetCursor(ctx, s.consumer.Name(), e.ID); err != nil {
return errors.Wrap(err, "set cursor error", j.MKS{
"consumer": s.consumer.Name(),
"event_id": e.ID,
"event_fid": e.ForeignID,
})
}
}
}
// newTimer is aliased for testing.
var newTimer = time.NewTimer
// since is aliased for testing.
var since = time.Since