Merge pull request #76 from zalando-incubator/use-informers
Use pod and node informers to reduce APIServer load
otrosien authored Dec 4, 2024
2 parents 889bf12 + 7cd4b49 commit ac84610
Showing 3 changed files with 90 additions and 39 deletions.
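
For readers skimming the diff, the sketch below restates the pattern this PR adopts end to end. It is not code from the repository: the package name, the listViaInformers helper, and the kubernetes.Interface/namespace parameters are illustrative. A shared informer factory is started once, its caches are synced, and every subsequent read is served from the local cache rather than the API server.

```go
package example

import (
	"context"
	"fmt"

	"k8s.io/apimachinery/pkg/labels"
	kubeinformers "k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)

// listViaInformers builds a namespaced shared informer factory, waits for the pod
// and node caches to sync, and then serves both reads from the local cache.
func listViaInformers(ctx context.Context, client kubernetes.Interface, namespace string) error {
	// Resync period 0: the cache is kept current by watch events alone.
	factory := kubeinformers.NewSharedInformerFactoryWithOptions(client, 0, kubeinformers.WithNamespace(namespace))
	podInformer := factory.Core().V1().Pods()
	nodeInformer := factory.Core().V1().Nodes()

	// Touching Informer() before Start registers the pod and node types with the factory.
	podSynced := podInformer.Informer().HasSynced
	nodeSynced := nodeInformer.Informer().HasSynced

	factory.Start(ctx.Done())

	// Block until the caches hold a full snapshot; listers would otherwise return partial results.
	if !cache.WaitForCacheSync(ctx.Done(), podSynced, nodeSynced) {
		return fmt.Errorf("timed out waiting for informer caches to sync")
	}

	// Both lookups below are answered from memory, not by the API server.
	pods, err := podInformer.Lister().Pods(namespace).List(labels.Everything())
	if err != nil {
		return err
	}
	nodes, err := nodeInformer.Lister().List(labels.Everything())
	if err != nil {
		return err
	}

	fmt.Printf("cached view: %d pods in %q, %d nodes\n", len(pods), namespace, len(nodes))
	return nil
}
```

Because the cache stays current via the initial LIST plus a long-lived WATCH per resource type, repeated lister reads add no further API server load, which is the point of the change.
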
60 changes: 57 additions & 3 deletions operator/elasticsearch.go
@@ -23,6 +23,9 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
kubeinformers "k8s.io/client-go/informers"
informersv1 "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/tools/cache"
kube_record "k8s.io/client-go/tools/record"
)
@@ -37,6 +40,8 @@ const (
type ElasticsearchOperator struct {
logger *log.Entry
kube *clientset.Clientset
podInformer informersv1.PodInformer
nodeInformer informersv1.NodeInformer
interval time.Duration
autoscalerInterval time.Duration
metricsInterval time.Duration
@@ -74,6 +79,7 @@ func NewElasticsearchOperator(
clusterDNSZone string,
elasticsearchEndpoint *url.URL,
) *ElasticsearchOperator {

return &ElasticsearchOperator{
logger: log.WithFields(
log.Fields{
@@ -94,13 +100,58 @@ }
}
}

// setupInformers sets up pod and node informers and waits for their caches to sync.
func (o *ElasticsearchOperator) setupInformers(ctx context.Context) error {
informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(o.kube, 0, kubeinformers.WithNamespace(o.namespace))
podInformer := informerFactory.Core().V1().Pods()
nodeInformer := informerFactory.Core().V1().Nodes()

informers := []cache.SharedIndexInformer{
podInformer.Informer(),
nodeInformer.Informer(),
}

for _, informer := range informers {
// Add a default resource event handler to properly initialize the informer.
_, err := informer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {},
},
)
if err != nil {
return fmt.Errorf("failed to add event handler: %v", err)
}
}

informerFactory.Start(ctx.Done())

for _, informer := range informers {
// wait for the local cache to be populated.
err := wait.PollUntilContextTimeout(ctx, time.Second, 60*time.Second, true, func(_ context.Context) (bool, error) {
return informer.HasSynced(), nil
})
if err != nil {
return fmt.Errorf("failed to sync cache for informer: %v", err)
}
}

o.podInformer = podInformer
o.nodeInformer = nodeInformer
return nil
}

// Run runs the main loop of the operator.
func (o *ElasticsearchOperator) Run(ctx context.Context) error {
err := o.setupInformers(ctx)
if err != nil {
return err
}

go o.collectMetrics(ctx)
go o.runAutoscaler(ctx)

// run EDS watcher
err := o.runWatch(ctx)
err = o.runWatch(ctx)
if err != nil {
return err
}
@@ -689,6 +740,8 @@ func (o *ElasticsearchOperator) operateEDS(eds *zv1.ElasticsearchDataSet, delete

operator := &Operator{
kube: o.kube,
podInformer: o.podInformer,
nodeInformer: o.nodeInformer,
priorityNodeSelectors: o.priorityNodeSelectors,
interval: o.interval,
logger: logger,
@@ -832,12 +885,13 @@ func (o *ElasticsearchOperator) collectResources(ctx context.Context) (map[types
}

// TODO: label filter
pods, err := o.kube.CoreV1().Pods(o.namespace).List(ctx, metav1.ListOptions{})
pods, err := o.podInformer.Lister().Pods(o.namespace).List(labels.Everything())
if err != nil {
return nil, err
}

for _, pod := range pods.Items {
for _, pod := range pods {
pod := *pod
for _, es := range resources {
es := es
// TODO: leaky abstraction
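
The last hunk of elasticsearch.go also changes the shape of the data: the clientset returns a *v1.PodList whose Items are values, while the lister returns pointers into the shared informer cache, which is why the new loop copies each pod (pod := *pod) before handing it on. A minimal side-by-side sketch, with illustrative names and assuming both a clientset and a PodLister are available:

```go
package example

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/kubernetes"
	listersv1 "k8s.io/client-go/listers/core/v1"
)

// listPodsBothWays contrasts the API-server round trip with the cache-backed lister.
func listPodsBothWays(ctx context.Context, kube kubernetes.Interface, podLister listersv1.PodLister, namespace string) error {
	// Before: every call hits the API server and returns a PodList with value items.
	podList, err := kube.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
	if err != nil {
		return err
	}
	for _, pod := range podList.Items { // pod is a v1.Pod value, already a copy
		_ = pod.Name
	}

	// After: served from the informer cache; items are *v1.Pod pointers shared with the cache.
	cached, err := podLister.Pods(namespace).List(labels.Everything())
	if err != nil {
		return err
	}
	for _, pod := range cached {
		pod := *pod // copy before storing or mutating; cache objects must be treated as read-only
		_ = pod.Name
	}
	return nil
}
```
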
53 changes: 25 additions & 28 deletions operator/operator.go
@@ -19,6 +19,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
informersv1 "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
kube_record "k8s.io/client-go/tools/record"
)
@@ -96,6 +97,8 @@ type StatefulResource interface {
// Operator is a generic operator that can manage Pods filtered by a selector.
type Operator struct {
kube *clientset.Clientset
podInformer informersv1.PodInformer
nodeInformer informersv1.NodeInformer
priorityNodeSelectors labels.Set
interval time.Duration
logger *log.Entry
@@ -307,18 +310,14 @@ func (o *Operator) operatePods(ctx context.Context, sts *appsv1.StatefulSet, srg
return sr.OnStableReplicasHook(ctx)
}

opts := metav1.ListOptions{
LabelSelector: labels.Set(
sr.LabelSelector(),
).AsSelector().String(),
}
labelSelector := labels.Set(sr.LabelSelector()).AsSelector()

pods, err := o.kube.CoreV1().Pods(sr.Namespace()).List(ctx, opts)
pods, err := o.podInformer.Lister().Pods(sr.Namespace()).List(labelSelector)
if err != nil {
return fmt.Errorf("failed to list pods of StatefulSet: %v", err)
}

pod, err := o.getPodToUpdate(ctx, pods.Items, sts, sr)
pod, err := o.getPodToUpdate(ctx, pods, sts, sr)
if err != nil {
return fmt.Errorf("failed to get Pod to update: %v", err)
}
@@ -439,12 +438,10 @@ func (o *Operator) rescaleStatefulSet(ctx context.Context, sts *appsv1.StatefulS
replicas--
}

opts := metav1.ListOptions{
LabelSelector: labels.Set(sts.Spec.Selector.MatchLabels).String(),
}
labelSelector := labels.Set(sts.Spec.Selector.MatchLabels).AsSelector()

// get all Pods of the StatefulSet
pods, err := o.kube.CoreV1().Pods(sts.Namespace).List(ctx, opts)
pods, err := o.podInformer.Lister().Pods(sts.Namespace).List(labelSelector)
if err != nil {
return err
}
@@ -453,14 +450,14 @@ func (o *Operator) rescaleStatefulSet(ctx context.Context, sts *appsv1.StatefulS
// We use this property to sort Pods by the lowest ordinal number and
// drain those that would be scaled down by Kubernetes when reducing
// the replica count on the StatefulSet.
pods.Items, err = sortStatefulSetPods(pods.Items)
pods, err = sortStatefulSetPods(pods)
if err != nil {
return err
}

if len(pods.Items) > replicas {
log.Infof("Starting pod draining from %d to %d pods", len(pods.Items), replicas)
for _, pod := range pods.Items[replicas:] {
if len(pods) > replicas {
log.Infof("Starting pod draining from %d to %d pods", len(pods), replicas)
for _, pod := range pods[replicas:] {
// first, check if we need to opt-out of the loop because the EDS changed.
newSR, err := srg.Get(ctx)
if err != nil {
@@ -485,7 +482,7 @@ func (o *Operator) rescaleStatefulSet(ctx context.Context, sts *appsv1.StatefulS
}

log.Infof("Draining Pod %s/%s for scaledown", pod.Namespace, pod.Name)
err = newSR.Drain(ctx, &pod)
err = newSR.Drain(ctx, pod)
if err != nil {
return fmt.Errorf("failed to drain pod %s/%s: %v", pod.Namespace, pod.Name, err)
}
@@ -520,10 +517,10 @@ func (o *Operator) rescaleStatefulSet(ctx context.Context, sts *appsv1.StatefulS

// sortStatefulSetPods sorts pods based on their ordinal numbers which is the
// last part of the pod name.
func sortStatefulSetPods(pods []v1.Pod) ([]v1.Pod, error) {
func sortStatefulSetPods(pods []*v1.Pod) ([]*v1.Pod, error) {
type ordinalPod struct {
Number int
Pod v1.Pod
Pod *v1.Pod
}

ordinalNumbers := make([]ordinalPod, len(pods))
@@ -540,7 +537,7 @@ func sortStatefulSetPods(pods []v1.Pod) ([]v1.Pod, error) {
return ordinalNumbers[i].Number < ordinalNumbers[j].Number
})

sortedPods := make([]v1.Pod, len(pods))
sortedPods := make([]*v1.Pod, len(pods))
for i, ordinal := range ordinalNumbers {
sortedPods[i] = ordinal.Pod
}
@@ -550,7 +547,7 @@ func sortStatefulSetPods(pods []v1.Pod) ([]v1.Pod, error) {

// getPodToUpdate gets a single Pod to update based on priority.
// if no update is needed it returns nil.
func (o *Operator) getPodToUpdate(ctx context.Context, pods []v1.Pod, sts *appsv1.StatefulSet, sr StatefulResource) (*v1.Pod, error) {
func (o *Operator) getPodToUpdate(ctx context.Context, pods []*v1.Pod, sts *appsv1.StatefulSet, sr StatefulResource) (*v1.Pod, error) {
// return early if there are no Pods to manage
if len(pods) == 0 {
return nil, nil
@@ -577,16 +574,16 @@ func (o *Operator) getPodToUpdate(ctx context.Context, pods []v1.Pod, sts *appsv

// getNodes gets all nodes matching the priority node selector and all nodes
// that are marked unschedulable.
func (o *Operator) getNodes(ctx context.Context) (map[string]v1.Node, map[string]v1.Node, error) {
opts := metav1.ListOptions{}
nodes, err := o.kube.CoreV1().Nodes().List(ctx, opts)
func (o *Operator) getNodes(_ context.Context) (map[string]v1.Node, map[string]v1.Node, error) {
nodes, err := o.nodeInformer.Lister().List(labels.Everything())
if err != nil {
return nil, nil, err
}

priorityNodesMap := make(map[string]v1.Node, len(nodes.Items))
unschedulableNodesMap := make(map[string]v1.Node, len(nodes.Items))
for _, node := range nodes.Items {
priorityNodesMap := make(map[string]v1.Node, len(nodes))
unschedulableNodesMap := make(map[string]v1.Node, len(nodes))
for _, node := range nodes {
node := *node
if len(node.Labels) > 0 && isSubset(o.priorityNodeSelectors, labels.Set(node.Labels)) {
priorityNodesMap[node.Name] = node
}
@@ -726,7 +723,7 @@ func priorityNames(priority int) []string {
// 2. Pods NOT on a priority node get high priority.
// 3. Pods not up to date with StatefulSet revision get high priority.
// 4. Pods part of a StatefulSet where desired replicas != actual replicas get medium priority.
func prioritizePodsForUpdate(pods []v1.Pod, sts *appsv1.StatefulSet, sr StatefulResource, priorityNodes, unschedulableNodes map[string]v1.Node) ([]v1.Pod, error) {
func prioritizePodsForUpdate(pods []*v1.Pod, sts *appsv1.StatefulSet, sr StatefulResource, priorityNodes, unschedulableNodes map[string]v1.Node) ([]v1.Pod, error) {
priorities := make([]*updatePriority, 0, len(pods))
for _, pod := range pods {
ordinal := strings.TrimPrefix(pod.Name, pod.GenerateName)
Expand All @@ -736,7 +733,7 @@ func prioritizePodsForUpdate(pods []v1.Pod, sts *appsv1.StatefulSet, sr Stateful
}

prio := &updatePriority{
Pod: pod,
Pod: *pod.DeepCopy(),
Number: number,
}

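
Two idioms recur throughout the operator.go hunks above and are shown in isolation below (the helper names are illustrative, not code from this PR): building a typed labels.Selector from the StatefulSet's MatchLabels so it can be passed straight to a lister, and copying cache-owned objects before storing them by value so the informer cache is never mutated.

```go
package example

import (
	appsv1 "k8s.io/api/apps/v1"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/labels"
	listersv1 "k8s.io/client-go/listers/core/v1"
)

// cachedPodsForStatefulSet lists the StatefulSet's pods from the informer cache
// using a typed selector instead of a serialized metav1.ListOptions string.
func cachedPodsForStatefulSet(podLister listersv1.PodLister, sts *appsv1.StatefulSet) ([]*v1.Pod, error) {
	selector := labels.Set(sts.Spec.Selector.MatchLabels).AsSelector()
	return podLister.Pods(sts.Namespace).List(selector)
}

// schedulableNodesByName copies cache-owned nodes into a map of values, mirroring
// the copy-before-store pattern used in getNodes above.
func schedulableNodesByName(nodeLister listersv1.NodeLister) (map[string]v1.Node, error) {
	nodes, err := nodeLister.List(labels.Everything())
	if err != nil {
		return nil, err
	}
	byName := make(map[string]v1.Node, len(nodes))
	for _, node := range nodes {
		node := *node // detach from the shared cache before storing by value
		if !node.Spec.Unschedulable {
			byName[node.Name] = node
		}
	}
	return byName, nil
}
```
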
16 changes: 8 additions & 8 deletions operator/operator_test.go
@@ -166,36 +166,36 @@ func TestPrioritizePodsForUpdate(t *testing.T) {
"node3": {},
}

pods := []v1.Pod{updatingPod}
pods := []*v1.Pod{&updatingPod}

sortedPods, err := prioritizePodsForUpdate(pods, sts, sr, priorityNodes, unschedulableNodes)
assert.NoError(t, err)
assert.Len(t, sortedPods, 1)
assert.Equal(t, updatingPod, sortedPods[0])

// updating pod should be prioritized over stsPod
pods = []v1.Pod{stsPod, updatingPod}
pods = []*v1.Pod{&stsPod, &updatingPod}
sortedPods, err = prioritizePodsForUpdate(pods, sts, sr, priorityNodes, unschedulableNodes)
assert.NoError(t, err)
assert.Len(t, sortedPods, 2)
assert.Equal(t, updatingPod, sortedPods[0])

// stsPods should be sorted by ordinal number
pods = []v1.Pod{stsPod, stsPod0}
pods = []*v1.Pod{&stsPod, &stsPod0}
sortedPods, err = prioritizePodsForUpdate(pods, sts, sr, priorityNodes, unschedulableNodes)
assert.NoError(t, err)
assert.Len(t, sortedPods, 2)
assert.Equal(t, stsPod0, sortedPods[0])

// pods on unschedulable nodes should get higher priority
pods = []v1.Pod{stsPod, stsPod2}
pods = []*v1.Pod{&stsPod, &stsPod2}
sortedPods, err = prioritizePodsForUpdate(pods, sts, sr, priorityNodes, unschedulableNodes)
assert.NoError(t, err)
assert.Len(t, sortedPods, 2)
assert.Equal(t, stsPod2, sortedPods[0])

// don't prioritize pods not on a node.
pods = []v1.Pod{podNoNode}
pods = []*v1.Pod{&podNoNode}
sortedPods, err = prioritizePodsForUpdate(pods, sts, sr, priorityNodes, unschedulableNodes)
assert.NoError(t, err)
assert.Len(t, sortedPods, 0)
@@ -216,16 +216,16 @@ func TestPrioritizePodsForUpdate(t *testing.T) {
},
}

pods = []v1.Pod{podUpToDate}
pods = []*v1.Pod{&podUpToDate}
sortedPods, err = prioritizePodsForUpdate(pods, sts, sr, priorityNodes, unschedulableNodes)
assert.NoError(t, err)
assert.Len(t, sortedPods, 0)
}

func TestSortStatefulSetPods(t *testing.T) {
pods := make([]v1.Pod, 0, 13)
pods := make([]*v1.Pod, 0, 13)
for _, num := range []int{12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0} {
pods = append(pods, v1.Pod{
pods = append(pods, &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("sts-%d", num),
GenerateName: "sts-",