diff --git a/dkron/agent.go b/dkron/agent.go index a8f6ec7e9..5cb1ef33a 100644 --- a/dkron/agent.go +++ b/dkron/agent.go @@ -641,10 +641,6 @@ func (a *Agent) eventLoop() { if e.EventType() == serf.EventQuery { query := e.(*serf.Query) - if query.Name == QuerySchedulerRestart && a.config.Server { - a.schedule() - } - if query.Name == QueryRunJob { log.WithFields(logrus.Fields{ "query": query.Name, @@ -727,16 +723,6 @@ func (a *Agent) eventLoop() { } } -// schedule Start or restart scheduler -func (a *Agent) schedule() { - log.Info("agent: Restarting scheduler") - jobs, err := a.Store.GetJobs(nil) - if err != nil { - log.Fatal(err) - } - a.sched.Restart(jobs) -} - // Join asks the Serf instance to join. See the Serf.Join function. func (a *Agent) join(addrs []string, replay bool) (n int, err error) { log.Infof("agent: joining: %v replay: %v", addrs, replay) diff --git a/dkron/grpc.go b/dkron/grpc.go index 0ff2fd58d..c37518c71 100644 --- a/dkron/grpc.go +++ b/dkron/grpc.go @@ -81,8 +81,12 @@ func (grpcs *GRPCServer) SetJob(ctx context.Context, setJobReq *proto.SetJobRequ return nil, err } - // If everything is ok, restart the scheduler - grpcs.agent.schedule() + // If everything is ok, add the job to the scheduler + job := NewJobFromProto(setJobReq.Job) + job.Agent = grpcs.agent + if err := grpcs.agent.sched.AddJob(job); err != nil { + return nil, err + } return &proto.SetJobResponse{}, nil } @@ -110,7 +114,7 @@ func (grpcs *GRPCServer) DeleteJob(ctx context.Context, delJobReq *proto.DeleteJ jpb := job.ToProto() // If everything is ok, restart the scheduler - grpcs.agent.schedule() + grpcs.agent.sched.RemoveJob(job) return &proto.DeleteJobResponse{Job: jpb}, nil } diff --git a/dkron/leader.go b/dkron/leader.go index 89f6f1a14..14744167f 100644 --- a/dkron/leader.go +++ b/dkron/leader.go @@ -186,7 +186,13 @@ func (a *Agent) reconcileMember(member serf.Member) error { // state is up-to-date. func (a *Agent) establishLeadership(stopCh chan struct{}) error { defer metrics.MeasureSince([]string{"dkron", "leader", "establish_leadership"}, time.Now()) - a.schedule() + + log.Info("agent: Starting scheduler") + jobs, err := a.Store.GetJobs(nil) + if err != nil { + log.Fatal(err) + } + a.sched.Start(jobs) return nil } diff --git a/dkron/queries.go b/dkron/queries.go index 6755ab529..e1ff20e7e 100644 --- a/dkron/queries.go +++ b/dkron/queries.go @@ -10,8 +10,6 @@ import ( ) const ( - // QuerySchedulerRestart define the string to be sent - QuerySchedulerRestart = "scheduler:restart" // QueryRunJob define a run job query string QueryRunJob = "run:job" // QueryExecutionDone define the execution done query string diff --git a/dkron/scheduler.go b/dkron/scheduler.go index e78987c58..8e8ea4e0b 100644 --- a/dkron/scheduler.go +++ b/dkron/scheduler.go @@ -22,49 +22,35 @@ var ( // Scheduler represents a dkron scheduler instance, it stores the cron engine // and the related parameters. type Scheduler struct { - Cron *cron.Cron - Started bool + Cron *cron.Cron + Started bool + EntryJobMap map[string]cron.EntryID } // NewScheduler creates a new Scheduler instance func NewScheduler() *Scheduler { schedulerStarted.Set(0) - return &Scheduler{Cron: nil, Started: false} + return &Scheduler{ + Cron: nil, + Started: false, + EntryJobMap: make(map[string]cron.EntryID), + } } // Start the cron scheduler, adding its corresponding jobs and // executing them on time. -func (s *Scheduler) Start(jobs []*Job) { +func (s *Scheduler) Start(jobs []*Job) error { s.Cron = cron.New(cron.WithParser(extcron.NewParser())) + metrics.IncrCounter([]string{"scheduler", "start"}, 1) for _, job := range jobs { - if job.Disabled || job.ParentJob != "" { - continue - } - - log.WithFields(logrus.Fields{ - "job": job.Name, - }).Debug("scheduler: Adding job to cron") - - cronInspect.Set(job.Name, job) - metrics.EmitKey([]string{"scheduler", "job", "add", job.Name}, 1) - - // If Timezone is set on the job, and not explicitly in its schedule, - // AND its not a descriptor (that don't support timezones), add the - // timezone to the schedule so robfig/cron knows about it. - schedule := job.Schedule - if job.Timezone != "" && - !strings.HasPrefix(schedule, "@") && - !strings.HasPrefix(schedule, "TZ=") && - !strings.HasPrefix(schedule, "CRON_TZ=") { - schedule = "CRON_TZ=" + job.Timezone + " " + schedule - } - s.Cron.AddJob(schedule, job) + s.AddJob(job) } s.Cron.Start() s.Started = true - schedulerStarted.Set(1) + + return nil } // Stop stops the scheduler effectively not running any job. @@ -100,3 +86,44 @@ func (s *Scheduler) GetEntry(job *Job) (cron.Entry, bool) { } return cron.Entry{}, false } + +// AddJob Adds a job to the cron scheduler +func (s *Scheduler) AddJob(job *Job) error { + if job.Disabled || job.ParentJob != "" { + return nil + } + + log.WithFields(logrus.Fields{ + "job": job.Name, + }).Debug("scheduler: Adding job to cron") + + cronInspect.Set(job.Name, job) + metrics.EmitKey([]string{"scheduler", "job", "add", job.Name}, 1) + + // If Timezone is set on the job, and not explicitly in its schedule, + // AND its not a descriptor (that don't support timezones), add the + // timezone to the schedule so robfig/cron knows about it. + schedule := job.Schedule + if job.Timezone != "" && + !strings.HasPrefix(schedule, "@") && + !strings.HasPrefix(schedule, "TZ=") && + !strings.HasPrefix(schedule, "CRON_TZ=") { + schedule = "CRON_TZ=" + job.Timezone + " " + schedule + } + id, err := s.Cron.AddJob(schedule, job) + if err != nil { + return err + } + s.EntryJobMap[job.Name] = id + + return nil +} + +// RemoveJob removes a job from the cron scheduler +func (s *Scheduler) RemoveJob(job *Job) { + log.WithFields(logrus.Fields{ + "job": job.Name, + }).Debug("scheduler: Removing job from cron") + s.Cron.Remove(s.EntryJobMap[job.Name]) + delete(s.EntryJobMap, job.Name) +}