Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add loadaware scheduler plugin and npd controller plugin #654

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions cmd/katalyst-controller/app/controller/npd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
Copyright 2022 The Katalyst Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controller

import (
"context"
"fmt"

"k8s.io/klog/v2"

katalyst "github.com/kubewharf/katalyst-core/cmd/base"
"github.com/kubewharf/katalyst-core/pkg/config"
"github.com/kubewharf/katalyst-core/pkg/controller/npd"
)

const (
	// NPDControllerName is the name under which the NPD controller is
	// registered in the controller initializers.
	NPDControllerName = "npd"
)

// StartNPDController builds the NPD controller from the supplied
// configuration and launches it in a background goroutine.
//
// It returns (false, err) when controlCtx or conf is nil, or when
// npd.NewNPDController fails. On success it starts npdController.Run in its
// own goroutine and returns (true, nil). The trailing string argument is
// ignored.
func StartNPDController(ctx context.Context, controlCtx *katalyst.GenericContext,
	conf *config.Configuration, extraConf interface{}, _ string,
) (bool, error) {
	if controlCtx == nil || conf == nil {
		err := fmt.Errorf("controlCtx and controllerConf can't be nil")
		klog.Error(err)
		return false, err
	}

	npdController, err := npd.NewNPDController(
		ctx,
		controlCtx, conf.GenericConfiguration,
		conf.GenericControllerConfiguration,
		conf.NPDConfig,
		extraConf,
	)
	if err != nil {
		// Log the underlying error as well; without it the log line gives no
		// hint about why construction failed.
		klog.Errorf("failed to new npd controller: %v", err)
		return false, err
	}

	go npdController.Run()
	return true, nil
}
1 change: 1 addition & 0 deletions cmd/katalyst-controller/app/enablecontrollers.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func init() {
controllerInitializers.Store(controller.VPAControllerName, ControllerStarter{Starter: controller.StartVPAController})
controllerInitializers.Store(controller.KCCControllerName, ControllerStarter{Starter: controller.StartKCCController})
controllerInitializers.Store(controller.SPDControllerName, ControllerStarter{Starter: controller.StartSPDController})
controllerInitializers.Store(controller.NPDControllerName, ControllerStarter{Starter: controller.StartNPDController})
controllerInitializers.Store(controller.LifeCycleControllerName, ControllerStarter{Starter: controller.StartLifeCycleController})
controllerInitializers.Store(controller.MonitorControllerName, ControllerStarter{Starter: controller.StartMonitorController})
controllerInitializers.Store(controller.OvercommitControllerName, ControllerStarter{Starter: controller.StartOvercommitController})
Expand Down
4 changes: 4 additions & 0 deletions cmd/katalyst-controller/app/options/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type ControllersOptions struct {
*VPAOptions
*KCCOptions
*SPDOptions
*NPDOptions
*LifeCycleOptions
*MonitorOptions
*OvercommitOptions
Expand All @@ -40,6 +41,7 @@ func NewControllersOptions() *ControllersOptions {
VPAOptions: NewVPAOptions(),
KCCOptions: NewKCCOptions(),
SPDOptions: NewSPDOptions(),
NPDOptions: NewNPDOptions(),
LifeCycleOptions: NewLifeCycleOptions(),
MonitorOptions: NewMonitorOptions(),
OvercommitOptions: NewOvercommitOptions(),
Expand All @@ -52,6 +54,7 @@ func (o *ControllersOptions) AddFlags(fss *cliflag.NamedFlagSets) {
o.VPAOptions.AddFlags(fss)
o.KCCOptions.AddFlags(fss)
o.SPDOptions.AddFlags(fss)
o.NPDOptions.AddFlags(fss)
o.LifeCycleOptions.AddFlags(fss)
o.MonitorOptions.AddFlags(fss)
o.OvercommitOptions.AddFlags(fss)
Expand All @@ -66,6 +69,7 @@ func (o *ControllersOptions) ApplyTo(c *controllerconfig.ControllersConfiguratio
errList = append(errList, o.VPAOptions.ApplyTo(c.VPAConfig))
errList = append(errList, o.KCCOptions.ApplyTo(c.KCCConfig))
errList = append(errList, o.SPDOptions.ApplyTo(c.SPDConfig))
errList = append(errList, o.NPDOptions.ApplyTo(c.NPDConfig))
errList = append(errList, o.LifeCycleOptions.ApplyTo(c.LifeCycleConfig))
errList = append(errList, o.MonitorOptions.ApplyTo(c.MonitorConfig))
errList = append(errList, o.OvercommitOptions.ApplyTo(c.OvercommitConfig))
Expand Down
120 changes: 120 additions & 0 deletions cmd/katalyst-controller/app/options/npd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
Copyright 2022 The Katalyst Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package options

import (
"time"

cliflag "k8s.io/component-base/cli/flag"

"github.com/kubewharf/katalyst-core/pkg/config/controller"
)

// NPDOptions holds the command-line options of the NPD controller, together
// with the embedded options of the load-aware plugin.
type NPDOptions struct {
	// NPDIndicatorPlugins is the list of indicator plugins to be used.
	NPDIndicatorPlugins []string
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NPDMetricsPlugins

	// EnableScopeDuplicated controls whether metrics with the same scope can
	// be updated by multiple plugins.
	EnableScopeDuplicated bool
	// SyncWorkers is the number of workers used to sync npd status.
	SyncWorkers int
	// LoadAwarePluginOptions carries the load-aware plugin settings; its
	// fields are promoted onto NPDOptions.
	*LoadAwarePluginOptions
}

// NewNPDOptions returns NPDOptions initialised with defaults: an empty
// (non-nil) indicator plugin list, scope duplication disabled, a single sync
// worker, and the default load-aware plugin options.
func NewNPDOptions() *NPDOptions {
	opts := new(NPDOptions)

	opts.NPDIndicatorPlugins = []string{}
	opts.EnableScopeDuplicated = false
	opts.SyncWorkers = 1
	opts.LoadAwarePluginOptions = NewLoadAwarePluginOptions()

	return opts
}

// AddFlags registers every NPD controller flag, and every load-aware plugin
// flag, on the "npd" flag set of fss. Each flag's default is the value
// currently held in o.
func (o *NPDOptions) AddFlags(fss *cliflag.NamedFlagSets) {
	npdFlags := fss.FlagSet("npd")

	// Flags of the NPD controller itself.
	npdFlags.StringSliceVar(
		&o.NPDIndicatorPlugins, "npd-indicator-plugins", o.NPDIndicatorPlugins,
		"A list of indicator plugins to be used",
	)
	npdFlags.BoolVar(
		&o.EnableScopeDuplicated, "npd-enable-scope-duplicated", o.EnableScopeDuplicated,
		"Whether metrics with the same scope can be updated by multiple plugins",
	)
	npdFlags.IntVar(
		&o.SyncWorkers, "npd-sync-workers", o.SyncWorkers,
		"Number of workers to sync npd status",
	)

	// Load-aware plugin: metric syncing.
	npdFlags.IntVar(
		&o.Workers, "loadaware-sync-workers", o.Workers,
		"num of workers to sync node metrics",
	)
	npdFlags.DurationVar(
		&o.SyncMetricInterval, "loadaware-sync-interval", o.SyncMetricInterval,
		"interval of syncing node metrics",
	)
	npdFlags.DurationVar(
		&o.ListMetricTimeout, "loadaware-list-metric-timeout", o.ListMetricTimeout,
		"timeout duration when list metrics from metrics server",
	)

	// Load-aware plugin: pod usage selection.
	npdFlags.StringVar(
		&o.PodUsageSelectorNamespace, "loadaware-podusage-selector-namespace", o.PodUsageSelectorNamespace,
		"pod namespace used to detect whether podusage should be calculated",
	)
	npdFlags.StringVar(
		&o.PodUsageSelectorKey, "loadaware-podusage-selector-key", o.PodUsageSelectorKey,
		"pod label key used to detect whether podusage should be calculated",
	)
	npdFlags.StringVar(
		&o.PodUsageSelectorVal, "loadaware-podusage-selector-val", o.PodUsageSelectorVal,
		"pod label value used to detect whether podusage should be calculated",
	)
	npdFlags.IntVar(
		&o.MaxPodUsageCount, "loadaware-max-podusage-count", o.MaxPodUsageCount,
		"max podusage count on nodemonitor",
	)
}

// ApplyTo copies every option value held by o into the NPD configuration c.
// It always returns nil.
func (o *NPDOptions) ApplyTo(c *controller.NPDConfig) error {
	// Settings of the NPD controller itself.
	c.NPDIndicatorPlugins = o.NPDIndicatorPlugins
	c.EnableScopeDuplicated = o.EnableScopeDuplicated
	c.SyncWorkers = o.SyncWorkers

	// Settings of the load-aware plugin, read through the embedded options.
	loadAware := o.LoadAwarePluginOptions
	c.Workers = loadAware.Workers
	c.SyncMetricInterval = loadAware.SyncMetricInterval
	c.ListMetricTimeout = loadAware.ListMetricTimeout
	c.PodUsageSelectorNamespace = loadAware.PodUsageSelectorNamespace
	c.PodUsageSelectorKey = loadAware.PodUsageSelectorKey
	c.PodUsageSelectorVal = loadAware.PodUsageSelectorVal
	c.MaxPodUsageCount = loadAware.MaxPodUsageCount

	return nil
}

// Config builds a fresh controller.NPDConfig and fills it from these options.
//
// The target is created with controller.NewNPDConfig, the same constructor
// the controllers configuration uses, instead of a zero-value literal.
// NOTE(review): this presumes NewNPDConfig allocates any nested configuration
// that ApplyTo writes through (e.g. an embedded load-aware config); with a
// bare &controller.NPDConfig{} such a nested pointer would be nil — confirm
// against the NPDConfig definition.
//
// It returns the populated config, or the error reported by ApplyTo.
func (o *NPDOptions) Config() (*controller.NPDConfig, error) {
	c := controller.NewNPDConfig()
	if err := o.ApplyTo(c); err != nil {
		return nil, err
	}

	return c, nil
}

// LoadAwarePluginOptions holds the command-line options of the load-aware
// plugin.
type LoadAwarePluginOptions struct {
	// number of workers to sync node metrics
	Workers int
	// time interval of sync node metrics
	SyncMetricInterval time.Duration
	// timeout of list metrics from apiserver
	ListMetricTimeout time.Duration

	// pod selector for checking if pod usage is required:
	// the pod namespace, label key and label value to match
	PodUsageSelectorNamespace string
	PodUsageSelectorKey string
	PodUsageSelectorVal string

	// max podusage count on nodemonitor
	MaxPodUsageCount int
}

// NewLoadAwarePluginOptions returns LoadAwarePluginOptions with defaults:
// 3 workers, a one-minute sync interval, a ten-second list timeout, empty
// pod usage selectors and a maximum pod usage count of 20.
func NewLoadAwarePluginOptions() *LoadAwarePluginOptions {
	opts := new(LoadAwarePluginOptions)

	opts.Workers = 3
	opts.SyncMetricInterval = time.Minute
	opts.ListMetricTimeout = 10 * time.Second

	// The pod usage selector fields default to the empty string.
	opts.PodUsageSelectorNamespace = ""
	opts.PodUsageSelectorKey = ""
	opts.PodUsageSelectorVal = ""

	opts.MaxPodUsageCount = 20

	return opts
}
2 changes: 2 additions & 0 deletions cmd/katalyst-scheduler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"k8s.io/component-base/logs"

"github.com/kubewharf/katalyst-core/cmd/katalyst-scheduler/app"
"github.com/kubewharf/katalyst-core/pkg/scheduler/plugins/loadaware"
"github.com/kubewharf/katalyst-core/pkg/scheduler/plugins/nodeovercommitment"
"github.com/kubewharf/katalyst-core/pkg/scheduler/plugins/noderesourcetopology"
"github.com/kubewharf/katalyst-core/pkg/scheduler/plugins/qosawarenoderesources"
Expand All @@ -40,6 +41,7 @@ func main() {
app.WithPlugin(qosawarenoderesources.BalancedAllocationName, qosawarenoderesources.NewBalancedAllocation),
app.WithPlugin(noderesourcetopology.TopologyMatchName, noderesourcetopology.New),
app.WithPlugin(nodeovercommitment.Name, nodeovercommitment.New),
app.WithPlugin(loadaware.Name, loadaware.NewPlugin),
)

if err := runCommand(command); err != nil {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ require (
)

replace (
github.com/kubewharf/katalyst-api => github.com/WangZzzhe/katalyst-api v0.0.0-20240719035252-ac200da4db6c
k8s.io/api => k8s.io/api v0.24.6
k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.24.6
k8s.io/apimachinery => k8s.io/apimachinery v0.24.6
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWX
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g=
github.com/WangZzzhe/katalyst-api v0.0.0-20240719035252-ac200da4db6c h1:/0fwVknrQEJoRKnT2H0f5xkzCdcDIH4qfNvpPn7QoH8=
github.com/WangZzzhe/katalyst-api v0.0.0-20240719035252-ac200da4db6c/go.mod h1:HHUJnOrDN5xrzKhEspq70ZJL859b09j07pMAl9ACnwU=
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c=
github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
Expand Down Expand Up @@ -568,8 +570,6 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kubewharf/katalyst-api v0.5.1-0.20240702044746-be552fd7ea7d h1:6CuK3axf2B63zIkEu5XyxbaC+JArE/3Jo3QHvb+Hn0M=
github.com/kubewharf/katalyst-api v0.5.1-0.20240702044746-be552fd7ea7d/go.mod h1:Y2IeIorxQamF2a3oa0+URztl5QCSty6Jj3zD83R8J9k=
github.com/kubewharf/kubelet v1.24.6-kubewharf.8 h1:2e89T/nZTgzaVhyRsZuwEdRk8V8kJXs4PRkgfeG4Ai4=
github.com/kubewharf/kubelet v1.24.6-kubewharf.8/go.mod h1:MxbSZUx3wXztFneeelwWWlX7NAAStJ6expqq7gY2J3c=
github.com/kyoh86/exportloopref v0.1.7/go.mod h1:h1rDl2Kdj97+Kwh4gdz3ujE7XHmH51Q0lUiZ1z4NLj8=
Expand Down
77 changes: 77 additions & 0 deletions pkg/client/control/npd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
Copyright 2022 The Katalyst Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package control

import (
"context"
"fmt"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/kubewharf/katalyst-api/pkg/apis/node/v1alpha1"
clientset "github.com/kubewharf/katalyst-api/pkg/client/clientset/versioned"
)

// NodeProfileControl abstracts the write operations performed on
// NodeProfileDescriptor (NPD) objects.
type NodeProfileControl interface {
	// CreateNPD creates the given NPD and returns the resulting object.
	CreateNPD(ctx context.Context, npd *v1alpha1.NodeProfileDescriptor, opts metav1.CreateOptions) (*v1alpha1.NodeProfileDescriptor, error)
	// UpdateNPDStatus updates the status of the given NPD and returns the resulting object.
	UpdateNPDStatus(ctx context.Context, npd *v1alpha1.NodeProfileDescriptor, opts metav1.UpdateOptions) (*v1alpha1.NodeProfileDescriptor, error)
	// DeleteNPD deletes the NPD with the given name.
	DeleteNPD(ctx context.Context, npdName string, opts metav1.DeleteOptions) error
}

// DummyNPDControl is a no-op NodeProfileControl: every method ignores its
// arguments and returns nil results.
type DummyNPDControl struct{}

// CreateNPD does nothing and returns (nil, nil).
func (d *DummyNPDControl) CreateNPD(ctx context.Context, npd *v1alpha1.NodeProfileDescriptor, opts metav1.CreateOptions) (*v1alpha1.NodeProfileDescriptor, error) {
	return nil, nil
}

// UpdateNPDStatus does nothing and returns (nil, nil).
func (d *DummyNPDControl) UpdateNPDStatus(ctx context.Context, npd *v1alpha1.NodeProfileDescriptor, opts metav1.UpdateOptions) (*v1alpha1.NodeProfileDescriptor, error) {
	return nil, nil
}

// DeleteNPD does nothing and returns nil.
func (d *DummyNPDControl) DeleteNPD(ctx context.Context, npdName string, opts metav1.DeleteOptions) error {
	return nil
}

// NPDControlImp performs NPD operations through a katalyst-api clientset.
type NPDControlImp struct {
	// client is the clientset used to reach the NodeProfileDescriptor API.
	client clientset.Interface
}

// NewNPDControlImp returns an NPDControlImp that issues its requests through
// the given clientset.
func NewNPDControlImp(client clientset.Interface) *NPDControlImp {
	control := new(NPDControlImp)
	control.client = client
	return control
}

// CreateNPD creates npd through the clientset and returns the result of the
// Create call. A nil npd is rejected with an error before any request is made.
func (n *NPDControlImp) CreateNPD(ctx context.Context, npd *v1alpha1.NodeProfileDescriptor, opts metav1.CreateOptions) (*v1alpha1.NodeProfileDescriptor, error) {
	if npd == nil {
		return nil, fmt.Errorf("npd is nil")
	}

	npdClient := n.client.NodeV1alpha1().NodeProfileDescriptors()
	return npdClient.Create(ctx, npd, opts)
}

// UpdateNPDStatus submits npd through the clientset's UpdateStatus call and
// returns its result. A nil npd is rejected with an error before any request
// is made.
func (n *NPDControlImp) UpdateNPDStatus(ctx context.Context, npd *v1alpha1.NodeProfileDescriptor, opts metav1.UpdateOptions) (*v1alpha1.NodeProfileDescriptor, error) {
	if npd == nil {
		return nil, fmt.Errorf("npd is nil")
	}

	npdClient := n.client.NodeV1alpha1().NodeProfileDescriptors()
	return npdClient.UpdateStatus(ctx, npd, opts)
}

// DeleteNPD deletes the NPD named npdName through the clientset and returns
// the error, if any, reported by the Delete call.
func (n *NPDControlImp) DeleteNPD(ctx context.Context, npdName string, opts metav1.DeleteOptions) error {
	npdClient := n.client.NodeV1alpha1().NodeProfileDescriptors()
	return npdClient.Delete(ctx, npdName, opts)
}
8 changes: 8 additions & 0 deletions pkg/client/genericclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"k8s.io/client-go/tools/clientcmd"
componentbaseconfig "k8s.io/component-base/config"
aggregator "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset"
"k8s.io/metrics/pkg/client/clientset/versioned"
"k8s.io/metrics/pkg/client/custom_metrics"
customclient "k8s.io/metrics/pkg/client/custom_metrics"
cmfake "k8s.io/metrics/pkg/client/custom_metrics/fake"
Expand All @@ -52,6 +53,7 @@ type GenericClientSet struct {
DynamicClient dynamic.Interface
DiscoveryClient discovery.DiscoveryInterface
AggregatorClient aggregator.Interface
MetricClient versioned.Interface

CustomClient customclient.CustomMetricsClient
ExternalClient externalclient.ExternalMetricsClient
Expand Down Expand Up @@ -97,13 +99,19 @@ func newForConfig(cfg *rest.Config) (*GenericClientSet, error) {
return nil, err
}

metricClient, err := versioned.NewForConfig(cfg)
if err != nil {
return nil, err
}

return &GenericClientSet{
cfg: cfg,
MetaClient: metaClient,
KubeClient: kubeClient,
InternalClient: internalClient,
DynamicClient: dynamicClient,
DiscoveryClient: discoveryClient,
MetricClient: metricClient,

CustomClient: &cmfake.FakeCustomMetricsClient{},
ExternalClient: &emfake.FakeExternalMetricsClient{},
Expand Down
2 changes: 2 additions & 0 deletions pkg/config/controller/controller_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type ControllersConfiguration struct {
*VPAConfig
*KCCConfig
*SPDConfig
*NPDConfig
*LifeCycleConfig
*MonitorConfig
*OvercommitConfig
Expand All @@ -63,6 +64,7 @@ func NewControllersConfiguration() *ControllersConfiguration {
VPAConfig: NewVPAConfig(),
KCCConfig: NewKCCConfig(),
SPDConfig: NewSPDConfig(),
NPDConfig: NewNPDConfig(),
LifeCycleConfig: NewLifeCycleConfig(),
MonitorConfig: NewMonitorConfig(),
OvercommitConfig: NewOvercommitConfig(),
Expand Down
Loading
Loading