Skip to content

Commit

Permalink
Merge pull request #642 from distribworks/fix_633
Browse files Browse the repository at this point in the history
fix: Fix for wrong job status on RunQuery
  • Loading branch information
Victor Castell authored Nov 20, 2019
2 parents 2131fb7 + 7569a7a commit 0ee6623
Show file tree
Hide file tree
Showing 10 changed files with 339 additions and 262 deletions.
8 changes: 4 additions & 4 deletions dkron/assets/assets_vfsdata.go

Large diffs are not rendered by default.

17 changes: 7 additions & 10 deletions dkron/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,11 @@ func (grpcs *GRPCServer) ExecutionDone(ctx context.Context, execDoneReq *proto.E
"execution": execution,
}).Debug("grpc: Retrying execution")

grpcs.agent.RunQuery(job, &execution)
return nil, nil
_ = grpcs.agent.RunQuery(job.Name, &execution)
return &proto.ExecutionDoneResponse{
From: grpcs.agent.config.NodeName,
Payload: []byte("retry"),
}, nil
}

exg, err := grpcs.agent.Store.GetExecutionGroup(&execution)
Expand Down Expand Up @@ -241,14 +244,8 @@ func (grpcs *GRPCServer) Leave(ctx context.Context, in *empty.Empty) (*empty.Emp

// RunJob runs a job in the cluster
func (grpcs *GRPCServer) RunJob(ctx context.Context, req *proto.RunJobRequest) (*proto.RunJobResponse, error) {
job, err := grpcs.agent.Store.GetJob(req.JobName, nil)
if err != nil {
return nil, err
}

ex := NewExecution(job.Name)
grpcs.agent.RunQuery(job, ex)

ex := NewExecution(req.JobName)
job := grpcs.agent.RunQuery(req.JobName, ex)
jpb := job.ToProto()

return &proto.RunJobResponse{Job: jpb}, nil
Expand Down
11 changes: 1 addition & 10 deletions dkron/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,16 +230,7 @@ func (j *Job) Run() {

// Simple execution wrapper
ex := NewExecution(j.Name)

// This should be only called on the actual scheduler execution
n, err := j.GetNext()
if err != nil {
log.WithError(err).WithField("job", j.Name).
Fatal("agent: Error computing next execution")
}
j.Next = n

j.Agent.RunQuery(j, ex)
_ = j.Agent.RunQuery(j.Name, ex)
}
}
}
Expand Down
74 changes: 74 additions & 0 deletions dkron/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,77 @@ func TestToProto(t *testing.T) {
jpb := j.ToProto()
assert.Equal(t, jpb.Processors, proc)
}

func Test_isRunnable(t *testing.T) {
dir, err := ioutil.TempDir("", "dkron-test")
require.NoError(t, err)
defer os.RemoveAll(dir)

s, err := NewStore(nil, dir)
defer s.Shutdown()
require.NoError(t, err)

testCases := []struct {
name string
job *Job
want bool
}{
{
name: "global lock",
job: &Job{
Agent: &Agent{
GlobalLock: true,
Store: s,
},
},
want: false,
},
{
name: "running forbid",
job: &Job{
Agent: &Agent{
Store: s,
},
Status: StatusRunning,
Concurrency: ConcurrencyForbid,
},
want: false,
},
{
name: "success forbid",
job: &Job{
Agent: &Agent{
Store: s,
},
Status: StatusRunning,
Concurrency: ConcurrencyForbid,
},
want: false,
},
{
name: "running true",
job: &Job{
Agent: &Agent{
Store: s,
},
running: true,
},
want: false,
},
{
name: "should run",
job: &Job{
Agent: &Agent{
Store: s,
},
},
want: true,
},
}

for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
assert.Equal(t, tt.want, tt.job.isRunnable())
})
}
}
23 changes: 19 additions & 4 deletions dkron/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ const (
QueryExecutionDone = "execution:done"
)

var rescheduleThrotle *time.Timer

// RunQueryParam defines the struct used to send a Run query
// using serf.
type RunQueryParam struct {
Expand All @@ -27,14 +25,29 @@ type RunQueryParam struct {

// RunQuery sends a serf run query to the cluster, this is used to ask a node or nodes
// to run a Job.
func (a *Agent) RunQuery(job *Job, ex *Execution) {
func (a *Agent) RunQuery(jobName string, ex *Execution) *Job {
start := time.Now()
var params *serf.QueryParam

if e, ok := a.sched.GetEntry(job); ok {
job, err := a.Store.GetJob(jobName, nil)
if err != nil {
log.WithError(err).WithFields(logrus.Fields{
"job": job.Name,
"method": "RunQuery",
}).Fatal("queries: Error retrieveing job from store")
return nil
}

if e, ok := a.sched.GetEntry(jobName); ok {
job.Next = e.Next
job.Status = StatusRunning
} else {
log.WithError(err).WithFields(logrus.Fields{
"job": job.Name,
"method": "RunQuery",
}).Fatal("queries: Error retrieveing job from scheduler")
}

if err := a.applySetJob(job.ToProto()); err != nil {
log.WithError(err).WithFields(logrus.Fields{
"job": job.Name,
Expand Down Expand Up @@ -128,6 +141,8 @@ func (a *Agent) RunQuery(job *Job, ex *Execution) {
"time": time.Since(start),
"query": QueryRunJob,
}).Debug("agent: Done receiving acks and responses")

return job
}

// Broadcast a ExecutionDone to the cluster.
Expand Down
4 changes: 2 additions & 2 deletions dkron/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,10 @@ func (s *Scheduler) Restart(jobs []*Job) {

// GetEntry returns a scheduler entry from a snapshot in
// the current time, and whether or not the entry was found.
func (s *Scheduler) GetEntry(job *Job) (cron.Entry, bool) {
func (s *Scheduler) GetEntry(jobName string) (cron.Entry, bool) {
for _, e := range s.Cron.Entries() {
j, _ := e.Job.(*Job)
if j.Name == job.Name {
if j.Name == jobName {
return e, true
}
}
Expand Down
2 changes: 1 addition & 1 deletion dkron/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestSchedule(t *testing.T) {
assert.True(t, sched.Started)
now := time.Now().Truncate(time.Second)

entry, _ := sched.GetEntry(testJob1)
entry, _ := sched.GetEntry(testJob1.Name)
assert.Equal(t, now.Add(time.Second*2), entry.Next)

testJob2 := &Job{
Expand Down
Loading

0 comments on commit 0ee6623

Please sign in to comment.