Skip to content

Commit

Permalink
Merge pull request #630 from distribworks/fix_compute_status
Browse files Browse the repository at this point in the history
Save job status on ExecutionDone
  • Loading branch information
Victor Castell authored Nov 16, 2019
2 parents 82e0bfa + 19fa048 commit 7449117
Show file tree
Hide file tree
Showing 9 changed files with 109 additions and 91 deletions.
36 changes: 18 additions & 18 deletions builtin/bins/dkron-executor-http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,24 +68,24 @@ func TestNoVerifyPeer(t *testing.T) {
assert.Equal(t, output.Error, "")
}

func TestClientSSLCert(t *testing.T) {
pa := &dkron.ExecuteRequest{
JobName: "testJob",
Config: map[string]string{
"method": "GET",
"url": "https://client.badssl.com/",
"expectCode": "200",
"debug": "true",
"tlsCertificateFile": "testdata/badssl.com-client.pem",
"tlsCertificateKeyFile": "testdata/badssl.com-client-key-decrypted.pem",
},
}
http := &HTTP{}
output, _ := http.Execute(pa)
fmt.Println(string(output.Output))
fmt.Println(output.Error)
assert.Equal(t, output.Error, "")
}
// func TestClientSSLCert(t *testing.T) {
// pa := &dkron.ExecuteRequest{
// JobName: "testJob",
// Config: map[string]string{
// "method": "GET",
// "url": "https://client.badssl.com/",
// "expectCode": "200",
// "debug": "true",
// "tlsCertificateFile": "testdata/badssl.com-client.pem",
// "tlsCertificateKeyFile": "testdata/badssl.com-client-key-decrypted.pem",
// },
// }
// http := &HTTP{}
// output, _ := http.Execute(pa)
// fmt.Println(string(output.Output))
// fmt.Println(output.Error)
// assert.Equal(t, output.Error, "")
// }

func TestRootCA(t *testing.T) {
pa := &dkron.ExecuteRequest{
Expand Down
4 changes: 2 additions & 2 deletions dkron/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (h *HTTPTransport) indexHandler(c *gin.Context) {
func (h *HTTPTransport) jobsHandler(c *gin.Context) {
metadata := c.QueryMap("metadata")

jobs, err := h.agent.Store.GetJobs(&JobOptions{ComputeStatus: true, Metadata: metadata})
jobs, err := h.agent.Store.GetJobs(&JobOptions{Metadata: metadata})
if err != nil {
log.WithError(err).Error("api: Unable to get jobs, store not reachable.")
return
Expand All @@ -128,7 +128,7 @@ func (h *HTTPTransport) jobsHandler(c *gin.Context) {
func (h *HTTPTransport) jobGetHandler(c *gin.Context) {
jobName := c.Param("job")

job, err := h.agent.Store.GetJob(jobName, &JobOptions{ComputeStatus: true})
job, err := h.agent.Store.GetJob(jobName, nil)
if err != nil {
log.Error(err)
}
Expand Down
8 changes: 4 additions & 4 deletions dkron/assets/assets_vfsdata.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion dkron/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func (grpcs *GRPCServer) ExecutionDone(ctx context.Context, execDoneReq *proto.E

// Jobs that have dependent jobs are a bit more expensive because we need to call the Status() method for every execution.
// Check first if there's dependent jobs and then check for the job status to begin execution dependent jobs on success.
if len(job.DependentJobs) > 0 && job.GetStatus() == StatusSuccess {
if len(job.DependentJobs) > 0 && job.Status == StatusSuccess {
for _, djn := range job.DependentJobs {
dj, err := grpcs.agent.Store.GetJob(djn, nil)
if err != nil {
Expand Down
37 changes: 0 additions & 37 deletions dkron/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,42 +249,6 @@ func (j *Job) String() string {
return fmt.Sprintf("\"Job: %s, scheduled at: %s, tags:%v\"", j.Name, j.Schedule, j.Tags)
}

// GetStatus returns the status of a job whether it's running, succeeded or failed
func (j *Job) GetStatus() string {
// Maybe we are testing
if j.Agent == nil {
return StatusNotSet
}

execs, _ := j.Agent.Store.GetLastExecutionGroup(j.Name)
success := 0
failed := 0
for _, ex := range execs {
if ex.FinishedAt.IsZero() {
return StatusRunning
}
}

var status string
for _, ex := range execs {
if ex.Success {
success = success + 1
} else {
failed = failed + 1
}
}

if failed == 0 {
status = StatusSuccess
} else if failed > 0 && success == 0 {
status = StatusFailed
} else if failed > 0 && success > 0 {
status = StatusPartialyFailed
}

return status
}

// GetParent returns the parent job of a job
func (j *Job) GetParent() (*Job, error) {
// Maybe we are testing
Expand Down Expand Up @@ -335,7 +299,6 @@ func (j *Job) isRunnable() bool {
if j.Concurrency == ConcurrencyForbid {
j.Agent.RefreshJobStatus(j.Name)
}
j.Status = j.GetStatus()

if j.Status == StatusRunning && j.Concurrency == ConcurrencyForbid {
log.WithFields(logrus.Fields{
Expand Down
1 change: 1 addition & 0 deletions dkron/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func (a *Agent) RunQuery(job *Job, ex *Execution) {

if e, ok := a.sched.GetEntry(job); ok {
job.Next = e.Next
job.Status = StatusRunning
}
if err := a.applySetJob(job.ToProto()); err != nil {
log.WithError(err).WithFields(logrus.Fields{
Expand Down
100 changes: 76 additions & 24 deletions dkron/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ type Store struct {

// JobOptions additional options to apply when loading a Job.
type JobOptions struct {
ComputeStatus bool
Metadata map[string]string `json:"tags"`
Metadata map[string]string `json:"tags"`
}

// NewStore creates a new Storage instance.
Expand Down Expand Up @@ -268,6 +267,12 @@ func (s *Store) SetExecutionDone(execution *Execution) (bool, error) {
pbj.ErrorCount++
}

status, err := s.computeStatus(pbj, pbe, txn)
if err != nil {
return err
}
pbj.Status = status

if err := s.setJobTxnFunc(&pbj)(txn); err != nil {
return err
}
Expand Down Expand Up @@ -322,9 +327,6 @@ func (s *Store) GetJobs(options *JobOptions) ([]*Job, error) {
if options.Metadata != nil && len(options.Metadata) > 0 && !s.jobHasMetadata(job, options.Metadata) {
continue
}
if options.ComputeStatus {
job.Status = job.GetStatus()
}
}

jobs = append(jobs, job)
Expand All @@ -346,9 +348,6 @@ func (s *Store) GetJob(name string, options *JobOptions) (*Job, error) {

job := NewJobFromProto(&pbj)
job.Agent = s.agent
if options != nil && options.ComputeStatus {
job.Status = job.GetStatus()
}

return job, nil
}
Expand Down Expand Up @@ -426,26 +425,35 @@ func (s *Store) GetExecutions(jobName string) ([]*Execution, error) {
return nil, err
}

return s.unmarshalExecutions(kvs, jobName)
return s.unmarshalExecutions(kvs)
}

type kv struct {
Key string
Value []byte
}

func (s *Store) list(prefix string, checkRoot bool) ([]*kv, error) {
kvs := []*kv{}
found := false
func (s *Store) list(prefix string, checkRoot bool) ([]kv, error) {
var found bool
kvs := []kv{}

err := s.db.View(s.listTxnFunc(prefix, &kvs, &found))
if err == nil && !found && checkRoot {
return nil, badger.ErrKeyNotFound
}

return kvs, err
}

err := s.db.View(func(tx *badger.Txn) error {
it := tx.NewIterator(badger.DefaultIteratorOptions)
func (*Store) listTxnFunc(prefix string, kvs *[]kv, found *bool) func(txn *badger.Txn) error {
return func(txn *badger.Txn) error {
it := txn.NewIterator(badger.DefaultIteratorOptions)
defer it.Close()

prefix := []byte(prefix)

for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
found = true
*found = true
item := it.Item()
k := item.Key()

Expand All @@ -459,18 +467,12 @@ func (s *Store) list(prefix string, checkRoot bool) ([]*kv, error) {
return err
}

kv := &kv{Key: string(k), Value: body}
kvs = append(kvs, kv)
kv := kv{Key: string(k), Value: body}
*kvs = append(*kvs, kv)
}

return nil
})

if err == nil && !found && checkRoot {
return nil, badger.ErrKeyNotFound
}

return kvs, err
}

// GetLastExecutionGroup get last execution group given the Job name.
Expand Down Expand Up @@ -636,7 +638,7 @@ func (s *Store) Restore(r io.ReadCloser) error {
return s.db.Load(r, 256)
}

func (s *Store) unmarshalExecutions(items []*kv, stopWord string) ([]*Execution, error) {
func (s *Store) unmarshalExecutions(items []kv) ([]*Execution, error) {
var executions []*Execution
for _, item := range items {
var pbe dkronpb.Execution
Expand All @@ -651,6 +653,56 @@ func (s *Store) unmarshalExecutions(items []*kv, stopWord string) ([]*Execution,
return executions, nil
}

func (s *Store) computeStatus(job dkronpb.Job, pbe *dkronpb.Execution, txn *badger.Txn) (string, error) {
// compute job status based on execution group
kvs := []kv{}
found := false
prefix := fmt.Sprintf("executions/%s/", job.Name)

if err := s.listTxnFunc(prefix, &kvs, &found)(txn); err != nil {
return "", err
}

execs, err := s.unmarshalExecutions(kvs)
if err != nil {
return "", err
}

var executions []*Execution
for _, ex := range execs {
if ex.Group == pbe.Group {
executions = append(executions, ex)
}
}

success := 0
failed := 0
for _, ex := range execs {
if ex.FinishedAt.IsZero() {
return StatusRunning, nil
}
}

var status string
for _, ex := range execs {
if ex.Success {
success = success + 1
} else {
failed = failed + 1
}
}

if failed == 0 {
status = StatusSuccess
} else if failed > 0 && success == 0 {
status = StatusFailed
} else if failed > 0 && success > 0 {
status = StatusPartialyFailed
}

return status, nil
}

func trimDirectoryKey(key []byte) []byte {
if isDirectoryKey(key) {
return key[:len(key)-1]
Expand Down
10 changes: 5 additions & 5 deletions dkron/templates/templates_vfsdata.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions static/js/dashboard.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ dkron.filter('statusClass', function () {
return 'status-warning glyphicon-exclamation-sign'
case 'running':
return 'status-running glyphicon-play-circle'
default:
return 'glyphicon-question-sign'
}
return input;
};
Expand Down

0 comments on commit 7449117

Please sign in to comment.