
Commit 16b39e1

Merge pull request #2959 from jwcesign/automated-cherry-pick-of-#2912-upstream-release-1.3

Automated cherry pick of #2912: fix a corner case where rescheduling is skipped when a cluster no longer fits

2 parents 0b802b3 + 2383f4e

6 files changed: +186 additions, −13 deletions

pkg/scheduler/core/generic_scheduler.go

Lines changed: 3 additions & 1 deletion
```diff
@@ -62,8 +62,10 @@ func (g *genericScheduler) Schedule(ctx context.Context, placement *policyv1alph
 	if err != nil {
 		return result, fmt.Errorf("failed to findClustersThatFit: %v", err)
 	}
+
+	// Short path for case no cluster fit.
 	if len(feasibleClusters) == 0 {
-		return result, fmt.Errorf("no clusters fit")
+		return result, framework.ErrNoClusterFit
 	}
 	klog.V(4).Infof("feasible clusters found: %v", feasibleClusters)
```
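The substantive change here is swapping an opaque `fmt.Errorf("no clusters fit")` for the `framework.ErrNoClusterFit` sentinel (defined in pkg/scheduler/framework/types.go below), so callers can recognize "no cluster fits" with `errors.Is` and treat it as a non-fatal outcome rather than a hard failure. A minimal, self-contained sketch of the pattern (the `schedule` helper and its argument are hypothetical stand-ins, not Karmada code):

```go
package main

import (
	"errors"
	"fmt"
)

// Sentinel error, analogous to framework.ErrNoClusterFit.
var ErrNoClusterFit = errors.New("no cluster fit")

// schedule is a hypothetical stand-in for genericScheduler.Schedule.
func schedule(feasibleClusters int) error {
	if feasibleClusters == 0 {
		return ErrNoClusterFit
	}
	return nil
}

func main() {
	err := fmt.Errorf("schedule failed: %w", schedule(0))
	// errors.Is matches the sentinel even through wrapping, so callers can
	// branch on "no cluster fits" instead of treating every error as fatal.
	fmt.Println(errors.Is(err, ErrNoClusterFit)) // true
}
```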

pkg/scheduler/event_handler.go

Lines changed: 14 additions & 8 deletions
```diff
@@ -213,13 +213,13 @@ func (s *Scheduler) updateCluster(oldObj, newObj interface{}) {
 	case !equality.Semantic.DeepEqual(oldCluster.Labels, newCluster.Labels):
 		fallthrough
 	case !equality.Semantic.DeepEqual(oldCluster.Spec, newCluster.Spec):
-		s.enqueueAffectedPolicy(newCluster)
-		s.enqueueAffectedClusterPolicy(newCluster)
+		s.enqueueAffectedPolicy(oldCluster, newCluster)
+		s.enqueueAffectedClusterPolicy(oldCluster, newCluster)
 	}
 }
 
 // enqueueAffectedPolicy find all propagation policies related to the cluster and reschedule the RBs
-func (s *Scheduler) enqueueAffectedPolicy(newCluster *clusterv1alpha1.Cluster) {
+func (s *Scheduler) enqueueAffectedPolicy(oldCluster, newCluster *clusterv1alpha1.Cluster) {
 	policies, _ := s.policyLister.List(labels.Everything())
 	for _, policy := range policies {
 		selector := labels.SelectorFromSet(labels.Set{
@@ -229,10 +229,13 @@ func (s *Scheduler) enqueueAffectedPolicy(newCluster *clusterv1alpha1.Cluster) {
 		affinity := policy.Spec.Placement.ClusterAffinity
 		switch {
 		case affinity == nil:
-			// If no clusters specified, add it in queue
+			// If no clusters specified, add it to the queue
 			fallthrough
 		case util.ClusterMatches(newCluster, *affinity):
-			// If specific cluster matches the affinity. add it in queue
+			// If the new cluster manifest match the affinity, add it to the queue, trigger rescheduling
+			fallthrough
+		case util.ClusterMatches(oldCluster, *affinity):
+			// If the old cluster manifest match the affinity, add it to the queue, trigger rescheduling
 			err := s.requeueResourceBindings(selector)
 			if err != nil {
 				klog.Errorf("Failed to requeue ResourceBinding, error: %v", err)
@@ -242,7 +245,7 @@ func (s *Scheduler) enqueueAffectedPolicy(newCluster *clusterv1alpha1.Cluster) {
 }
 
 // enqueueAffectedClusterPolicy find all cluster propagation policies related to the cluster and reschedule the RBs/CRBs
-func (s *Scheduler) enqueueAffectedClusterPolicy(newCluster *clusterv1alpha1.Cluster) {
+func (s *Scheduler) enqueueAffectedClusterPolicy(oldCluster, newCluster *clusterv1alpha1.Cluster) {
 	clusterPolicies, _ := s.clusterPolicyLister.List(labels.Everything())
 	for _, policy := range clusterPolicies {
 		selector := labels.SelectorFromSet(labels.Set{
@@ -251,10 +254,13 @@ func (s *Scheduler) enqueueAffectedClusterPolicy(newCluster *clusterv1alpha1.Clu
 		affinity := policy.Spec.Placement.ClusterAffinity
 		switch {
 		case affinity == nil:
-			// If no clusters specified, add it in queue
+			// If no clusters specified, add it to the queue
 			fallthrough
 		case util.ClusterMatches(newCluster, *affinity):
-			// If specific cluster matches the affinity. add it in queue
+			// If the new cluster manifest match the affinity, add it to the queue, trigger rescheduling
+			fallthrough
+		case util.ClusterMatches(oldCluster, *affinity):
+			// If the old cluster manifest match the affinity, add it to the queue, trigger rescheduling
 			err := s.requeueClusterResourceBindings(selector)
 			if err != nil {
 				klog.Errorf("Failed to requeue ClusterResourceBinding, error: %v", err)
```

pkg/scheduler/framework/types.go

Lines changed: 5 additions & 0 deletions
```diff
@@ -1,9 +1,14 @@
 package framework
 
 import (
+	"errors"
+
 	clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
 )
 
+// ErrNoClusterFit is returned when no cluster fit the scheduling requirements.
+var ErrNoClusterFit = errors.New("no cluster fit")
+
 // ClusterInfo is cluster level aggregated information.
 type ClusterInfo struct {
 	// Overall cluster information.
```

pkg/scheduler/scheduler.go

Lines changed: 11 additions & 4 deletions
```diff
@@ -3,6 +3,7 @@ package scheduler
 import (
 	"context"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"reflect"
 	"time"
@@ -13,6 +14,7 @@ import (
 	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
+	utilerrors "k8s.io/apimachinery/pkg/util/errors"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/dynamic"
 	"k8s.io/client-go/kubernetes"
@@ -32,6 +34,7 @@ import (
 	worklister "github.com/karmada-io/karmada/pkg/generated/listers/work/v1alpha2"
 	schedulercache "github.com/karmada-io/karmada/pkg/scheduler/cache"
 	"github.com/karmada-io/karmada/pkg/scheduler/core"
+	"github.com/karmada-io/karmada/pkg/scheduler/framework"
 	frameworkplugins "github.com/karmada-io/karmada/pkg/scheduler/framework/plugins"
 	"github.com/karmada-io/karmada/pkg/scheduler/framework/runtime"
 	"github.com/karmada-io/karmada/pkg/scheduler/metrics"
@@ -464,13 +467,15 @@ func (s *Scheduler) scheduleResourceBinding(resourceBinding *workv1alpha2.Resour
 	}
 
 	scheduleResult, err := s.Algorithm.Schedule(context.TODO(), &placement, &resourceBinding.Spec, &core.ScheduleAlgorithmOption{EnableEmptyWorkloadPropagation: s.enableEmptyWorkloadPropagation})
-	if err != nil {
+	// in case of no cluster fit, can not return but continue to patch(cleanup) the result.
+	if err != nil && !errors.Is(err, framework.ErrNoClusterFit) {
 		klog.Errorf("Failed scheduling ResourceBinding %s/%s: %v", resourceBinding.Namespace, resourceBinding.Name, err)
 		return err
 	}
 
 	klog.V(4).Infof("ResourceBinding %s/%s scheduled to clusters %v", resourceBinding.Namespace, resourceBinding.Name, scheduleResult.SuggestedClusters)
-	return s.patchScheduleResultForResourceBinding(resourceBinding, placementStr, scheduleResult.SuggestedClusters)
+	scheduleErr := s.patchScheduleResultForResourceBinding(resourceBinding, placementStr, scheduleResult.SuggestedClusters)
+	return utilerrors.NewAggregate([]error{err, scheduleErr})
 }
 
 func (s *Scheduler) patchScheduleResultForResourceBinding(oldBinding *workv1alpha2.ResourceBinding, placement string, scheduleResult []workv1alpha2.TargetCluster) error {
@@ -515,13 +520,15 @@ func (s *Scheduler) scheduleClusterResourceBinding(clusterResourceBinding *workv
 	}
 
 	scheduleResult, err := s.Algorithm.Schedule(context.TODO(), &policy.Spec.Placement, &clusterResourceBinding.Spec, &core.ScheduleAlgorithmOption{EnableEmptyWorkloadPropagation: s.enableEmptyWorkloadPropagation})
-	if err != nil {
+	// in case of no cluster fit, can not return but continue to patch(cleanup) the result.
+	if err != nil && !errors.Is(err, framework.ErrNoClusterFit) {
 		klog.V(2).Infof("Failed scheduling ClusterResourceBinding %s: %v", clusterResourceBinding.Name, err)
 		return err
 	}
 
 	klog.V(4).Infof("ClusterResourceBinding %s scheduled to clusters %v", clusterResourceBinding.Name, scheduleResult.SuggestedClusters)
-	return s.patchScheduleResultForClusterResourceBinding(clusterResourceBinding, string(placement), scheduleResult.SuggestedClusters)
+	scheduleErr := s.patchScheduleResultForClusterResourceBinding(clusterResourceBinding, string(placement), scheduleResult.SuggestedClusters)
+	return utilerrors.NewAggregate([]error{err, scheduleErr})
 }
 
 func (s *Scheduler) patchScheduleResultForClusterResourceBinding(oldBinding *workv1alpha2.ClusterResourceBinding, placement string, scheduleResult []workv1alpha2.TargetCluster) error {
```
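Two details make the cleanup path work. The `errors.Is` guard lets `ErrNoClusterFit` fall through to the patch call, so the now-empty schedule result is still written back to the binding and stale placements get cleaned up. Then `utilerrors.NewAggregate` folds the schedule error and the patch error into a single return value, dropping nil entries. A small sketch of that aggregation behavior (same `k8s.io/apimachinery` package the diff imports; the sentinel here is a local stand-in):

```go
package main

import (
	"errors"
	"fmt"

	utilerrors "k8s.io/apimachinery/pkg/util/errors"
)

var errNoClusterFit = errors.New("no cluster fit")

func main() {
	// All-nil input collapses to a nil error: scheduling and patching both succeeded.
	fmt.Println(utilerrors.NewAggregate([]error{nil, nil})) // <nil>

	// A non-nil entry survives, so "no cluster fit" is still surfaced to the
	// caller (and the binding retried) even though the cleanup patch succeeded.
	fmt.Println(utilerrors.NewAggregate([]error{errNoClusterFit, nil})) // no cluster fit
}
```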

test/e2e/framework/cluster.go

Lines changed: 44 additions & 0 deletions
```diff
@@ -229,6 +229,50 @@ func setClusterLabel(c client.Client, clusterName string) error {
 	return err
 }
 
+// UpdateClusterLabels updates cluster labels.
+func UpdateClusterLabels(client karmada.Interface, clusterName string, labels map[string]string) {
+	gomega.Eventually(func() (bool, error) {
+		cluster, err := client.ClusterV1alpha1().Clusters().Get(context.TODO(), clusterName, metav1.GetOptions{})
+		if err != nil {
+			return false, err
+		}
+
+		if cluster.Labels == nil {
+			cluster.Labels = map[string]string{}
+		}
+		for key, value := range labels {
+			cluster.Labels[key] = value
+		}
+		_, err = client.ClusterV1alpha1().Clusters().Update(context.TODO(), cluster, metav1.UpdateOptions{})
+		if err != nil {
+			return false, err
+		}
+		return true, nil
+	}, pollTimeout, pollInterval).Should(gomega.Equal(true))
+}
+
+// DeleteClusterLabels deletes cluster labels if it exists.
+func DeleteClusterLabels(client karmada.Interface, clusterName string, labels map[string]string) {
+	gomega.Eventually(func() (bool, error) {
+		cluster, err := client.ClusterV1alpha1().Clusters().Get(context.TODO(), clusterName, metav1.GetOptions{})
+		if err != nil {
+			return false, err
+		}
+
+		if cluster.Labels == nil {
+			return true, nil
+		}
+		for key := range labels {
+			delete(cluster.Labels, key)
+		}
+		_, err = client.ClusterV1alpha1().Clusters().Update(context.TODO(), cluster, metav1.UpdateOptions{})
+		if err != nil {
+			return false, err
+		}
+		return true, nil
+	}, pollTimeout, pollInterval).Should(gomega.Equal(true))
+}
+
 // GetClusterNamesFromClusters will get Clusters' names form Clusters Object.
 func GetClusterNamesFromClusters(clusters []*clusterv1alpha1.Cluster) []string {
 	clusterNames := make([]string, 0, len(clusters))
```
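Both helpers wrap the read-modify-write in `gomega.Eventually`, so a failed `Update` (for example, a resource-version conflict) is simply retried on the next poll against a freshly fetched object. Outside of Ginkgo tests, the same conflict-safe update is conventionally written with client-go's `retry.RetryOnConflict`; a sketch under that assumption (hypothetical helper, assuming this file's existing imports plus `k8s.io/client-go/util/retry`):

```go
// updateClusterLabel applies one label using client-go's canonical
// conflict-retry loop instead of gomega polling (hypothetical helper).
func updateClusterLabel(client karmada.Interface, clusterName, key, value string) error {
	return retry.RetryOnConflict(retry.DefaultRetry, func() error {
		cluster, err := client.ClusterV1alpha1().Clusters().Get(context.TODO(), clusterName, metav1.GetOptions{})
		if err != nil {
			return err
		}
		if cluster.Labels == nil {
			cluster.Labels = map[string]string{}
		}
		cluster.Labels[key] = value
		// On a 409 Conflict, RetryOnConflict re-runs this closure with backoff,
		// re-fetching the cluster so the update applies to the latest version.
		_, err = client.ClusterV1alpha1().Clusters().Update(context.TODO(), cluster, metav1.UpdateOptions{})
		return err
	})
}
```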

test/e2e/rescheduling_test.go

Lines changed: 109 additions & 0 deletions
```diff
@@ -321,3 +321,112 @@ var _ = ginkgo.Describe("[cluster joined] reschedule testing", func() {
 		})
 	})
 })
+
+// reschedule testing while policy matches, triggered by label changes.
+var _ = ginkgo.Describe("[cluster labels changed] reschedule testing while policy matches", func() {
+	var deployment *appsv1.Deployment
+	var targetMember string
+	var labelKey string
+	var policyNamespace string
+	var policyName string
+
+	ginkgo.BeforeEach(func() {
+		targetMember = framework.ClusterNames()[0]
+		policyNamespace = testNamespace
+		policyName = deploymentNamePrefix + rand.String(RandomStrLength)
+		labelKey = "cluster" + rand.String(RandomStrLength)
+
+		deployment = testhelper.NewDeployment(testNamespace, policyName)
+		framework.CreateDeployment(kubeClient, deployment)
+
+		labels := map[string]string{labelKey: "ok"}
+		framework.UpdateClusterLabels(karmadaClient, targetMember, labels)
+
+		ginkgo.DeferCleanup(func() {
+			framework.RemoveDeployment(kubeClient, deployment.Namespace, deployment.Name)
+			framework.DeleteClusterLabels(karmadaClient, targetMember, labels)
+		})
+	})
+
+	ginkgo.Context("Changes cluster labels to test reschedule while pp matches", func() {
+		var policy *policyv1alpha1.PropagationPolicy
+
+		ginkgo.BeforeEach(func() {
+			policy = testhelper.NewPropagationPolicy(policyNamespace, policyName, []policyv1alpha1.ResourceSelector{
+				{
+					APIVersion: deployment.APIVersion,
+					Kind:       deployment.Kind,
+					Name:       deployment.Name,
+				}}, policyv1alpha1.Placement{
+				ClusterAffinity: &policyv1alpha1.ClusterAffinity{
+					LabelSelector: &metav1.LabelSelector{
+						MatchLabels: map[string]string{labelKey: "ok"},
+					},
+				},
+			})
+		})
+
+		ginkgo.BeforeEach(func() {
+			framework.CreatePropagationPolicy(karmadaClient, policy)
+
+			ginkgo.DeferCleanup(func() {
+				framework.RemovePropagationPolicy(karmadaClient, policy.Namespace, policy.Name)
+			})
+
+			framework.WaitDeploymentPresentOnClusterFitWith(targetMember, deployment.Namespace, deployment.Name,
+				func(deployment *appsv1.Deployment) bool { return true })
+		})
+
+		ginkgo.It("change labels to testing deployment reschedule", func() {
+			labelsUpdate := map[string]string{labelKey: "not_ok"}
+			framework.UpdateClusterLabels(karmadaClient, targetMember, labelsUpdate)
+			framework.WaitDeploymentDisappearOnCluster(targetMember, deployment.Namespace, deployment.Name)
+
+			labelsUpdate = map[string]string{labelKey: "ok"}
+			framework.UpdateClusterLabels(karmadaClient, targetMember, labelsUpdate)
+			framework.WaitDeploymentPresentOnClusterFitWith(targetMember, deployment.Namespace, deployment.Name,
+				func(deployment *appsv1.Deployment) bool { return true })
+		})
+	})
+
+	ginkgo.Context("Changes cluster labels to test reschedule while cpp matches", func() {
+		var policy *policyv1alpha1.ClusterPropagationPolicy
+
+		ginkgo.BeforeEach(func() {
+			policy = testhelper.NewClusterPropagationPolicy(policyName, []policyv1alpha1.ResourceSelector{
+				{
+					APIVersion: deployment.APIVersion,
+					Kind:       deployment.Kind,
+					Name:       deployment.Name,
+				}}, policyv1alpha1.Placement{
+				ClusterAffinity: &policyv1alpha1.ClusterAffinity{
+					LabelSelector: &metav1.LabelSelector{
+						MatchLabels: map[string]string{labelKey: "ok"},
+					},
+				},
+			})
+		})
+
+		ginkgo.BeforeEach(func() {
+			framework.CreateClusterPropagationPolicy(karmadaClient, policy)
+
+			ginkgo.DeferCleanup(func() {
+				framework.RemoveClusterPropagationPolicy(karmadaClient, policy.Name)
+			})
+
+			framework.WaitDeploymentPresentOnClusterFitWith(targetMember, deployment.Namespace, deployment.Name,
+				func(deployment *appsv1.Deployment) bool { return true })
+		})
+
+		ginkgo.It("change labels to testing deployment reschedule", func() {
+			labelsUpdate := map[string]string{labelKey: "not_ok"}
+			framework.UpdateClusterLabels(karmadaClient, targetMember, labelsUpdate)
+			framework.WaitDeploymentDisappearOnCluster(targetMember, deployment.Namespace, deployment.Name)
+
+			labelsUpdate = map[string]string{labelKey: "ok"}
+			framework.UpdateClusterLabels(karmadaClient, targetMember, labelsUpdate)
+			framework.WaitDeploymentPresentOnClusterFitWith(targetMember, deployment.Namespace, deployment.Name,
+				func(deployment *appsv1.Deployment) bool { return true })
+		})
+	})
+})
```
