Skip to content

Commit 97e1415

Browse files
committed
feat: add lock update mechanism for jobs, closes #762
- Add `jobOutUpdateLockRequest` channel in the executor - Implement lock update requests in the job execution process - Add `Lock()` method to the `Job` interface - Update the scheduler to handle lock update requests - Add a test case to verify the new locking mechanism
1 parent 3b2dcd8 commit 97e1415

File tree

4 files changed

+88
-7
lines changed

4 files changed

+88
-7
lines changed

executor.go

+18-2
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ type executor struct {
3030
jobsOutCompleted chan uuid.UUID
3131
// used to request jobs from the scheduler
3232
jobOutRequest chan jobOutRequest
33+
// used to request jobs from the scheduler
34+
jobOutUpdateLockRequest chan jobOutUpdateLockRequest
3335

3436
// used by the executor to receive a stop signal from the scheduler
3537
stopCh chan struct{}
@@ -376,7 +378,14 @@ func (e *executor) runJob(j internalJob, jIn jobIn) {
376378
e.incrementJobCounter(j, Skip)
377379
return
378380
}
379-
defer func() { _ = lock.Unlock(j.ctx) }()
381+
e.jobOutUpdateLockRequest <- jobOutUpdateLockRequest{
382+
id: j.id,
383+
lock: lock,
384+
}
385+
386+
defer func() {
387+
_ = lock.Unlock(j.ctx)
388+
}()
380389
} else if e.locker != nil {
381390
lock, err := e.locker.Lock(j.ctx, j.name)
382391
if err != nil {
@@ -385,7 +394,14 @@ func (e *executor) runJob(j internalJob, jIn jobIn) {
385394
e.incrementJobCounter(j, Skip)
386395
return
387396
}
388-
defer func() { _ = lock.Unlock(j.ctx) }()
397+
e.jobOutUpdateLockRequest <- jobOutUpdateLockRequest{
398+
id: j.id,
399+
lock: lock,
400+
}
401+
402+
defer func() {
403+
_ = lock.Unlock(j.ctx)
404+
}()
389405
}
390406
_ = callJobFuncWithParams(j.beforeJobRuns, j.id, j.name)
391407

job.go

+8
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ type internalJob struct {
3030
nextScheduled []time.Time
3131

3232
lastRun time.Time
33+
lastLock Lock
3334
function any
3435
parameters []any
3536
timer clockwork.Timer
@@ -993,6 +994,7 @@ type Job interface {
993994
RunNow() error
994995
// Tags returns the job's string tags.
995996
Tags() []string
997+
Lock() Lock
996998
}
997999

9981000
var _ Job = (*job)(nil)
@@ -1091,3 +1093,9 @@ func (j job) RunNow() error {
10911093
}
10921094
return err
10931095
}
1096+
1097+
func (j job) Lock() Lock {
1098+
ij := requestJob(j.id, j.jobOutRequest)
1099+
1100+
return ij.lastLock
1101+
}

scheduler.go

+21-5
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,11 @@ type jobOutRequest struct {
107107
outChan chan internalJob
108108
}
109109

110+
type jobOutUpdateLockRequest struct {
111+
id uuid.UUID
112+
lock Lock
113+
}
114+
110115
type runJobRequest struct {
111116
id uuid.UUID
112117
outChan chan error
@@ -131,11 +136,12 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
131136
logger: &noOpLogger{},
132137
clock: clockwork.NewRealClock(),
133138

134-
jobsIn: make(chan jobIn),
135-
jobsOutForRescheduling: make(chan uuid.UUID),
136-
jobsOutCompleted: make(chan uuid.UUID),
137-
jobOutRequest: make(chan jobOutRequest, 1000),
138-
done: make(chan error),
139+
jobsIn: make(chan jobIn),
140+
jobsOutForRescheduling: make(chan uuid.UUID),
141+
jobsOutCompleted: make(chan uuid.UUID),
142+
jobOutRequest: make(chan jobOutRequest, 1000),
143+
jobOutUpdateLockRequest: make(chan jobOutUpdateLockRequest),
144+
done: make(chan error),
139145
}
140146

141147
s := &scheduler{
@@ -190,6 +196,9 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
190196
case out := <-s.jobOutRequestCh:
191197
s.selectJobOutRequest(out)
192198

199+
case out := <-s.exec.jobOutUpdateLockRequest:
200+
s.jobOutUpdateLockRequest(out)
201+
193202
case out := <-s.allJobsOutRequest:
194203
s.selectAllJobsOutRequest(out)
195204

@@ -425,6 +434,13 @@ func (s *scheduler) selectJobOutRequest(out jobOutRequest) {
425434
close(out.outChan)
426435
}
427436

437+
func (s *scheduler) jobOutUpdateLockRequest(out jobOutUpdateLockRequest) {
438+
if j, ok := s.jobs[out.id]; ok {
439+
j.lastLock = out.lock
440+
s.jobs[out.id] = j
441+
}
442+
}
443+
428444
func (s *scheduler) selectNewJob(in newJobIn) {
429445
j := in.job
430446
if s.started {

scheduler_test.go

+41
Original file line numberDiff line numberDiff line change
@@ -2536,3 +2536,44 @@ func TestScheduler_WithMonitor(t *testing.T) {
25362536
})
25372537
}
25382538
}
2539+
2540+
func TestJob_Lock(t *testing.T) {
2541+
locker := &testLocker{
2542+
notLocked: make(chan struct{}, 1),
2543+
}
2544+
2545+
s := newTestScheduler(t,
2546+
WithDistributedLocker(locker),
2547+
)
2548+
2549+
jobRan := make(chan struct{})
2550+
j, err := s.NewJob(
2551+
DurationJob(time.Millisecond*100),
2552+
NewTask(func() {
2553+
time.Sleep(50 * time.Millisecond)
2554+
jobRan <- struct{}{}
2555+
}),
2556+
)
2557+
require.NoError(t, err)
2558+
2559+
s.Start()
2560+
defer s.Shutdown()
2561+
2562+
select {
2563+
case <-jobRan:
2564+
// Job has run
2565+
case <-time.After(200 * time.Millisecond):
2566+
t.Fatal("Job did not run in time")
2567+
}
2568+
2569+
require.Eventually(t, func() bool {
2570+
if locker.jobLocked {
2571+
return true
2572+
}
2573+
2574+
return false
2575+
}, 200*time.Millisecond, 100*time.Millisecond, "Job should be locked")
2576+
2577+
lock := j.Lock()
2578+
assert.NotNil(t, lock, "Job Lock() should return a non-nil Locker")
2579+
}

0 commit comments

Comments
 (0)