Skip to content
This repository was archived by the owner on Oct 9, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ tasks:
- container
- K8S-ARRAY
- qubole-hive-executor
default-for-task-type:
- container-array: k8s-array
- presto: my-presto
# Uncomment to enable sagemaker plugin
# - sagemaker_training
# - sagemaker_hyperparameter_tuning
Expand Down
5 changes: 1 addition & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ require (
github.com/Azure/go-autorest/autorest v0.10.0 // indirect
github.com/DiSiqueira/GoTree v1.0.1-0.20180907134536-53a8e837f295
github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1
github.com/coreos/etcd v3.3.15+incompatible // indirect
github.com/coreos/go-oidc v2.2.1+incompatible // indirect
github.com/fatih/color v1.9.0
github.com/ghodss/yaml v1.0.0
Expand All @@ -23,11 +22,10 @@ require (
github.com/imdario/mergo v0.3.8 // indirect
github.com/lyft/datacatalog v0.2.1
github.com/lyft/flyteidl v0.18.9
github.com/lyft/flyteplugins v0.5.12
github.com/lyft/flyteplugins v0.5.14
github.com/lyft/flytestdlib v0.3.9
github.com/magiconair/properties v1.8.1
github.com/mattn/go-colorable v0.1.6 // indirect
github.com/mitchellh/go-ps v1.0.0 // indirect
github.com/mitchellh/mapstructure v1.1.2
github.com/ncw/swift v1.0.50 // indirect
github.com/pkg/errors v0.9.1
Expand All @@ -49,7 +47,6 @@ require (
k8s.io/kube-openapi v0.0.0-20200204173128-addea2498afe // indirect
k8s.io/utils v0.0.0-20200229041039-0a110f9eb7ab // indirect
sigs.k8s.io/controller-runtime v0.5.1
sigs.k8s.io/testing_frameworks v0.1.2 // indirect
sigs.k8s.io/yaml v1.2.0 // indirect
)

Expand Down
45 changes: 4 additions & 41 deletions go.sum

Large diffs are not rendered by default.

38 changes: 28 additions & 10 deletions pkg/controller/nodes/task/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"time"

"github.com/lyft/flytestdlib/config"
"k8s.io/apimachinery/pkg/util/sets"
)

//go:generate pflags Config --default-var defaultConfig
Expand All @@ -14,7 +13,7 @@ const SectionKey = "tasks"

var (
defaultConfig = &Config{
TaskPlugins: TaskPluginConfig{EnabledPlugins: []string{}},
TaskPlugins: TaskPluginConfig{EnabledPlugins: []string{}, DefaultForTaskTypes: map[string]string{}},
MaxPluginPhaseVersions: 100000,
BarrierConfig: BarrierConfig{
Enabled: true,
Expand Down Expand Up @@ -46,22 +45,41 @@ type BarrierConfig struct {
}

type TaskPluginConfig struct {
EnabledPlugins []string `json:"enabled-plugins" pflag:",Plugins enabled currently"`
EnabledPlugins []string `json:"enabled-plugins" pflag:",deprecated"`
// Maps task types to their plugin handler (by ID).
DefaultForTaskTypes map[string]string `json:"default-for-task-types" pflag:"-,"`
}

type BackOffConfig struct {
BaseSecond int `json:"base-second" pflag:",The number of seconds representing the base duration of the exponential backoff"`
MaxDuration config.Duration `json:"max-duration" pflag:",The cap of the backoff duration"`
}

func (p TaskPluginConfig) GetEnabledPluginsSet() sets.String {
s := sets.NewString()
for _, e := range p.EnabledPlugins {
cleanedPluginName := strings.Trim(e, " ")
cleanedPluginName = strings.ToLower(cleanedPluginName)
s.Insert(cleanedPluginName)
type PluginConfig struct {
DefaultForTaskTypes []string
}

func cleanString(source string) string {
cleaned := strings.Trim(source, " ")
cleaned = strings.ToLower(cleaned)
return cleaned
}

func (p TaskPluginConfig) GetEnabledPlugins() map[string]PluginConfig {
enabledPlugins := make(map[string]PluginConfig)
pluginDefaultForTaskType := map[string][]string{}
// Reverse the map. Having the config use task type as a key guarantees only one default plugin can be specified per
// task type but now we need to sort for which tasks a plugin needs to be the default.
for taskName, pluginName := range p.DefaultForTaskTypes {
pluginDefaultForTaskType[pluginName] = append(pluginDefaultForTaskType[pluginName], cleanString(taskName))
}
for _, pluginName := range p.EnabledPlugins {
cleanedPluginName := cleanString(pluginName)
enabledPlugins[cleanedPluginName] = PluginConfig{
DefaultForTaskTypes: pluginDefaultForTaskType[pluginName],
}
}
return s
return enabledPlugins
}

func GetConfig() *Config {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/nodes/task/config/config_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 34 additions & 3 deletions pkg/controller/nodes/task/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,10 @@ func (t *Handler) Setup(ctx context.Context, sCtx handler.SetupContext) error {
return err
}

// Not every task type will have a default plugin specified in the flytepropeller config.
// That's fine, we resort to using the plugins' static RegisteredTaskTypes as a fallback.
fallbackTaskHandlerMap := make(map[string]map[string]pluginCore.Plugin)

for _, p := range enabledPlugins {
// create a new resource registrar proxy for each plugin, and pass it into the plugin's LoadPlugin() via a setup context
pluginResourceNamespacePrefix := pluginCore.ResourceNamespace(newResourceManagerBuilder.GetID()).CreateSubNamespace(pluginCore.ResourceNamespace(p.ID))
Expand All @@ -216,16 +220,30 @@ func (t *Handler) Setup(ctx context.Context, sCtx handler.SetupContext) error {
return regErrors.Wrapf(err, "failed to load plugin - %s", p.ID)
}
for _, tt := range p.RegisteredTaskTypes {
logger.Infof(ctx, "Plugin [%s] registered for TaskType [%s]", cp.GetID(), tt)
// TODO(katrogan): Make the default task plugin assignment more explicit (https://github.com/lyft/flyte/issues/516)
t.defaultPlugins[tt] = cp
for _, defaultTaskType := range p.DefaultForTaskTypes {
if defaultTaskType == tt {
if existingHandler, alreadyDefaulted := t.defaultPlugins[tt]; alreadyDefaulted && existingHandler.GetID() != cp.GetID() {
logger.Panicf(ctx, "TaskType [%s] has multiple default handlers specified: [%s] and [%s]",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think you can just return an error... and let the top layer panic... I generally think panic should happen at the very top layer of any application (specifically the top most layer that cannot propagate errors any further) and everywhere else we should just propagate errors (Must*-style functions is a sort-of an exception)

tt, existingHandler.GetID(), cp.GetID())
}
logger.Infof(ctx, "Plugin [%s] registered for TaskType [%s]", cp.GetID(), tt)
t.defaultPlugins[tt] = cp
}
}

pluginsForTaskType, ok := t.pluginsForType[tt]
if !ok {
pluginsForTaskType = make(map[pluginID]pluginCore.Plugin)
}
pluginsForTaskType[cp.GetID()] = cp
t.pluginsForType[tt] = pluginsForTaskType

fallbackMap, ok := fallbackTaskHandlerMap[tt]
if !ok {
fallbackMap = make(map[string]pluginCore.Plugin)
}
fallbackMap[cp.GetID()] = cp
fallbackTaskHandlerMap[tt] = fallbackMap
}
if p.IsDefault {
if err := t.setDefault(ctx, cp); err != nil {
Expand All @@ -234,6 +252,19 @@ func (t *Handler) Setup(ctx context.Context, sCtx handler.SetupContext) error {
}
}

// Read from the fallback task handler map for any remaining tasks without a defaultPlugins registered handler.
for taskType, registeredPlugins := range fallbackTaskHandlerMap {
if _, ok := t.defaultPlugins[taskType]; ok {
break
}
if len(registeredPlugins) != 1 {
logger.Panicf(ctx, "Multiple plugins registered to handle task type: %s. ([%+v])", taskType, registeredPlugins)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

}
for _, plugin := range registeredPlugins {
t.defaultPlugins[taskType] = plugin
}
}

rm, err := newResourceManagerBuilder.BuildResourceManager(ctx)
if err != nil {
logger.Errorf(ctx, "Failed to build a resource manager")
Expand Down
52 changes: 37 additions & 15 deletions pkg/controller/nodes/task/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"testing"
"time"

pluginK8sMocks "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/k8s/mocks"

"github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"

Expand All @@ -28,7 +30,6 @@ import (
"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io"
ioMocks "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io/mocks"
pluginK8s "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/k8s"
pluginK8sMocks "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/k8s/mocks"
"github.com/lyft/flytestdlib/promutils"
"github.com/lyft/flytestdlib/storage"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -150,41 +151,60 @@ func Test_task_Setup(t *testing.T) {
defaultPluginID string
}
tests := []struct {
name string
registry PluginRegistryIface
fields wantFields
wantErr bool
name string
registry PluginRegistryIface
enabledPlugins []string
defaultForTaskTypes map[string]string
fields wantFields
wantErr bool
}{
{"no-plugins", testPluginRegistry{}, wantFields{}, false},
{"no-plugins", testPluginRegistry{}, []string{}, map[string]string{}, wantFields{}, false},
{"no-default-only-core", testPluginRegistry{
core: []pluginCore.PluginEntry{corePluginEntry}, k8s: []pluginK8s.PluginEntry{},
}, wantFields{
pluginIDs: map[pluginCore.TaskType]string{corePluginType: corePluginType},
}, false},
}, []string{corePluginType}, map[string]string{
corePluginType: corePluginType},
wantFields{
pluginIDs: map[pluginCore.TaskType]string{corePluginType: corePluginType},
}, false},
{"no-default-only-k8s", testPluginRegistry{
core: []pluginCore.PluginEntry{}, k8s: []pluginK8s.PluginEntry{k8sPluginEntry},
}, []string{k8sPluginType}, map[string]string{
k8sPluginType: k8sPluginType},
wantFields{
pluginIDs: map[pluginCore.TaskType]string{k8sPluginType: k8sPluginType},
}, false},
{"no-default", testPluginRegistry{}, []string{corePluginType, k8sPluginType}, map[string]string{
corePluginType: corePluginType,
k8sPluginType: k8sPluginType,
}, wantFields{
pluginIDs: map[pluginCore.TaskType]string{k8sPluginType: k8sPluginType},
}, false},
{"no-default", testPluginRegistry{
core: []pluginCore.PluginEntry{corePluginEntry}, k8s: []pluginK8s.PluginEntry{k8sPluginEntry},
}, wantFields{
pluginIDs: map[pluginCore.TaskType]string{corePluginType: corePluginType, k8sPluginType: k8sPluginType},
pluginIDs: map[pluginCore.TaskType]string{},
}, false},
{"only-default-core", testPluginRegistry{
core: []pluginCore.PluginEntry{corePluginEntry, corePluginEntryDefault}, k8s: []pluginK8s.PluginEntry{k8sPluginEntry},
}, []string{corePluginType, corePluginDefaultType, k8sPluginType}, map[string]string{
corePluginType: corePluginType,
corePluginDefaultType: corePluginDefaultType,
k8sPluginType: k8sPluginType,
}, wantFields{
pluginIDs: map[pluginCore.TaskType]string{corePluginType: corePluginType, corePluginDefaultType: corePluginDefaultType, k8sPluginType: k8sPluginType},
defaultPluginID: corePluginDefaultType,
}, false},
{"only-default-k8s", testPluginRegistry{
core: []pluginCore.PluginEntry{corePluginEntry}, k8s: []pluginK8s.PluginEntry{k8sPluginEntryDefault},
}, []string{corePluginType, k8sPluginDefaultType}, map[string]string{
corePluginType: corePluginType,
k8sPluginDefaultType: k8sPluginDefaultType,
}, wantFields{
pluginIDs: map[pluginCore.TaskType]string{corePluginType: corePluginType, k8sPluginDefaultType: k8sPluginDefaultType},
defaultPluginID: k8sPluginDefaultType,
}, false},
{"default-both", testPluginRegistry{
core: []pluginCore.PluginEntry{corePluginEntry, corePluginEntryDefault}, k8s: []pluginK8s.PluginEntry{k8sPluginEntry, k8sPluginEntryDefault},
}, []string{corePluginType, corePluginDefaultType, k8sPluginType, k8sPluginDefaultType}, map[string]string{
corePluginType: corePluginType,
corePluginDefaultType: corePluginDefaultType,
k8sPluginType: k8sPluginType,
k8sPluginDefaultType: k8sPluginDefaultType,
}, wantFields{
pluginIDs: map[pluginCore.TaskType]string{corePluginType: corePluginType, corePluginDefaultType: corePluginDefaultType, k8sPluginType: k8sPluginType, k8sPluginDefaultType: k8sPluginDefaultType},
defaultPluginID: corePluginDefaultType,
Expand All @@ -200,6 +220,8 @@ func Test_task_Setup(t *testing.T) {
sCtx.On("MetricsScope").Return(promutils.NewTestScope())

tk, err := New(context.TODO(), mocks.NewFakeKubeClient(), &pluginCatalogMocks.Client{}, promutils.NewTestScope())
tk.cfg.TaskPlugins.EnabledPlugins = tt.enabledPlugins
tk.cfg.TaskPlugins.DefaultForTaskTypes = tt.defaultForTaskTypes
assert.NoError(t, err)
tk.pluginRegistry = tt.registry
if err := tk.Setup(context.TODO(), sCtx); err != nil {
Expand Down
24 changes: 14 additions & 10 deletions pkg/controller/nodes/task/plugin_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,32 @@ import (

"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/lyft/flytestdlib/logger"
"k8s.io/apimachinery/pkg/util/sets"

"github.com/lyft/flytepropeller/pkg/controller/nodes/task/config"
"github.com/lyft/flytepropeller/pkg/controller/nodes/task/k8s"
)

func WranglePluginsAndGenerateFinalList(ctx context.Context, cfg *config.TaskPluginConfig, pr PluginRegistryIface) ([]core.PluginEntry, error) {
allPluginsEnabled := false
enabledPlugins := sets.NewString()
enabledPlugins := make(map[string]config.PluginConfig)
if cfg != nil {
enabledPlugins = cfg.GetEnabledPluginsSet()
enabledPlugins = cfg.GetEnabledPlugins()
}
if enabledPlugins.Len() == 0 {
if len(enabledPlugins) == 0 {
allPluginsEnabled = true
}

var finalizedPlugins []core.PluginEntry
logger.Infof(ctx, "Enabled plugins: %v", enabledPlugins.List())
logger.Infof(ctx, "Enabled plugins: %+v", enabledPlugins)
logger.Infof(ctx, "Loading core Plugins, plugin configuration [all plugins enabled: %v]", allPluginsEnabled)
for _, cpe := range pr.GetCorePlugins() {
id := strings.ToLower(cpe.ID)
if !allPluginsEnabled && !enabledPlugins.Has(id) {
pluginCfg, pluginEnabled := enabledPlugins[id]
if !allPluginsEnabled && !pluginEnabled {
logger.Infof(ctx, "Plugin [%s] is DISABLED (not found in enabled plugins list).", id)
} else {
logger.Infof(ctx, "Plugin [%s] ENABLED", id)
cpe.DefaultForTaskTypes = pluginCfg.DefaultForTaskTypes
finalizedPlugins = append(finalizedPlugins, cpe)
}
}
Expand All @@ -47,18 +48,21 @@ func WranglePluginsAndGenerateFinalList(ctx context.Context, cfg *config.TaskPlu
for i := range k8sPlugins {
kpe := k8sPlugins[i]
id := strings.ToLower(kpe.ID)
if !allPluginsEnabled && !enabledPlugins.Has(id) {
pluginConfig, pluginEnabled := enabledPlugins[id]
if !allPluginsEnabled && !pluginEnabled {
logger.Infof(ctx, "K8s Plugin [%s] is DISABLED (not found in enabled plugins list).", id)
} else {
logger.Infof(ctx, "K8s Plugin [%s] is ENABLED.", id)
finalizedPlugins = append(finalizedPlugins, core.PluginEntry{
plugin := core.PluginEntry{
ID: id,
RegisteredTaskTypes: kpe.RegisteredTaskTypes,
LoadPlugin: func(ctx context.Context, iCtx core.SetupContext) (plugin core.Plugin, e error) {
return k8s.NewPluginManagerWithBackOff(ctx, iCtx, kpe, backOffController, monitorIndex)
},
IsDefault: kpe.IsDefault,
})
IsDefault: kpe.IsDefault,
DefaultForTaskTypes: pluginConfig.DefaultForTaskTypes,
}
finalizedPlugins = append(finalizedPlugins, plugin)
}
}
return finalizedPlugins, nil
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/nodes/task/plugin_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ func TestWranglePluginsAndGenerateFinalList(t *testing.T) {
{"no-config-no-plugins", args{}, want{}},
{"no-config-plugins", args{corePlugins: cpe(coreContainerPlugin, coreOtherPlugin), k8sPlugins: kpe(k8sContainerPlugin, k8sOtherPlugin)}, want{final: sets.NewString(k8sContainer, k8sOther, coreOther, coreContainer)}},
{"empty-config-plugins", args{cfg: &config.TaskPluginConfig{EnabledPlugins: []string{}}, corePlugins: cpe(coreContainerPlugin, coreOtherPlugin), k8sPlugins: kpe(k8sContainerPlugin, k8sOtherPlugin)}, want{final: sets.NewString(k8sContainer, k8sOther, coreOther, coreContainer)}},
{"config-plugins", args{cfg: &config.TaskPluginConfig{EnabledPlugins: []string{coreContainer, k8sOther}}, corePlugins: cpe(coreContainerPlugin, coreOtherPlugin), k8sPlugins: kpe(k8sContainerPlugin, k8sOtherPlugin), backOffCfg: &config.BackOffConfig{BaseSecond: 0, MaxDuration: config2.Duration{Duration: time.Second * 0}}}, want{final: sets.NewString(k8sOther, coreContainer)}},
{"case-differs-config-plugins", args{cfg: &config.TaskPluginConfig{EnabledPlugins: []string{strings.ToUpper(coreContainer), strings.ToUpper(k8sOther)}}, corePlugins: cpe(coreContainerPlugin, coreOtherPlugin), k8sPlugins: kpe(k8sContainerPlugin, k8sOtherPlugin), backOffCfg: &config.BackOffConfig{BaseSecond: 0, MaxDuration: config2.Duration{Duration: time.Second * 0}}}, want{final: sets.NewString(k8sOther, coreContainer)}},
{"config-plugins", args{cfg: &config.TaskPluginConfig{EnabledPlugins: []string{coreContainer, k8sOther}, DefaultForTaskTypes: map[string]string{"container": coreContainer}}, corePlugins: cpe(coreContainerPlugin, coreOtherPlugin), k8sPlugins: kpe(k8sContainerPlugin, k8sOtherPlugin), backOffCfg: &config.BackOffConfig{BaseSecond: 0, MaxDuration: config2.Duration{Duration: time.Second * 0}}}, want{final: sets.NewString(k8sOther, coreContainer)}},
{"case-differs-config-plugins", args{cfg: &config.TaskPluginConfig{EnabledPlugins: []string{strings.ToUpper(coreContainer), strings.ToUpper(k8sOther)}, DefaultForTaskTypes: map[string]string{"container": coreContainer}}, corePlugins: cpe(coreContainerPlugin, coreOtherPlugin), k8sPlugins: kpe(k8sContainerPlugin, k8sOtherPlugin), backOffCfg: &config.BackOffConfig{BaseSecond: 0, MaxDuration: config2.Duration{Duration: time.Second * 0}}}, want{final: sets.NewString(k8sOther, coreContainer)}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down