Skip to content

Commit

Permalink
Fixes issue with missing processor config (#601)
Browse files Browse the repository at this point in the history
* Fixes issue with missing processor config

Processor config was missing in the FromProto ToProto conversions of the job model.

This caused any change in the field to be lost.

The change forced to redefine PluginConfig as map[string]string as proto3 doesn't have an easy concept of go interface.

Also modified the corresponding processors.
  • Loading branch information
Victor Castell authored Sep 13, 2019
1 parent 4917ebb commit e821de6
Show file tree
Hide file tree
Showing 11 changed files with 303 additions and 238 deletions.
15 changes: 8 additions & 7 deletions builtin/bins/dkron-processor-files/files_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"io/ioutil"
"os"
"strconv"

"github.com/distribworks/dkron/v2/dkron"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -39,17 +40,17 @@ func (l *FilesOutput) Process(args *dkron.ExecutionProcessorArgs) dkron.Executio
}

func (l *FilesOutput) parseConfig(config dkron.PluginConfig) {
forward, ok := config["forward"].(bool)
if ok {
l.forward = forward
log.Infof("Forwarding set to: %t", forward)
} else {
forward, err := strconv.ParseBool(config["forward"])
if err != nil {
l.forward = false
log.WithField("param", "forward").Warning("Incorrect format or param not found.")
} else {
l.forward = forward
log.Infof("Forwarding set to: %t", forward)
}

logDir, ok := config["log_dir"].(string)
if ok {
logDir := config["log_dir"]
if logDir != "" {
l.logDir = logDir
log.Infof("Log dir set to: %s", logDir)
} else {
Expand Down
2 changes: 1 addition & 1 deletion builtin/bins/dkron-processor-files/files_output_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestProcess(t *testing.T) {
Output: []byte("test"),
},
Config: dkron.PluginConfig{
"forward": false,
"forward": "false",
"log_dir": "/tmp",
},
}
Expand Down
10 changes: 6 additions & 4 deletions builtin/bins/dkron-processor-log/log_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"fmt"
"os"
"strconv"

"github.com/distribworks/dkron/v2/dkron"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -37,11 +38,12 @@ func (l *LogOutput) Process(args *dkron.ExecutionProcessorArgs) dkron.Execution
}

func (l *LogOutput) parseConfig(config dkron.PluginConfig) {
forward, ok := config["forward"].(bool)
if ok {
forward, err := strconv.ParseBool(config["forward"])
if err != nil {
l.forward = false
log.WithField("param", "forward").Warning("Incorrect format or param not found.")
} else {
l.forward = forward
log.Infof("Forwarding set to: %t", forward)
} else {
log.Error("Incorrect format in forward param")
}
}
2 changes: 1 addition & 1 deletion builtin/bins/dkron-processor-log/log_output_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestProcess(t *testing.T) {
Output: []byte("test"),
},
Config: dkron.PluginConfig{
"forward": false,
"forward": "false",
},
}

Expand Down
11 changes: 7 additions & 4 deletions builtin/bins/dkron-processor-syslog/syslog_output.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package main

import (
"strconv"

"github.com/distribworks/dkron/v2/dkron"
"github.com/hashicorp/go-syslog"
log "github.com/sirupsen/logrus"
Expand All @@ -27,11 +29,12 @@ func (l *SyslogOutput) Process(args *dkron.ExecutionProcessorArgs) dkron.Executi
}

func (l *SyslogOutput) parseConfig(config dkron.PluginConfig) {
forward, ok := config["forward"].(bool)
if ok {
forward, err := strconv.ParseBool(config["forward"])
if err != nil {
l.forward = false
log.WithField("param", "forward").Warning("Incorrect format or param not found.")
} else {
l.forward = forward
log.Infof("Forwarding set to: %t", forward)
} else {
log.Error("Incorrect format in forward param")
}
}
2 changes: 1 addition & 1 deletion builtin/bins/dkron-processor-syslog/syslog_output_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestProcess(t *testing.T) {
Output: []byte("test"),
},
Config: dkron.PluginConfig{
"forward": true,
"forward": "true",
},
}

Expand Down
2 changes: 1 addition & 1 deletion dkron/execution_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ type ExecutionProcessorArgs struct {
}

// PluginConfig holds a map of the plugin configuration data structure.
type PluginConfig map[string]interface{}
type PluginConfig map[string]string
14 changes: 14 additions & 0 deletions dkron/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ type Job struct {
// NewJobFromProto create a new Job from a PB Job struct
func NewJobFromProto(in *proto.Job) *Job {
next, _ := ptypes.Timestamp(in.GetNext())

job := &Job{
Name: in.Name,
DisplayName: in.Displayname,
Expand Down Expand Up @@ -154,6 +155,13 @@ func NewJobFromProto(in *proto.Job) *Job {
t, _ := ptypes.Timestamp(in.GetLastError().GetTime())
job.LastError.Set(t)
}

procs := make(map[string]PluginConfig)
for k, v := range in.Processors {
procs[k] = v.Config
}
job.Processors = procs

return job
}

Expand All @@ -172,6 +180,11 @@ func (j *Job) ToProto() *proto.Job {
lastError.Time, _ = ptypes.TimestampProto(j.LastError.Get())
}
next, _ := ptypes.TimestampProto(j.Next)

processors := make(map[string]*proto.PluginConfig)
for k, v := range j.Processors {
processors[k] = &proto.PluginConfig{Config: v}
}
return &proto.Job{
Name: j.Name,
Displayname: j.DisplayName,
Expand All @@ -187,6 +200,7 @@ func (j *Job) ToProto() *proto.Job {
DependentJobs: j.DependentJobs,
ParentJob: j.ParentJob,
Concurrency: j.Concurrency,
Processors: processors,
Executor: j.Executor,
ExecutorConfig: j.ExecutorConfig,
Status: j.Status,
Expand Down
38 changes: 38 additions & 0 deletions dkron/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"os"
"testing"

"github.com/distribworks/dkron/v2/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -66,3 +67,40 @@ func TestJobGetParent(t *testing.T) {
assert.NoError(t, err)
assert.Nil(t, ptj.DependentJobs)
}

func TestNewJobFromProto(t *testing.T) {
testConfig := map[string]PluginConfig{
"test_processor": {
"config_key": "config_value",
},
}

in := &proto.Job{}
proc := map[string]*proto.PluginConfig{
"test_processor": {
Config: map[string]string{"config_key": "config_value"},
},
}
in.Processors = proc

j := NewJobFromProto(in)
assert.Equal(t, testConfig, j.Processors)
}

func TestToProto(t *testing.T) {
j := &Job{
Processors: map[string]PluginConfig{
"test_processor": {
"config_key": "config_value",
},
},
}
proc := map[string]*proto.PluginConfig{
"test_processor": {
Config: map[string]string{"config_key": "config_value"},
},
}

jpb := j.ToProto()
assert.Equal(t, jpb.Processors, proc)
}
Loading

0 comments on commit e821de6

Please sign in to comment.