Skip to content

Commit 0885992

Browse files
committed
feat: support overriding the Prev time from robfig/pull/446
1 parent 4b3d450 commit 0885992

File tree

2 files changed

+47
-7
lines changed

2 files changed

+47
-7
lines changed

cron.go

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,17 @@ type Entry struct {
7474
// Valid returns true if this is not the zero entry.
7575
func (e Entry) Valid() bool { return e.ID != 0 }
7676

77+
// ScheduleFirst is used for the initial scheduling. If a Prev value has been
78+
// included with the Entry, it will be used in place of "now" to allow schedules
79+
// to be preserved across process restarts.
80+
func (e Entry) ScheduleFirst(now time.Time) time.Time {
81+
if !e.Prev.IsZero() {
82+
return e.Schedule.Next(e.Prev)
83+
} else {
84+
return e.Schedule.Next(now)
85+
}
86+
}
87+
7788
// byTime is a wrapper for sorting the entry array by time
7889
// (with zero time at the end).
7990
type byTime []*Entry
@@ -138,24 +149,24 @@ func (f FuncJob) Run() { f() }
138149
// AddFunc adds a func to the Cron to be run on the given schedule.
139150
// The spec is parsed using the time zone of this Cron instance as the default.
140151
// An opaque ID is returned that can be used to later remove it.
141-
func (c *Cron) AddFunc(spec string, cmd func()) (EntryID, error) {
142-
return c.AddJob(spec, FuncJob(cmd))
152+
func (c *Cron) AddFunc(spec string, cmd func(), entryOpts ...EntryOption) (EntryID, error) {
153+
return c.AddJob(spec, FuncJob(cmd), entryOpts...)
143154
}
144155

145156
// AddJob adds a Job to the Cron to be run on the given schedule.
146157
// The spec is parsed using the time zone of this Cron instance as the default.
147158
// An opaque ID is returned that can be used to later remove it.
148-
func (c *Cron) AddJob(spec string, cmd Job) (EntryID, error) {
159+
func (c *Cron) AddJob(spec string, cmd Job, entryOpts ...EntryOption) (EntryID, error) {
149160
schedule, err := c.parser.Parse(spec)
150161
if err != nil {
151162
return 0, err
152163
}
153-
return c.Schedule(schedule, cmd), nil
164+
return c.Schedule(schedule, cmd, entryOpts...), nil
154165
}
155166

156167
// Schedule adds a Job to the Cron to be run on the given schedule.
157168
// The job is wrapped with the configured Chain.
158-
func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID {
169+
func (c *Cron) Schedule(schedule Schedule, cmd Job, entryOpts ...EntryOption) EntryID {
159170
c.runningMu.Lock()
160171
defer c.runningMu.Unlock()
161172
c.nextID++
@@ -165,6 +176,9 @@ func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID {
165176
WrappedJob: c.chain.Then(cmd),
166177
Job: cmd,
167178
}
179+
for _, fn := range entryOpts {
180+
fn(entry)
181+
}
168182
if !c.running {
169183
heap.Push(&c.entries, entry)
170184
} else {
@@ -173,6 +187,18 @@ func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID {
173187
return entry.ID
174188
}
175189

190+
// EntryOption is a hook which allows the Entry to be altered before being
191+
// committed internally.
192+
type EntryOption func(*Entry)
193+
194+
// WithPrev allows setting the Prev time to allow interval-based schedules to
195+
// preserve their timeline even in the face of process restarts.
196+
func WithPrev(prev time.Time) EntryOption {
197+
return func(e *Entry) {
198+
e.Prev = prev
199+
}
200+
}
201+
176202
// Entries returns a snapshot of the cron entries.
177203
func (c *Cron) Entries() []Entry {
178204
c.runningMu.Lock()
@@ -244,7 +270,7 @@ func (c *Cron) run() {
244270
sortedEntries := new(EntryHeap)
245271
for len(c.entries) > 0 {
246272
entry := heap.Pop(&c.entries).(*Entry)
247-
entry.Next = entry.Schedule.Next(now)
273+
entry.Next = entry.ScheduleFirst(now)
248274
heap.Push(sortedEntries, entry)
249275
c.logger.Info("schedule", "now", now, "entry", entry.ID, "next", entry.Next)
250276
}
@@ -287,7 +313,7 @@ func (c *Cron) run() {
287313
case newEntry := <-c.add:
288314
timer.Stop()
289315
now = c.now()
290-
newEntry.Next = newEntry.Schedule.Next(now)
316+
newEntry.Next = newEntry.ScheduleFirst(now)
291317
heap.Push(&c.entries, newEntry)
292318
c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next)
293319

cron_test.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -676,6 +676,20 @@ func TestStopAndWait(t *testing.T) {
676676
})
677677
}
678678

679+
func TestJobWithCustomPrev(t *testing.T) {
680+
cron := New()
681+
var calls int64
682+
// running every 3s, but starting 2s in the past
683+
// expected timeline: 1s ... 4s ... stop (2 calls)
684+
// if prev was ignored, the func would only be called once (at 3s)
685+
cron.AddFunc("@every 3s", func() { atomic.AddInt64(&calls, 1) }, WithPrev(time.Now().Add(-2*time.Second)))
686+
cron.Start()
687+
time.Sleep(5 * time.Second)
688+
if atomic.LoadInt64(&calls) != 2 {
689+
t.Errorf("called %d times, expected 2\n", calls)
690+
}
691+
}
692+
679693
func TestMultiThreadedStartAndStop(t *testing.T) {
680694
cron := New()
681695
go cron.Run()

0 commit comments

Comments
 (0)