Skip to content

Commit

Permalink
Merge pull request #628 from distribworks/fix_605
Browse files Browse the repository at this point in the history
fix: only start and stop scheduler on leadership
  • Loading branch information
Victor Castell authored Nov 16, 2019
2 parents fd581e2 + 99c969b commit 82e0bfa
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 47 deletions.
14 changes: 0 additions & 14 deletions dkron/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,10 +641,6 @@ func (a *Agent) eventLoop() {
if e.EventType() == serf.EventQuery {
query := e.(*serf.Query)

if query.Name == QuerySchedulerRestart && a.config.Server {
a.schedule()
}

if query.Name == QueryRunJob {
log.WithFields(logrus.Fields{
"query": query.Name,
Expand Down Expand Up @@ -727,16 +723,6 @@ func (a *Agent) eventLoop() {
}
}

// schedule Start or restart scheduler
func (a *Agent) schedule() {
log.Info("agent: Restarting scheduler")
jobs, err := a.Store.GetJobs(nil)
if err != nil {
log.Fatal(err)
}
a.sched.Restart(jobs)
}

// Join asks the Serf instance to join. See the Serf.Join function.
func (a *Agent) join(addrs []string, replay bool) (n int, err error) {
log.Infof("agent: joining: %v replay: %v", addrs, replay)
Expand Down
10 changes: 7 additions & 3 deletions dkron/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,12 @@ func (grpcs *GRPCServer) SetJob(ctx context.Context, setJobReq *proto.SetJobRequ
return nil, err
}

// If everything is ok, restart the scheduler
grpcs.agent.schedule()
// If everything is ok, add the job to the scheduler
job := NewJobFromProto(setJobReq.Job)
job.Agent = grpcs.agent
if err := grpcs.agent.sched.AddJob(job); err != nil {
return nil, err
}

return &proto.SetJobResponse{}, nil
}
Expand Down Expand Up @@ -110,7 +114,7 @@ func (grpcs *GRPCServer) DeleteJob(ctx context.Context, delJobReq *proto.DeleteJ
jpb := job.ToProto()

// If everything is ok, restart the scheduler
grpcs.agent.schedule()
grpcs.agent.sched.RemoveJob(job)

return &proto.DeleteJobResponse{Job: jpb}, nil
}
Expand Down
8 changes: 7 additions & 1 deletion dkron/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,13 @@ func (a *Agent) reconcileMember(member serf.Member) error {
// state is up-to-date.
func (a *Agent) establishLeadership(stopCh chan struct{}) error {
defer metrics.MeasureSince([]string{"dkron", "leader", "establish_leadership"}, time.Now())
a.schedule()

log.Info("agent: Starting scheduler")
jobs, err := a.Store.GetJobs(nil)
if err != nil {
log.Fatal(err)
}
a.sched.Start(jobs)

return nil
}
Expand Down
2 changes: 0 additions & 2 deletions dkron/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (
)

const (
// QuerySchedulerRestart define the string to be sent
QuerySchedulerRestart = "scheduler:restart"
// QueryRunJob define a run job query string
QueryRunJob = "run:job"
// QueryExecutionDone define the execution done query string
Expand Down
81 changes: 54 additions & 27 deletions dkron/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,49 +22,35 @@ var (
// Scheduler represents a dkron scheduler instance, it stores the cron engine
// and the related parameters.
type Scheduler struct {
Cron *cron.Cron
Started bool
Cron *cron.Cron
Started bool
EntryJobMap map[string]cron.EntryID
}

// NewScheduler creates a new Scheduler instance
func NewScheduler() *Scheduler {
schedulerStarted.Set(0)
return &Scheduler{Cron: nil, Started: false}
return &Scheduler{
Cron: nil,
Started: false,
EntryJobMap: make(map[string]cron.EntryID),
}
}

// Start the cron scheduler, adding its corresponding jobs and
// executing them on time.
func (s *Scheduler) Start(jobs []*Job) {
func (s *Scheduler) Start(jobs []*Job) error {
s.Cron = cron.New(cron.WithParser(extcron.NewParser()))

metrics.IncrCounter([]string{"scheduler", "start"}, 1)
for _, job := range jobs {
if job.Disabled || job.ParentJob != "" {
continue
}

log.WithFields(logrus.Fields{
"job": job.Name,
}).Debug("scheduler: Adding job to cron")

cronInspect.Set(job.Name, job)
metrics.EmitKey([]string{"scheduler", "job", "add", job.Name}, 1)

// If Timezone is set on the job, and not explicitly in its schedule,
// AND its not a descriptor (that don't support timezones), add the
// timezone to the schedule so robfig/cron knows about it.
schedule := job.Schedule
if job.Timezone != "" &&
!strings.HasPrefix(schedule, "@") &&
!strings.HasPrefix(schedule, "TZ=") &&
!strings.HasPrefix(schedule, "CRON_TZ=") {
schedule = "CRON_TZ=" + job.Timezone + " " + schedule
}
s.Cron.AddJob(schedule, job)
s.AddJob(job)
}
s.Cron.Start()
s.Started = true

schedulerStarted.Set(1)

return nil
}

// Stop stops the scheduler effectively not running any job.
Expand Down Expand Up @@ -100,3 +86,44 @@ func (s *Scheduler) GetEntry(job *Job) (cron.Entry, bool) {
}
return cron.Entry{}, false
}

// AddJob Adds a job to the cron scheduler
func (s *Scheduler) AddJob(job *Job) error {
if job.Disabled || job.ParentJob != "" {
return nil
}

log.WithFields(logrus.Fields{
"job": job.Name,
}).Debug("scheduler: Adding job to cron")

cronInspect.Set(job.Name, job)
metrics.EmitKey([]string{"scheduler", "job", "add", job.Name}, 1)

// If Timezone is set on the job, and not explicitly in its schedule,
// AND its not a descriptor (that don't support timezones), add the
// timezone to the schedule so robfig/cron knows about it.
schedule := job.Schedule
if job.Timezone != "" &&
!strings.HasPrefix(schedule, "@") &&
!strings.HasPrefix(schedule, "TZ=") &&
!strings.HasPrefix(schedule, "CRON_TZ=") {
schedule = "CRON_TZ=" + job.Timezone + " " + schedule
}
id, err := s.Cron.AddJob(schedule, job)
if err != nil {
return err
}
s.EntryJobMap[job.Name] = id

return nil
}

// RemoveJob removes a job from the cron scheduler
func (s *Scheduler) RemoveJob(job *Job) {
log.WithFields(logrus.Fields{
"job": job.Name,
}).Debug("scheduler: Removing job from cron")
s.Cron.Remove(s.EntryJobMap[job.Name])
delete(s.EntryJobMap, job.Name)
}

0 comments on commit 82e0bfa

Please sign in to comment.