Skip to content

Commit 4c56b19

Browse files
authored
Merge pull request #2955 from jwcesign/automated-cherry-pick-of-#2912-upstream-release-1.4
Automated cherry pick of #2912: fix a corner case where rescheduling is skipped when a cluster no longer fits
2 parents 66305f8 + 34d83c2 commit 4c56b19

File tree

5 files changed

+181
-12
lines changed

5 files changed

+181
-12
lines changed

pkg/scheduler/core/generic_scheduler.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ func (g *genericScheduler) Schedule(ctx context.Context, placement *policyv1alph
6363
return result, fmt.Errorf("failed to findClustersThatFit: %v", err)
6464
}
6565

66+
// Short path for case no cluster fit.
6667
if len(feasibleClusters) == 0 {
6768
return result, &framework.FitError{
6869
NumAllClusters: clusterInfoSnapshot.NumOfClusters(),

pkg/scheduler/event_handler.go

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -229,13 +229,13 @@ func (s *Scheduler) updateCluster(oldObj, newObj interface{}) {
229229
case !equality.Semantic.DeepEqual(oldCluster.Labels, newCluster.Labels):
230230
fallthrough
231231
case !equality.Semantic.DeepEqual(oldCluster.Spec, newCluster.Spec):
232-
s.enqueueAffectedPolicy(newCluster)
233-
s.enqueueAffectedClusterPolicy(newCluster)
232+
s.enqueueAffectedPolicy(oldCluster, newCluster)
233+
s.enqueueAffectedClusterPolicy(oldCluster, newCluster)
234234
}
235235
}
236236

237237
// enqueueAffectedPolicy find all propagation policies related to the cluster and reschedule the RBs
238-
func (s *Scheduler) enqueueAffectedPolicy(newCluster *clusterv1alpha1.Cluster) {
238+
func (s *Scheduler) enqueueAffectedPolicy(oldCluster, newCluster *clusterv1alpha1.Cluster) {
239239
policies, _ := s.policyLister.List(labels.Everything())
240240
for _, policy := range policies {
241241
selector := labels.SelectorFromSet(labels.Set{
@@ -245,10 +245,13 @@ func (s *Scheduler) enqueueAffectedPolicy(newCluster *clusterv1alpha1.Cluster) {
245245
affinity := policy.Spec.Placement.ClusterAffinity
246246
switch {
247247
case affinity == nil:
248-
// If no clusters specified, add it in queue
248+
// If no clusters specified, add it to the queue
249249
fallthrough
250250
case util.ClusterMatches(newCluster, *affinity):
251-
// If specific cluster matches the affinity. add it in queue
251+
// If the new cluster manifest matches the affinity, add it to the queue and trigger rescheduling
252+
fallthrough
253+
case util.ClusterMatches(oldCluster, *affinity):
254+
// If the old cluster manifest matches the affinity, add it to the queue and trigger rescheduling
252255
err := s.requeueResourceBindings(selector, metrics.ClusterChanged)
253256
if err != nil {
254257
klog.Errorf("Failed to requeue ResourceBinding, error: %v", err)
@@ -258,7 +261,7 @@ func (s *Scheduler) enqueueAffectedPolicy(newCluster *clusterv1alpha1.Cluster) {
258261
}
259262

260263
// enqueueAffectedClusterPolicy find all cluster propagation policies related to the cluster and reschedule the RBs/CRBs
261-
func (s *Scheduler) enqueueAffectedClusterPolicy(newCluster *clusterv1alpha1.Cluster) {
264+
func (s *Scheduler) enqueueAffectedClusterPolicy(oldCluster, newCluster *clusterv1alpha1.Cluster) {
262265
clusterPolicies, _ := s.clusterPolicyLister.List(labels.Everything())
263266
for _, policy := range clusterPolicies {
264267
selector := labels.SelectorFromSet(labels.Set{
@@ -267,10 +270,13 @@ func (s *Scheduler) enqueueAffectedClusterPolicy(newCluster *clusterv1alpha1.Clu
267270
affinity := policy.Spec.Placement.ClusterAffinity
268271
switch {
269272
case affinity == nil:
270-
// If no clusters specified, add it in queue
273+
// If no clusters specified, add it to the queue
271274
fallthrough
272275
case util.ClusterMatches(newCluster, *affinity):
273-
// If specific cluster matches the affinity. add it in queue
276+
// If the new cluster manifest matches the affinity, add it to the queue and trigger rescheduling
277+
fallthrough
278+
case util.ClusterMatches(oldCluster, *affinity):
279+
// If the old cluster manifest matches the affinity, add it to the queue and trigger rescheduling
274280
err := s.requeueClusterResourceBindings(selector, metrics.ClusterChanged)
275281
if err != nil {
276282
klog.Errorf("Failed to requeue ClusterResourceBinding, error: %v", err)

pkg/scheduler/scheduler.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package scheduler
33
import (
44
"context"
55
"encoding/json"
6+
"errors"
67
"fmt"
78
"reflect"
89
"time"
@@ -13,6 +14,7 @@ import (
1314
"k8s.io/apimachinery/pkg/api/meta"
1415
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1516
"k8s.io/apimachinery/pkg/types"
17+
utilerrors "k8s.io/apimachinery/pkg/util/errors"
1618
"k8s.io/apimachinery/pkg/util/wait"
1719
"k8s.io/client-go/dynamic"
1820
"k8s.io/client-go/kubernetes"
@@ -33,6 +35,7 @@ import (
3335
worklister "github.com/karmada-io/karmada/pkg/generated/listers/work/v1alpha2"
3436
schedulercache "github.com/karmada-io/karmada/pkg/scheduler/cache"
3537
"github.com/karmada-io/karmada/pkg/scheduler/core"
38+
"github.com/karmada-io/karmada/pkg/scheduler/framework"
3639
frameworkplugins "github.com/karmada-io/karmada/pkg/scheduler/framework/plugins"
3740
"github.com/karmada-io/karmada/pkg/scheduler/framework/runtime"
3841
"github.com/karmada-io/karmada/pkg/scheduler/metrics"
@@ -476,13 +479,16 @@ func (s *Scheduler) scheduleResourceBinding(resourceBinding *workv1alpha2.Resour
476479
}
477480

478481
scheduleResult, err := s.Algorithm.Schedule(context.TODO(), &placement, &resourceBinding.Spec, &core.ScheduleAlgorithmOption{EnableEmptyWorkloadPropagation: s.enableEmptyWorkloadPropagation})
479-
if err != nil {
482+
var noClusterFit *framework.FitError
483+
// In case no cluster fits, do not return immediately; continue to patch (clean up) the result.
484+
if err != nil && !errors.As(err, &noClusterFit) {
480485
klog.Errorf("Failed scheduling ResourceBinding %s/%s: %v", resourceBinding.Namespace, resourceBinding.Name, err)
481486
return err
482487
}
483488

484489
klog.V(4).Infof("ResourceBinding %s/%s scheduled to clusters %v", resourceBinding.Namespace, resourceBinding.Name, scheduleResult.SuggestedClusters)
485-
return s.patchScheduleResultForResourceBinding(resourceBinding, placementStr, scheduleResult.SuggestedClusters)
490+
scheduleErr := s.patchScheduleResultForResourceBinding(resourceBinding, placementStr, scheduleResult.SuggestedClusters)
491+
return utilerrors.NewAggregate([]error{err, scheduleErr})
486492
}
487493

488494
func (s *Scheduler) patchScheduleResultForResourceBinding(oldBinding *workv1alpha2.ResourceBinding, placement string, scheduleResult []workv1alpha2.TargetCluster) error {
@@ -527,13 +533,16 @@ func (s *Scheduler) scheduleClusterResourceBinding(clusterResourceBinding *workv
527533
}
528534

529535
scheduleResult, err := s.Algorithm.Schedule(context.TODO(), &policy.Spec.Placement, &clusterResourceBinding.Spec, &core.ScheduleAlgorithmOption{EnableEmptyWorkloadPropagation: s.enableEmptyWorkloadPropagation})
530-
if err != nil {
536+
var noClusterFit *framework.FitError
537+
// In case no cluster fits, do not return immediately; continue to patch (clean up) the result.
538+
if err != nil && !errors.As(err, &noClusterFit) {
531539
klog.V(2).Infof("Failed scheduling ClusterResourceBinding %s: %v", clusterResourceBinding.Name, err)
532540
return err
533541
}
534542

535543
klog.V(4).Infof("ClusterResourceBinding %s scheduled to clusters %v", clusterResourceBinding.Name, scheduleResult.SuggestedClusters)
536-
return s.patchScheduleResultForClusterResourceBinding(clusterResourceBinding, string(placement), scheduleResult.SuggestedClusters)
544+
scheduleErr := s.patchScheduleResultForClusterResourceBinding(clusterResourceBinding, string(placement), scheduleResult.SuggestedClusters)
545+
return utilerrors.NewAggregate([]error{err, scheduleErr})
537546
}
538547

539548
func (s *Scheduler) patchScheduleResultForClusterResourceBinding(oldBinding *workv1alpha2.ClusterResourceBinding, placement string, scheduleResult []workv1alpha2.TargetCluster) error {

test/e2e/framework/cluster.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,50 @@ func setClusterLabel(c client.Client, clusterName string) error {
230230
return err
231231
}
232232

233+
// UpdateClusterLabels updates cluster labels.
234+
func UpdateClusterLabels(client karmada.Interface, clusterName string, labels map[string]string) {
235+
gomega.Eventually(func() (bool, error) {
236+
cluster, err := client.ClusterV1alpha1().Clusters().Get(context.TODO(), clusterName, metav1.GetOptions{})
237+
if err != nil {
238+
return false, err
239+
}
240+
241+
if cluster.Labels == nil {
242+
cluster.Labels = map[string]string{}
243+
}
244+
for key, value := range labels {
245+
cluster.Labels[key] = value
246+
}
247+
_, err = client.ClusterV1alpha1().Clusters().Update(context.TODO(), cluster, metav1.UpdateOptions{})
248+
if err != nil {
249+
return false, err
250+
}
251+
return true, nil
252+
}, pollTimeout, pollInterval).Should(gomega.Equal(true))
253+
}
254+
255+
// DeleteClusterLabels deletes cluster labels if it exists.
256+
func DeleteClusterLabels(client karmada.Interface, clusterName string, labels map[string]string) {
257+
gomega.Eventually(func() (bool, error) {
258+
cluster, err := client.ClusterV1alpha1().Clusters().Get(context.TODO(), clusterName, metav1.GetOptions{})
259+
if err != nil {
260+
return false, err
261+
}
262+
263+
if cluster.Labels == nil {
264+
return true, nil
265+
}
266+
for key := range labels {
267+
delete(cluster.Labels, key)
268+
}
269+
_, err = client.ClusterV1alpha1().Clusters().Update(context.TODO(), cluster, metav1.UpdateOptions{})
270+
if err != nil {
271+
return false, err
272+
}
273+
return true, nil
274+
}, pollTimeout, pollInterval).Should(gomega.Equal(true))
275+
}
276+
233277
// GetClusterNamesFromClusters will get Clusters' names form Clusters Object.
234278
func GetClusterNamesFromClusters(clusters []*clusterv1alpha1.Cluster) []string {
235279
clusterNames := make([]string, 0, len(clusters))

test/e2e/rescheduling_test.go

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -319,3 +319,112 @@ var _ = ginkgo.Describe("[cluster joined] reschedule testing", func() {
319319
})
320320
})
321321
})
322+
323+
// reschedule testing while policy matches, triggered by label changes.
324+
var _ = ginkgo.Describe("[cluster labels changed] reschedule testing while policy matches", func() {
325+
var deployment *appsv1.Deployment
326+
var targetMember string
327+
var labelKey string
328+
var policyNamespace string
329+
var policyName string
330+
331+
ginkgo.BeforeEach(func() {
332+
targetMember = framework.ClusterNames()[0]
333+
policyNamespace = testNamespace
334+
policyName = deploymentNamePrefix + rand.String(RandomStrLength)
335+
labelKey = "cluster" + rand.String(RandomStrLength)
336+
337+
deployment = testhelper.NewDeployment(testNamespace, policyName)
338+
framework.CreateDeployment(kubeClient, deployment)
339+
340+
labels := map[string]string{labelKey: "ok"}
341+
framework.UpdateClusterLabels(karmadaClient, targetMember, labels)
342+
343+
ginkgo.DeferCleanup(func() {
344+
framework.RemoveDeployment(kubeClient, deployment.Namespace, deployment.Name)
345+
framework.DeleteClusterLabels(karmadaClient, targetMember, labels)
346+
})
347+
})
348+
349+
ginkgo.Context("Changes cluster labels to test reschedule while pp matches", func() {
350+
var policy *policyv1alpha1.PropagationPolicy
351+
352+
ginkgo.BeforeEach(func() {
353+
policy = testhelper.NewPropagationPolicy(policyNamespace, policyName, []policyv1alpha1.ResourceSelector{
354+
{
355+
APIVersion: deployment.APIVersion,
356+
Kind: deployment.Kind,
357+
Name: deployment.Name,
358+
}}, policyv1alpha1.Placement{
359+
ClusterAffinity: &policyv1alpha1.ClusterAffinity{
360+
LabelSelector: &metav1.LabelSelector{
361+
MatchLabels: map[string]string{labelKey: "ok"},
362+
},
363+
},
364+
})
365+
})
366+
367+
ginkgo.BeforeEach(func() {
368+
framework.CreatePropagationPolicy(karmadaClient, policy)
369+
370+
ginkgo.DeferCleanup(func() {
371+
framework.RemovePropagationPolicy(karmadaClient, policy.Namespace, policy.Name)
372+
})
373+
374+
framework.WaitDeploymentPresentOnClusterFitWith(targetMember, deployment.Namespace, deployment.Name,
375+
func(deployment *appsv1.Deployment) bool { return true })
376+
})
377+
378+
ginkgo.It("change labels to testing deployment reschedule", func() {
379+
labelsUpdate := map[string]string{labelKey: "not_ok"}
380+
framework.UpdateClusterLabels(karmadaClient, targetMember, labelsUpdate)
381+
framework.WaitDeploymentDisappearOnCluster(targetMember, deployment.Namespace, deployment.Name)
382+
383+
labelsUpdate = map[string]string{labelKey: "ok"}
384+
framework.UpdateClusterLabels(karmadaClient, targetMember, labelsUpdate)
385+
framework.WaitDeploymentPresentOnClusterFitWith(targetMember, deployment.Namespace, deployment.Name,
386+
func(deployment *appsv1.Deployment) bool { return true })
387+
})
388+
})
389+
390+
ginkgo.Context("Changes cluster labels to test reschedule while cpp matches", func() {
391+
var policy *policyv1alpha1.ClusterPropagationPolicy
392+
393+
ginkgo.BeforeEach(func() {
394+
policy = testhelper.NewClusterPropagationPolicy(policyName, []policyv1alpha1.ResourceSelector{
395+
{
396+
APIVersion: deployment.APIVersion,
397+
Kind: deployment.Kind,
398+
Name: deployment.Name,
399+
}}, policyv1alpha1.Placement{
400+
ClusterAffinity: &policyv1alpha1.ClusterAffinity{
401+
LabelSelector: &metav1.LabelSelector{
402+
MatchLabels: map[string]string{labelKey: "ok"},
403+
},
404+
},
405+
})
406+
})
407+
408+
ginkgo.BeforeEach(func() {
409+
framework.CreateClusterPropagationPolicy(karmadaClient, policy)
410+
411+
ginkgo.DeferCleanup(func() {
412+
framework.RemoveClusterPropagationPolicy(karmadaClient, policy.Name)
413+
})
414+
415+
framework.WaitDeploymentPresentOnClusterFitWith(targetMember, deployment.Namespace, deployment.Name,
416+
func(deployment *appsv1.Deployment) bool { return true })
417+
})
418+
419+
ginkgo.It("change labels to testing deployment reschedule", func() {
420+
labelsUpdate := map[string]string{labelKey: "not_ok"}
421+
framework.UpdateClusterLabels(karmadaClient, targetMember, labelsUpdate)
422+
framework.WaitDeploymentDisappearOnCluster(targetMember, deployment.Namespace, deployment.Name)
423+
424+
labelsUpdate = map[string]string{labelKey: "ok"}
425+
framework.UpdateClusterLabels(karmadaClient, targetMember, labelsUpdate)
426+
framework.WaitDeploymentPresentOnClusterFitWith(targetMember, deployment.Namespace, deployment.Name,
427+
func(deployment *appsv1.Deployment) bool { return true })
428+
})
429+
})
430+
})

0 commit comments

Comments
 (0)