Skip to content

Commit

Permalink
Merge pull request #231 from Poor12/improve-scheduling-event
Browse files Browse the repository at this point in the history
  • Loading branch information
gary-lgy authored Sep 25, 2023
2 parents 17fbfdd + aa40b7b commit f6617c8
Show file tree
Hide file tree
Showing 11 changed files with 115 additions and 12 deletions.
18 changes: 14 additions & 4 deletions pkg/controllers/scheduler/core/generic_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,17 @@ func (g *genericScheduler) Schedule(
return result, nil
}

feasibleClusters, err := g.findClustersThatFitWorkload(ctx, fwk, schedulingUnit, clusters)
feasibleClusters, diagnosis, err := g.findClustersThatFitWorkload(ctx, fwk, schedulingUnit, clusters)
if err != nil {
return result, fmt.Errorf("failed to findClustersThatFitWorkload: %w", err)
}
logger.V(2).
Info("Clusters filtered", "result", spew.Sprint(feasibleClusters))
if len(feasibleClusters) == 0 {
return result, nil
return result, &framework.FitError{
NumAllClusters: len(clusters),
Diagnosis: diagnosis,
}
}

clusterScores, err := g.scoreClusters(ctx, fwk, schedulingUnit, feasibleClusters)
Expand Down Expand Up @@ -155,18 +158,25 @@ func (g *genericScheduler) findClustersThatFitWorkload(
fwk framework.Framework,
schedulingUnit framework.SchedulingUnit,
clusters []*fedcorev1a1.FederatedCluster,
) ([]*fedcorev1a1.FederatedCluster, error) {
) ([]*fedcorev1a1.FederatedCluster, framework.Diagnosis, error) {
logger := klog.FromContext(ctx)

diagnosis := framework.Diagnosis{
ClusterToResultMap: make(framework.ClusterToResultMap),
UnschedulablePlugins: sets.New[string](),
}

ret := make([]*fedcorev1a1.FederatedCluster, 0)
for _, cluster := range clusters {
if result := fwk.RunFilterPlugins(ctx, &schedulingUnit, cluster); !result.IsSuccess() {
logger.V(2).Info("Cluster doesn't fit", "name", cluster.Name, "reason", result.AsError())
diagnosis.ClusterToResultMap[cluster.Name] = result
diagnosis.UnschedulablePlugins.Insert(result.FailedPlugin())
} else {
ret = append(ret, cluster)
}
}
return ret, nil
return ret, diagnosis, nil
}

func (g *genericScheduler) scoreClusters(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (p *WebhookPlugin) Filter(
if resp.Selected {
return framework.NewResult(framework.Success)
} else {
return framework.NewResult(framework.Unschedulable)
return framework.NewResult(framework.Unschedulable, fmt.Sprintf("cluster(s) were filtered by webhookPlugin(%s)", p.name))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,9 @@ func TestFilter(t *testing.T) {
selected: true,
},
"webhook does not select cluster": {
webhookErrors: webhookErrors{
responseError: "cluster(s) were filtered by webhookPlugin(test)",
},
su: getSampleSchedulingUnit(),
cluster: getSampleCluster("test"),
selected: false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,5 @@ func (pl *APIResources) Filter(ctx context.Context, su *framework.SchedulingUnit
return framework.NewResult(framework.Success)
}
}
return framework.NewResult(framework.Unschedulable, "No matched group version kind.")
return framework.NewResult(framework.Unschedulable, "cluster(s) didn't support this APIVersion")
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestAPIResourcesFilter(t *testing.T) {
Kind: "Deployment",
},
}),
wantResult: framework.NewResult(framework.Unschedulable, "No matched group version kind."),
wantResult: framework.NewResult(framework.Unschedulable, "cluster(s) didn't support this APIVersion"),
},
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (pl *ClusterReady) Filter(
// Prevent scheduling to unready cluster unless it is already scheduled to.
_, alreadyScheduled := su.CurrentClusters[cluster.Name]
if !alreadyScheduled && !clusterutil.IsClusterReady(&cluster.Status) {
return framework.NewResult(framework.Unschedulable, "cluster is unready")
return framework.NewResult(framework.Unschedulable, "cluster(s) were unready")
}

return framework.NewResult(framework.Success)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (pl *PlacementFilter) Filter(
}

if _, exists := su.ClusterNames[cluster.Name]; !exists {
return framework.NewResult(framework.Unschedulable, "cluster is not in placement list")
return framework.NewResult(framework.Unschedulable, "cluster(s) were not in placement list")
}

return framework.NewResult(framework.Success)
Expand Down
1 change: 1 addition & 0 deletions pkg/controllers/scheduler/framework/runtime/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ func (f *frameworkImpl) RunFilterPlugins(
for _, pl := range f.filterPlugins {
pluginResult := f.runFilterPlugin(ctx, pl, schedulingUnit, cluster)
if !pluginResult.IsSuccess() {
pluginResult.SetFailedPlugin(pl.Name())
return pluginResult
}
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/controllers/scheduler/framework/runtime/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func TestRunFilterPlugins(t *testing.T) {
"a": getNaiveFilterPluginFactory(false),
},
&fedcore.EnabledPlugins{FilterPlugins: []string{"a"}},
framework.NewResult(framework.Error),
framework.NewResult(framework.Error).WithFailedPlugin("NaiveFilterPlugin"),
},
{
"multiple filter plugins, all succeed",
Expand All @@ -182,7 +182,7 @@ func TestRunFilterPlugins(t *testing.T) {
"c": getNaiveFilterPluginFactory(true),
},
&fedcore.EnabledPlugins{FilterPlugins: []string{"a", "b", "c"}},
framework.NewResult(framework.Error),
framework.NewResult(framework.Error).WithFailedPlugin("NaiveFilterPlugin"),
},
{
"multiple filter plugins, none succeed",
Expand All @@ -192,7 +192,7 @@ func TestRunFilterPlugins(t *testing.T) {
"c": getNaiveFilterPluginFactory(false),
},
&fedcore.EnabledPlugins{FilterPlugins: []string{"a", "b", "c"}},
framework.NewResult(framework.Error),
framework.NewResult(framework.Error).WithFailedPlugin("NaiveFilterPlugin"),
},
}

Expand Down
77 changes: 77 additions & 0 deletions pkg/controllers/scheduler/framework/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@ package framework

import (
"errors"
"fmt"
"sort"
"strings"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"

fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1"
)
Expand Down Expand Up @@ -150,6 +153,8 @@ type Result struct {
code Code
reasons []string
err error

failedPlugin string
}

// Code is the Status code/type which is returned from plugins.
Expand Down Expand Up @@ -194,6 +199,23 @@ func (s *Result) Code() Code {
return s.code
}

// SetFailedPlugin sets the given plugin name to s.failedPlugin.
func (s *Result) SetFailedPlugin(plugin string) {
s.failedPlugin = plugin
}

// WithFailedPlugin sets the given plugin name to s.failedPlugin,
// and returns the given result object.
func (s *Result) WithFailedPlugin(plugin string) *Result {
s.SetFailedPlugin(plugin)
return s
}

// FailedPlugin returns the failed plugin name.
func (s *Result) FailedPlugin() string {
return s.failedPlugin
}

// IsSuccess returns true if and only if "Result" is nil or Code is "Success".
func (s *Result) IsSuccess() bool {
return s == nil || s.code == Success
Expand All @@ -218,3 +240,58 @@ func (s *Result) AsError() error {
}
return errors.New(strings.Join(s.reasons, ", "))
}

// Reasons returns reasons of the Result.
func (s *Result) Reasons() []string {
if s.err != nil {
return append([]string{s.err.Error()}, s.reasons...)
}
return s.reasons
}

// FitError describes a fit error of a cluster.
type FitError struct {
NumAllClusters int
Diagnosis Diagnosis
}

// ClusterToResultMap declares map from cluster name to its result.
type ClusterToResultMap map[string]*Result

// Diagnosis records the details to diagnose a scheduling failure.
type Diagnosis struct {
ClusterToResultMap ClusterToResultMap
UnschedulablePlugins sets.Set[string]
}

const (
// NoClusterAvailableMsg is used to format message when no clusters available.
NoClusterAvailableMsg = "0/%v clusters are available"
)

// Error returns detailed information of why the pod failed to fit on each node.
// A message format is "0/X clusters are available: <FilterMsg>. "
func (f *FitError) Error() string {
reasons := make(map[string]int)
for _, result := range f.Diagnosis.ClusterToResultMap {
for _, reason := range result.Reasons() {
reasons[reason]++
}
}

reasonMsg := fmt.Sprintf(NoClusterAvailableMsg+": ", f.NumAllClusters)
sortReasonsHistogram := func() []string {
var reasonStrings []string
for k, v := range reasons {
reasonStrings = append(reasonStrings, fmt.Sprintf("%v %v", v, k))
}
sort.Strings(reasonStrings)
return reasonStrings
}
sortedFilterMsg := sortReasonsHistogram()
if len(sortedFilterMsg) != 0 {
reasonMsg += strings.Join(sortedFilterMsg, ", ")
}

return reasonMsg
}
12 changes: 12 additions & 0 deletions pkg/controllers/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package scheduler

import (
"context"
"errors"
"fmt"
"reflect"
"sync"
Expand All @@ -44,6 +45,7 @@ import (
fedcorev1a1informers "github.com/kubewharf/kubeadmiral/pkg/client/informers/externalversions/core/v1alpha1"
"github.com/kubewharf/kubeadmiral/pkg/controllers/common"
"github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler/core"
frameworktypes "github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler/framework"
"github.com/kubewharf/kubeadmiral/pkg/stats"
utilmetrics "github.com/kubewharf/kubeadmiral/pkg/stats/metrics"
clusterutil "github.com/kubewharf/kubeadmiral/pkg/util/cluster"
Expand Down Expand Up @@ -561,8 +563,18 @@ func (s *Scheduler) schedule(
}

ctx = klog.NewContext(ctx, logger)
var fitErr *frameworktypes.FitError
result, err := s.algorithm.Schedule(ctx, framework, *schedulingUnit, clusters)
if err != nil {
if errors.As(err, &fitErr) {
logger.Error(err, "No available clusters")
s.eventRecorder.Eventf(
fedObject,
corev1.EventTypeWarning,
EventReasonScheduleFederatedObject,
err.Error())
return nil, &worker.StatusError
}
logger.Error(err, "Failed to compute scheduling result")
s.eventRecorder.Eventf(
fedObject,
Expand Down

0 comments on commit f6617c8

Please sign in to comment.