Skip to content
This repository was archived by the owner on Oct 9, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ require (
github.com/Azure/go-autorest/autorest v0.10.0 // indirect
github.com/DiSiqueira/GoTree v1.0.1-0.20180907134536-53a8e837f295
github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1
github.com/coreos/etcd v3.3.15+incompatible // indirect
github.com/coreos/go-oidc v2.2.1+incompatible // indirect
github.com/fatih/color v1.9.0
github.com/ghodss/yaml v1.0.0
Expand All @@ -23,11 +22,10 @@ require (
github.com/imdario/mergo v0.3.8 // indirect
github.com/lyft/datacatalog v0.2.1
github.com/lyft/flyteidl v0.18.9
github.com/lyft/flyteplugins v0.5.12
github.com/lyft/flyteplugins v0.5.14-0.20201019203656-dbcbf808fc35
github.com/lyft/flytestdlib v0.3.9
github.com/magiconair/properties v1.8.1
github.com/mattn/go-colorable v0.1.6 // indirect
github.com/mitchellh/go-ps v1.0.0 // indirect
github.com/mitchellh/mapstructure v1.1.2
github.com/ncw/swift v1.0.50 // indirect
github.com/pkg/errors v0.9.1
Expand All @@ -49,7 +47,6 @@ require (
k8s.io/kube-openapi v0.0.0-20200204173128-addea2498afe // indirect
k8s.io/utils v0.0.0-20200229041039-0a110f9eb7ab // indirect
sigs.k8s.io/controller-runtime v0.5.1
sigs.k8s.io/testing_frameworks v0.1.2 // indirect
sigs.k8s.io/yaml v1.2.0 // indirect
)

Expand Down
43 changes: 2 additions & 41 deletions go.sum

Large diffs are not rendered by default.

34 changes: 24 additions & 10 deletions pkg/controller/nodes/task/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"time"

"github.com/lyft/flytestdlib/config"
"k8s.io/apimachinery/pkg/util/sets"
)

//go:generate pflags Config --default-var defaultConfig
Expand All @@ -14,7 +13,7 @@ const SectionKey = "tasks"

var (
defaultConfig = &Config{
TaskPlugins: TaskPluginConfig{EnabledPlugins: []string{}},
TaskPlugins: TaskPluginConfig{EnabledPlugins: map[string]EnabledPlugins{}},
MaxPluginPhaseVersions: 100000,
BarrierConfig: BarrierConfig{
Enabled: true,
Expand Down Expand Up @@ -45,23 +44,38 @@ type BarrierConfig struct {
CacheTTL config.Duration `json:"cache-ttl" pflag:", Max duration that a barrier would be respected if the process is not restarted. This should account for time required to store the record into persistent storage (across multiple rounds."`
}

type EnabledPlugins struct {
DefaultPluginTasks []string `json:"default-plugins-tasks" pflag:",Tasks for which this plugin is the default implementation"`
}

type TaskPluginConfig struct {
EnabledPlugins []string `json:"enabled-plugins" pflag:",Plugins enabled currently"`
EnabledPlugins map[string]EnabledPlugins `json:"enabled-plugins" pflag:",Plugins enabled currently"`
}

type BackOffConfig struct {
BaseSecond int `json:"base-second" pflag:",The number of seconds representing the base duration of the exponential backoff"`
MaxDuration config.Duration `json:"max-duration" pflag:",The cap of the backoff duration"`
}

func (p TaskPluginConfig) GetEnabledPluginsSet() sets.String {
s := sets.NewString()
for _, e := range p.EnabledPlugins {
cleanedPluginName := strings.Trim(e, " ")
cleanedPluginName = strings.ToLower(cleanedPluginName)
s.Insert(cleanedPluginName)
func cleanString(source string) string {
cleaned := strings.Trim(source, " ")
cleaned = strings.ToLower(cleaned)
return cleaned
}

func (p TaskPluginConfig) GetEnabledPlugins() map[string]EnabledPlugins {
enabledPlugins := make(map[string]EnabledPlugins)
for pluginName, info := range p.EnabledPlugins {
cleanedDefaultTasks := make([]string, 0, len(info.DefaultPluginTasks))
for _, taskName := range info.DefaultPluginTasks {
cleanedDefaultTasks = append(cleanedDefaultTasks, cleanString(taskName))
}
cleanedPluginName := cleanString(pluginName)
enabledPlugins[cleanedPluginName] = EnabledPlugins{
DefaultPluginTasks: cleanedDefaultTasks,
}
}
return s
return enabledPlugins
}

func GetConfig() *Config {
Expand Down
15 changes: 12 additions & 3 deletions pkg/controller/nodes/task/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,10 +215,19 @@ func (t *Handler) Setup(ctx context.Context, sCtx handler.SetupContext) error {
if err != nil {
return regErrors.Wrapf(err, "failed to load plugin - %s", p.ID)
}
println(fmt.Sprintf("for plugin [%s], registered task types: [%+v] and default task types [%+v]",
p.ID, p.RegisteredTaskTypes, p.DefaultForTaskTypes))
for _, tt := range p.RegisteredTaskTypes {
logger.Infof(ctx, "Plugin [%s] registered for TaskType [%s]", cp.GetID(), tt)
// TODO(katrogan): Make the default task plugin assignment more explicit (https://github.com/lyft/flyte/issues/516)
t.defaultPlugins[tt] = cp
for _, defaultTaskType := range p.DefaultForTaskTypes {
if defaultTaskType == tt {
if existingHandler, alreadyDefaulted := t.defaultPlugins[tt]; alreadyDefaulted {
logger.Warnf(ctx, "TaskType [%s] has multiple default handlers specified: [%s] and [%s]",
tt, existingHandler.GetID(), cp.GetID())
}
logger.Infof(ctx, "Plugin [%s] registered for TaskType [%s]", cp.GetID(), tt)
t.defaultPlugins[tt] = cp
}
}

pluginsForTaskType, ok := t.pluginsForType[tt]
if !ok {
Expand Down
38 changes: 29 additions & 9 deletions pkg/controller/nodes/task/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"testing"
"time"

pluginK8sMocks "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/k8s/mocks"

"github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"

Expand All @@ -28,7 +30,6 @@ import (
"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io"
ioMocks "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io/mocks"
pluginK8s "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/k8s"
pluginK8sMocks "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/k8s/mocks"
"github.com/lyft/flytestdlib/promutils"
"github.com/lyft/flytestdlib/storage"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -150,41 +151,59 @@ func Test_task_Setup(t *testing.T) {
defaultPluginID string
}
tests := []struct {
name string
registry PluginRegistryIface
fields wantFields
wantErr bool
name string
registry PluginRegistryIface
enabledPluginsConfig map[string]config.EnabledPlugins
fields wantFields
wantErr bool
}{
{"no-plugins", testPluginRegistry{}, wantFields{}, false},
{"no-plugins", testPluginRegistry{}, map[string]config.EnabledPlugins{}, wantFields{}, false},
{"no-default-only-core", testPluginRegistry{
core: []pluginCore.PluginEntry{corePluginEntry}, k8s: []pluginK8s.PluginEntry{},
}, map[string]config.EnabledPlugins{
corePluginType: {DefaultPluginTasks: []string{corePluginType}},
}, wantFields{
pluginIDs: map[pluginCore.TaskType]string{corePluginType: corePluginType},
}, false},
{"no-default-only-k8s", testPluginRegistry{
core: []pluginCore.PluginEntry{}, k8s: []pluginK8s.PluginEntry{k8sPluginEntry},
}, map[string]config.EnabledPlugins{
k8sPluginType: {DefaultPluginTasks: []string{k8sPluginType}},
}, wantFields{
pluginIDs: map[pluginCore.TaskType]string{k8sPluginType: k8sPluginType},
}, false},
{"no-default", testPluginRegistry{
core: []pluginCore.PluginEntry{corePluginEntry}, k8s: []pluginK8s.PluginEntry{k8sPluginEntry},
{"no-default", testPluginRegistry{}, map[string]config.EnabledPlugins{
corePluginType: {DefaultPluginTasks: []string{corePluginType}},
k8sPluginType: {DefaultPluginTasks: []string{k8sPluginType}},
}, wantFields{
pluginIDs: map[pluginCore.TaskType]string{corePluginType: corePluginType, k8sPluginType: k8sPluginType},
pluginIDs: map[pluginCore.TaskType]string{},
}, false},
{"only-default-core", testPluginRegistry{
core: []pluginCore.PluginEntry{corePluginEntry, corePluginEntryDefault}, k8s: []pluginK8s.PluginEntry{k8sPluginEntry},
}, map[string]config.EnabledPlugins{
corePluginType: {DefaultPluginTasks: []string{corePluginType}},
corePluginDefaultType: {DefaultPluginTasks: []string{corePluginDefaultType}},
k8sPluginType: {DefaultPluginTasks: []string{k8sPluginType}},
}, wantFields{
pluginIDs: map[pluginCore.TaskType]string{corePluginType: corePluginType, corePluginDefaultType: corePluginDefaultType, k8sPluginType: k8sPluginType},
defaultPluginID: corePluginDefaultType,
}, false},
{"only-default-k8s", testPluginRegistry{
core: []pluginCore.PluginEntry{corePluginEntry}, k8s: []pluginK8s.PluginEntry{k8sPluginEntryDefault},
}, map[string]config.EnabledPlugins{
corePluginType: {DefaultPluginTasks: []string{corePluginType}},
k8sPluginDefaultType: {DefaultPluginTasks: []string{k8sPluginDefaultType}},
}, wantFields{
pluginIDs: map[pluginCore.TaskType]string{corePluginType: corePluginType, k8sPluginDefaultType: k8sPluginDefaultType},
defaultPluginID: k8sPluginDefaultType,
}, false},
{"default-both", testPluginRegistry{
core: []pluginCore.PluginEntry{corePluginEntry, corePluginEntryDefault}, k8s: []pluginK8s.PluginEntry{k8sPluginEntry, k8sPluginEntryDefault},
}, map[string]config.EnabledPlugins{
corePluginType: {DefaultPluginTasks: []string{corePluginType}},
corePluginDefaultType: {DefaultPluginTasks: []string{corePluginDefaultType}},
k8sPluginType: {DefaultPluginTasks: []string{k8sPluginType}},
k8sPluginDefaultType: {DefaultPluginTasks: []string{k8sPluginDefaultType}},
}, wantFields{
pluginIDs: map[pluginCore.TaskType]string{corePluginType: corePluginType, corePluginDefaultType: corePluginDefaultType, k8sPluginType: k8sPluginType, k8sPluginDefaultType: k8sPluginDefaultType},
defaultPluginID: corePluginDefaultType,
Expand All @@ -200,6 +219,7 @@ func Test_task_Setup(t *testing.T) {
sCtx.On("MetricsScope").Return(promutils.NewTestScope())

tk, err := New(context.TODO(), mocks.NewFakeKubeClient(), &pluginCatalogMocks.Client{}, promutils.NewTestScope())
tk.cfg.TaskPlugins.EnabledPlugins = tt.enabledPluginsConfig
assert.NoError(t, err)
tk.pluginRegistry = tt.registry
if err := tk.Setup(context.TODO(), sCtx); err != nil {
Expand Down
19 changes: 11 additions & 8 deletions pkg/controller/nodes/task/plugin_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,32 @@ import (

"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/lyft/flytestdlib/logger"
"k8s.io/apimachinery/pkg/util/sets"

"github.com/lyft/flytepropeller/pkg/controller/nodes/task/config"
"github.com/lyft/flytepropeller/pkg/controller/nodes/task/k8s"
)

func WranglePluginsAndGenerateFinalList(ctx context.Context, cfg *config.TaskPluginConfig, pr PluginRegistryIface) ([]core.PluginEntry, error) {
allPluginsEnabled := false
enabledPlugins := sets.NewString()
enabledPlugins := make(map[string]config.EnabledPlugins)
if cfg != nil {
enabledPlugins = cfg.GetEnabledPluginsSet()
enabledPlugins = cfg.GetEnabledPlugins()
}
if enabledPlugins.Len() == 0 {
if len(enabledPlugins) == 0 {
allPluginsEnabled = true
}

var finalizedPlugins []core.PluginEntry
logger.Infof(ctx, "Enabled plugins: %v", enabledPlugins.List())
logger.Infof(ctx, "Enabled plugins: %+v", enabledPlugins)
logger.Infof(ctx, "Loading core Plugins, plugin configuration [all plugins enabled: %v]", allPluginsEnabled)
for _, cpe := range pr.GetCorePlugins() {
id := strings.ToLower(cpe.ID)
if !allPluginsEnabled && !enabledPlugins.Has(id) {
pluginCfg, pluginEnabled := enabledPlugins[id]
if !allPluginsEnabled && !pluginEnabled {
logger.Infof(ctx, "Plugin [%s] is DISABLED (not found in enabled plugins list).", id)
} else {
logger.Infof(ctx, "Plugin [%s] ENABLED", id)
cpe.DefaultForTaskTypes = pluginCfg.DefaultPluginTasks
finalizedPlugins = append(finalizedPlugins, cpe)
}
}
Expand All @@ -47,7 +48,8 @@ func WranglePluginsAndGenerateFinalList(ctx context.Context, cfg *config.TaskPlu
for i := range k8sPlugins {
kpe := k8sPlugins[i]
id := strings.ToLower(kpe.ID)
if !allPluginsEnabled && !enabledPlugins.Has(id) {
pluginConfig, pluginEnabled := enabledPlugins[id]
if !allPluginsEnabled && !pluginEnabled {
logger.Infof(ctx, "K8s Plugin [%s] is DISABLED (not found in enabled plugins list).", id)
} else {
logger.Infof(ctx, "K8s Plugin [%s] is ENABLED.", id)
Expand All @@ -57,7 +59,8 @@ func WranglePluginsAndGenerateFinalList(ctx context.Context, cfg *config.TaskPlu
LoadPlugin: func(ctx context.Context, iCtx core.SetupContext) (plugin core.Plugin, e error) {
return k8s.NewPluginManagerWithBackOff(ctx, iCtx, kpe, backOffController, monitorIndex)
},
IsDefault: kpe.IsDefault,
IsDefault: kpe.IsDefault,
DefaultForTaskTypes: pluginConfig.DefaultPluginTasks,
})
}
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/controller/nodes/task/plugin_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,13 @@ func TestWranglePluginsAndGenerateFinalList(t *testing.T) {
args args
want want
}{
{"config-no-plugins", args{cfg: &config.TaskPluginConfig{EnabledPlugins: []string{coreContainer}}, backOffCfg: &config.BackOffConfig{BaseSecond: 0, MaxDuration: config2.Duration{Duration: time.Second * 0}}}, want{}},
{"config-no-plugins", args{cfg: &config.TaskPluginConfig{EnabledPlugins: map[string]config.EnabledPlugins{coreContainer: {}}}, backOffCfg: &config.BackOffConfig{BaseSecond: 0, MaxDuration: config2.Duration{Duration: time.Second * 0}}}, want{}},
{"no-plugins", args{cfg: &config.TaskPluginConfig{EnabledPlugins: nil}, backOffCfg: &config.BackOffConfig{BaseSecond: 0, MaxDuration: config2.Duration{Duration: time.Second * 0}}}, want{}},
{"no-config-no-plugins", args{}, want{}},
{"no-config-plugins", args{corePlugins: cpe(coreContainerPlugin, coreOtherPlugin), k8sPlugins: kpe(k8sContainerPlugin, k8sOtherPlugin)}, want{final: sets.NewString(k8sContainer, k8sOther, coreOther, coreContainer)}},
{"empty-config-plugins", args{cfg: &config.TaskPluginConfig{EnabledPlugins: []string{}}, corePlugins: cpe(coreContainerPlugin, coreOtherPlugin), k8sPlugins: kpe(k8sContainerPlugin, k8sOtherPlugin)}, want{final: sets.NewString(k8sContainer, k8sOther, coreOther, coreContainer)}},
{"config-plugins", args{cfg: &config.TaskPluginConfig{EnabledPlugins: []string{coreContainer, k8sOther}}, corePlugins: cpe(coreContainerPlugin, coreOtherPlugin), k8sPlugins: kpe(k8sContainerPlugin, k8sOtherPlugin), backOffCfg: &config.BackOffConfig{BaseSecond: 0, MaxDuration: config2.Duration{Duration: time.Second * 0}}}, want{final: sets.NewString(k8sOther, coreContainer)}},
{"case-differs-config-plugins", args{cfg: &config.TaskPluginConfig{EnabledPlugins: []string{strings.ToUpper(coreContainer), strings.ToUpper(k8sOther)}}, corePlugins: cpe(coreContainerPlugin, coreOtherPlugin), k8sPlugins: kpe(k8sContainerPlugin, k8sOtherPlugin), backOffCfg: &config.BackOffConfig{BaseSecond: 0, MaxDuration: config2.Duration{Duration: time.Second * 0}}}, want{final: sets.NewString(k8sOther, coreContainer)}},
{"empty-config-plugins", args{cfg: &config.TaskPluginConfig{EnabledPlugins: map[string]config.EnabledPlugins{}}, corePlugins: cpe(coreContainerPlugin, coreOtherPlugin), k8sPlugins: kpe(k8sContainerPlugin, k8sOtherPlugin)}, want{final: sets.NewString(k8sContainer, k8sOther, coreOther, coreContainer)}},
{"config-plugins", args{cfg: &config.TaskPluginConfig{EnabledPlugins: map[string]config.EnabledPlugins{coreContainer: {DefaultPluginTasks: []string{"container"}}, k8sOther: {}}}, corePlugins: cpe(coreContainerPlugin, coreOtherPlugin), k8sPlugins: kpe(k8sContainerPlugin, k8sOtherPlugin), backOffCfg: &config.BackOffConfig{BaseSecond: 0, MaxDuration: config2.Duration{Duration: time.Second * 0}}}, want{final: sets.NewString(k8sOther, coreContainer)}},
{"case-differs-config-plugins", args{cfg: &config.TaskPluginConfig{EnabledPlugins: map[string]config.EnabledPlugins{strings.ToUpper(coreContainer): {DefaultPluginTasks: []string{"container"}}, strings.ToUpper(k8sOther): {}}}, corePlugins: cpe(coreContainerPlugin, coreOtherPlugin), k8sPlugins: kpe(k8sContainerPlugin, k8sOtherPlugin), backOffCfg: &config.BackOffConfig{BaseSecond: 0, MaxDuration: config2.Duration{Duration: time.Second * 0}}}, want{final: sets.NewString(k8sOther, coreContainer)}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down