Skip to content

Commit

Permalink
Merge pull request #233 from wy-lucky/feat/not-collect-vnode-resource
Browse files Browse the repository at this point in the history
feat(fcluster): do not collect resources related to virtual nodes.
  • Loading branch information
limhawjia authored Sep 21, 2023
2 parents cafc3db + 4ef5681 commit 17fbfdd
Show file tree
Hide file tree
Showing 9 changed files with 134 additions and 18 deletions.
1 change: 1 addition & 0 deletions cmd/controller-manager/app/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ func startFederatedClusterController(
controllerCtx.ComponentConfig.ClusterJoinTimeout,
controllerCtx.WorkerCount,
controllerCtx.FedSystemNamespace,
controllerCtx.ComponentConfig.ResourceAggregationNodeFilter,
)
if err != nil {
return nil, fmt.Errorf("error creating federate controller: %w", err)
Expand Down
13 changes: 12 additions & 1 deletion cmd/controller-manager/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ type Options struct {
PrometheusAddr string
PrometheusPort uint16
PrometheusQuantiles map[string]string

ResourceAggregationNodeFilter []string
}

func NewOptions() *Options {
Expand Down Expand Up @@ -140,7 +142,16 @@ func (o *Options) AddFlags(flags *pflag.FlagSet, allControllers []string, disabl
&o.PrometheusQuantiles,
"prometheus-quantiles",
map[string]string{"0.5": "0.01", "0.95": "0.01", "0.99": "0.002"},
"prometheus summary objective quantiles",
"Prometheus summary objective quantiles",
)

flags.StringArrayVar(
&o.ResourceAggregationNodeFilter,
"resource-aggregation-node-filter",
[]string{},
"Nodes matching the provided label selector are excluded from resource aggregation. "+
"If the flag is provided multiple times, "+
"nodes are excluded as long as at least one of the selectors is matched.",
)
}

Expand Down
11 changes: 11 additions & 0 deletions cmd/controller-manager/app/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
Expand Down Expand Up @@ -192,5 +193,15 @@ func getComponentConfig(opts *options.Options) (*controllercontext.ComponentConf
componentConfig.NSAutoPropExcludeRegexp = nsAutoPropExcludeRegexp
}

labelSelectors := make([]labels.Selector, len(opts.ResourceAggregationNodeFilter))
for i, labelSelectorString := range opts.ResourceAggregationNodeFilter {
labelSelector, err := labels.Parse(labelSelectorString)
if err != nil {
return nil, fmt.Errorf("failed to compile resource aggregation node filter: %w", err)
}
labelSelectors[i] = labelSelector
}
componentConfig.ResourceAggregationNodeFilter = labelSelectors

return componentConfig, nil
}
8 changes: 5 additions & 3 deletions pkg/controllers/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"regexp"
"time"

"k8s.io/apimachinery/pkg/labels"
dynamicclient "k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
kubeinformer "k8s.io/client-go/informers"
Expand Down Expand Up @@ -78,7 +79,8 @@ func (c *Context) StartFactories(ctx context.Context) {
}

type ComponentConfig struct {
NSAutoPropExcludeRegexp *regexp.Regexp
ClusterJoinTimeout time.Duration
MemberObjectEnqueueDelay time.Duration
NSAutoPropExcludeRegexp *regexp.Regexp
ClusterJoinTimeout time.Duration
MemberObjectEnqueueDelay time.Duration
ResourceAggregationNodeFilter []labels.Selector
}
8 changes: 4 additions & 4 deletions pkg/controllers/federatedcluster/clusterstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (c *FederatedClusterController) collectIndividualClusterStatus(

// We skip updating cluster resources and api resources if cluster is not ready
if readyStatus == corev1.ConditionTrue {
if err := updateClusterResources(
if err := c.updateClusterResources(
ctx,
&cluster.Status,
podLister,
Expand Down Expand Up @@ -200,7 +200,7 @@ func checkReadyByHealthz(
return corev1.ConditionFalse, clusterReadyStatus
}

func updateClusterResources(
func (c *FederatedClusterController) updateClusterResources(
ctx context.Context,
clusterStatus *fedcorev1a1.FederatedClusterStatus,
podLister corev1listers.PodLister,
Expand All @@ -225,12 +225,12 @@ func updateClusterResources(

schedulableNodes := int64(0)
for _, node := range nodes {
if isNodeSchedulable(node) {
if isNodeSchedulable(node) && !c.isNodeFiltered(node) {
schedulableNodes++
}
}

allocatable, available := aggregateResources(nodes, pods)
allocatable, available := c.aggregateResources(nodes, pods)
clusterStatus.Resources = fedcorev1a1.Resources{
SchedulableNodes: &schedulableNodes,
Allocatable: allocatable,
Expand Down
15 changes: 9 additions & 6 deletions pkg/controllers/federatedcluster/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,10 @@ type FederatedClusterController struct {
kubeClient kubeclient.Interface
fedClient fedclient.Interface

fedSystemNamespace string
clusterHealthCheckConfig *ClusterHealthCheckConfig
clusterJoinTimeout time.Duration
fedSystemNamespace string
clusterHealthCheckConfig *ClusterHealthCheckConfig
clusterJoinTimeout time.Duration
resourceAggregationNodeFilter []labels.Selector

worker worker.ReconcileWorker[common.QualifiedName]
statusCollectWorker worker.ReconcileWorker[common.QualifiedName]
Expand All @@ -98,6 +99,7 @@ func NewFederatedClusterController(
clusterJoinTimeout time.Duration,
workerCount int,
fedSystemNamespace string,
resourceAggregationNodeFilter []labels.Selector,
) (*FederatedClusterController, error) {
c := &FederatedClusterController{
clusterInformer: clusterInformer,
Expand All @@ -109,9 +111,10 @@ func NewFederatedClusterController(
// TODO: make health check period configurable
Period: time.Second * 30,
},
clusterJoinTimeout: clusterJoinTimeout,
metrics: metrics,
logger: logger.WithValues("controller", FederatedClusterControllerName),
clusterJoinTimeout: clusterJoinTimeout,
resourceAggregationNodeFilter: resourceAggregationNodeFilter,
metrics: metrics,
logger: logger.WithValues("controller", FederatedClusterControllerName),
}

broadcaster := record.NewBroadcaster()
Expand Down
21 changes: 19 additions & 2 deletions pkg/controllers/federatedcluster/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package federatedcluster
import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"

fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1"
"github.com/kubewharf/kubeadmiral/pkg/util/resource"
Expand Down Expand Up @@ -132,16 +133,32 @@ func isNodeSchedulable(node *corev1.Node) bool {
return true
}

// isNodeFiltered returns true if the node is filtered to aggregate resources, otherwise false.
func (c *FederatedClusterController) isNodeFiltered(node *corev1.Node) bool {
nodeLabels := node.GetLabels()
if nodeLabels == nil {
return false
}

for _, labelSelector := range c.resourceAggregationNodeFilter {
if labelSelector.Matches(labels.Set(nodeLabels)) {
return true
}
}

return false
}

// aggregateResources returns
// - allocatable resources from the nodes and,
// - available resources after considering allocations to the given pods.
func aggregateResources(
func (c *FederatedClusterController) aggregateResources(
nodes []*corev1.Node,
pods []*corev1.Pod,
) (corev1.ResourceList, corev1.ResourceList) {
allocatable := make(corev1.ResourceList)
for _, node := range nodes {
if !isNodeSchedulable(node) {
if !isNodeSchedulable(node) || c.isNodeFiltered(node) {
continue
}

Expand Down
73 changes: 72 additions & 1 deletion pkg/controllers/federatedcluster/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"github.com/davecgh/go-spew/spew"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
)

func Test_aggregateResources(t *testing.T) {
Expand Down Expand Up @@ -256,11 +258,80 @@ func Test_aggregateResources(t *testing.T) {
corev1.ResourceMemory: resource.MustParse("2Gi"),
},
},
{
name: "one container per pod, and filtering some nodes",
nodes: []*corev1.Node{
{
Status: corev1.NodeStatus{
Allocatable: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("1"),
corev1.ResourceMemory: resource.MustParse("2Gi"),
},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"type": "virtual-kubelet",
},
},
Status: corev1.NodeStatus{
Allocatable: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("1"),
corev1.ResourceMemory: resource.MustParse("2Gi"),
},
},
},
},
pods: []*corev1.Pod{
{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("500m"),
corev1.ResourceMemory: resource.MustParse("1Gi"),
},
},
},
},
},
},
{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("500m"),
corev1.ResourceMemory: resource.MustParse("1Gi"),
},
},
},
},
},
},
},
expectedAllocatable: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("1"),
corev1.ResourceMemory: resource.MustParse("2Gi"),
},
expectedAvailable: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("0"),
corev1.ResourceMemory: resource.MustParse("0Gi"),
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
allocatable, available := aggregateResources(tc.nodes, tc.pods)
selector, _ := labels.Parse("type=virtual-kubelet")
c := &FederatedClusterController{
resourceAggregationNodeFilter: []labels.Selector{selector},
}

allocatable, available := c.aggregateResources(tc.nodes, tc.pods)
if len(allocatable) != len(tc.expectedAllocatable) {
t.Fatalf("expected allocatable %s differs from actual allocatable %s", spew.Sdump(tc.expectedAllocatable), spew.Sdump(allocatable))
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/cascadingdeletion/annotation.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"github.com/kubewharf/kubeadmiral/pkg/controllers/common"
)

// AnnotationCascadingDelete on a fedreated cluster means that
// AnnotationCascadingDelete on a federated cluster means that
// resources managed by KubeAdmiral in the cluster should be cleaned
// up before deletion can occur.
var AnnotationCascadingDelete = common.DefaultPrefix + "cascading-delete"
Expand Down

0 comments on commit 17fbfdd

Please sign in to comment.