From 27bf91e4dfb7e6ec35256c795bf2b133004acf8a Mon Sep 17 00:00:00 2001
From: Jorge Turrado Ferrero
Date: Mon, 16 Oct 2023 19:01:32 +0200
Subject: [PATCH] fix: Prevented stuck status due to timeouts during scalers generation (#5084)

---
 CHANGELOG.md                              |   2 +-
 pkg/scaling/scale_handler.go              |  32 +--
 .../broken_scaledobject_tolerancy_test.go | 186 ++++++++++++++++++
 3 files changed, 203 insertions(+), 17 deletions(-)
 create mode 100644 tests/sequential/broken_scaledobject_tolerancy/broken_scaledobject_tolerancy_test.go

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 042fc2b71ca..36accd6deb1 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -63,7 +63,7 @@ Here is an overview of all new **experimental** features:
 
 ### Fixes
 
-- **General**: TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))
+- **General**: Prevented stuck status due to timeouts during scalers generation ([#5083](https://github.com/kedacore/keda/issues/5083))
 
 ### Deprecations
 
diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go
index 00c511788ba..5e7dc3ddc7f 100644
--- a/pkg/scaling/scale_handler.go
+++ b/pkg/scaling/scale_handler.go
@@ -291,6 +291,7 @@ func (h *scaleHandler) getScalersCacheForScaledObject(ctx context.Context, scale
 // performGetScalersCache returns cache for input scalableObject, it is common code used by GetScalersCache() and getScalersCacheForScaledObject() methods
 func (h *scaleHandler) performGetScalersCache(ctx context.Context, key string, scalableObject interface{}, scalableObjectGeneration *int64, scalableObjectKind, scalableObjectNamespace, scalableObjectName string) (*cache.ScalersCache, error) {
 	h.scalerCachesLock.RLock()
+	regenerateCache := false
 	if cache, ok := h.scalerCaches[key]; ok {
 		// generation was specified -> let's include it in the check as well
 		if scalableObjectGeneration != nil {
@@ -298,6 +299,10 @@ func (h *scaleHandler) performGetScalersCache(ctx context.Context, key string, s
 				h.scalerCachesLock.RUnlock()
 				return cache, nil
 			}
+			// object was found in the cache, but the generation is not correct:
+			// we'll need to close the scalers in the cache and
+			// proceed further to recreate the cache
+			regenerateCache = true
 		} else {
 			h.scalerCachesLock.RUnlock()
 			return cache, nil
@@ -305,22 +310,6 @@
 	}
 	h.scalerCachesLock.RUnlock()
 
-	h.scalerCachesLock.Lock()
-	defer h.scalerCachesLock.Unlock()
-	if cache, ok := h.scalerCaches[key]; ok {
-		// generation was specified -> let's include it in the check as well
-		if scalableObjectGeneration != nil {
-			if cache.ScalableObjectGeneration == *scalableObjectGeneration {
-				return cache, nil
-			}
-			// object was found in cache, but the generation is not correct,
-			// let's close scalers in the cache and proceed further to recreate the cache
-			cache.Close(ctx)
-		} else {
-			return cache, nil
-		}
-	}
-
 	if scalableObject == nil {
 		switch scalableObjectKind {
 		case "ScaledObject":
@@ -388,6 +377,17 @@
 	default:
 	}
 
+	// Scalers Close() could be impacted by timeouts, blocking the mutex
+	// until the timeout happens.
+	// Instead of closing the old cache item while holding the lock, we take
+	// it and close it in another goroutine: https://github.com/kedacore/keda/issues/5083
+	if regenerateCache {
+		oldCache := h.scalerCaches[key]
+		go oldCache.Close(ctx)
+	}
+
+	h.scalerCachesLock.Lock()
+	defer h.scalerCachesLock.Unlock()
 	h.scalerCaches[key] = newCache
 	return h.scalerCaches[key], nil
 }
diff --git a/tests/sequential/broken_scaledobject_tolerancy/broken_scaledobject_tolerancy_test.go b/tests/sequential/broken_scaledobject_tolerancy/broken_scaledobject_tolerancy_test.go
new file mode 100644
index 00000000000..f3bee811b9f
--- /dev/null
+++ b/tests/sequential/broken_scaledobject_tolerancy/broken_scaledobject_tolerancy_test.go
@@ -0,0 +1,186 @@
+//go:build e2e
+// +build e2e
+
+package broken_scaledobject_tolerancy_test
+
+import (
+	"fmt"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"k8s.io/client-go/kubernetes"
+
+	. "github.com/kedacore/keda/v2/tests/helper"
+)
+
+const (
+	testName = "broken-scaledobject-tolerancy-test"
+)
+
+var (
+	testNamespace           = fmt.Sprintf("%s-ns", testName)
+	deploymentName          = fmt.Sprintf("%s-deployment", testName)
+	monitoredDeploymentName = fmt.Sprintf("%s-monitored", testName)
+	scaledObjectName        = fmt.Sprintf("%s-so", testName)
+)
+
+type templateData struct {
+	TestNamespace           string
+	DeploymentName          string
+	MonitoredDeploymentName string
+	ScaledObjectName        string
+}
+
+const (
+	monitoredDeploymentTemplate = `apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: {{.MonitoredDeploymentName}}
+  namespace: {{.TestNamespace}}
+  labels:
+    deploy: workload-test
+spec:
+  replicas: 0
+  selector:
+    matchLabels:
+      pod: workload-test
+  template:
+    metadata:
+      labels:
+        pod: workload-test
+    spec:
+      containers:
+        - name: nginx
+          image: 'nginxinc/nginx-unprivileged'`
+
+	deploymentTemplate = `apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: {{.DeploymentName}}
+  namespace: {{.TestNamespace}}
+  labels:
+    deploy: workload-sut
+spec:
+  replicas: 0
+  selector:
+    matchLabels:
+      pod: workload-sut
+  template:
+    metadata:
+      labels:
+        pod: workload-sut
+    spec:
+      containers:
+        - name: nginx
+          image: 'nginxinc/nginx-unprivileged'`
+
+	brokenScaledObjectTemplate = `
+apiVersion: keda.sh/v1alpha1
+kind: ScaledObject
+metadata:
+  name: {{.ScaledObjectName}}-broken
+  namespace: {{.TestNamespace}}
+spec:
+  scaleTargetRef:
+    name: {{.MonitoredDeploymentName}}
+  minReplicaCount: 0
+  maxReplicaCount: 1
+  triggers:
+    - metadata:
+        activationLagThreshold: '1'
+        bootstrapServers: 1.2.3.4:9092
+        consumerGroup: earliest
+        lagThreshold: '1'
+        offsetResetPolicy: earliest
+        topic: kafka-topic
+      type: kafka
+`
+
+	scaledObjectTemplate = `apiVersion: keda.sh/v1alpha1
+kind: ScaledObject
+metadata:
+  name: {{.ScaledObjectName}}
+  namespace: {{.TestNamespace}}
+spec:
+  scaleTargetRef:
+    name: {{.DeploymentName}}
+  pollingInterval: 1
+  cooldownPeriod: 0
+  minReplicaCount: 0
+  maxReplicaCount: 10
+  advanced:
+    horizontalPodAutoscalerConfig:
+      behavior:
+        scaleDown:
+          stabilizationWindowSeconds: 5
+  triggers:
+    - type: kubernetes-workload
+      metadata:
+        podSelector: 'pod=workload-test'
+        value: '1'
+`
+)
+
+// As we need to ensure that a broken ScaledObject doesn't impact
+// other ScaledObjects (https://github.com/kedacore/keda/issues/5083),
+// this test deploys a broken ScaledObject pointing to a missing endpoint,
+// which produces timeouts. In the meantime, we deploy another ScaledObject
+// and validate that it keeps working although the broken ScaledObject produces timeouts
+// all the time. This prevents us from introducing deadlocks in the internal scalers cache.
+func TestBrokenScaledObjectTolerance(t *testing.T) {
+	// setup
+	t.Log("--- setting up ---")
+	// Create kubernetes resources
+	kc := GetKubernetesClient(t)
+	data, templates := getTemplateData()
+
+	CreateKubernetesResources(t, kc, testNamespace, data, templates)
+
+	testScaleOut(t, kc)
+	testScaleIn(t, kc)
+
+	// cleanup
+	DeleteKubernetesResources(t, testNamespace, data, templates)
+}
+
+func getTemplateData() (templateData, []Template) {
+	return templateData{
+			TestNamespace:           testNamespace,
+			DeploymentName:          deploymentName,
+			ScaledObjectName:        scaledObjectName,
+			MonitoredDeploymentName: monitoredDeploymentName,
+		}, []Template{
+			{Name: "deploymentTemplate", Config: deploymentTemplate},
+			{Name: "monitoredDeploymentTemplate", Config: monitoredDeploymentTemplate},
+			{Name: "scaledObjectTemplate", Config: scaledObjectTemplate},
+			{Name: "brokenScaledObjectTemplate", Config: brokenScaledObjectTemplate},
+		}
+}
+
+func testScaleOut(t *testing.T, kc *kubernetes.Clientset) {
+	// scale monitored deployment to 2 replicas
+	replicas := 2
+	KubernetesScaleDeployment(t, kc, monitoredDeploymentName, int64(replicas), testNamespace)
+	assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, replicas, 10, 6),
+		fmt.Sprintf("replica count should be %d after 1 minute", replicas))
+
+	// scale monitored deployment to 4 replicas
+	replicas = 4
+	KubernetesScaleDeployment(t, kc, monitoredDeploymentName, int64(replicas), testNamespace)
+	assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, replicas, 10, 6),
+		fmt.Sprintf("replica count should be %d after 1 minute", replicas))
+}
+
+func testScaleIn(t *testing.T, kc *kubernetes.Clientset) {
+	// scale monitored deployment down to 2 replicas
+	replicas := 2
+	KubernetesScaleDeployment(t, kc, monitoredDeploymentName, int64(replicas), testNamespace)
+	assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, replicas, 10, 6),
+		fmt.Sprintf("replica count should be %d after 1 minute", replicas))
+
+	// scale monitored deployment down to 0 replicas
+	replicas = 0
+	KubernetesScaleDeployment(t, kc, monitoredDeploymentName, int64(replicas), testNamespace)
+	assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, replicas, 10, 6),
+		fmt.Sprintf("replica count should be %d after 1 minute", replicas))
+}
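
The core of the fix above is a locking pattern: detect a stale cache entry under a read lock, build its replacement outside any lock, close the stale entry in a background goroutine, and take the write lock only for the map swap. Below is a minimal, standalone Go sketch of that pattern; the entry, handler, and refresh names are illustrative stand-ins rather than KEDA types, and the 2-second sleep stands in for a scaler whose Close() hangs on a network timeout.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// entry stands in for a cached item whose Close() may block on timeouts,
// so it must never be closed while the cache lock is held.
type entry struct {
	generation int64
}

func (e *entry) Close(ctx context.Context) {
	time.Sleep(2 * time.Second) // simulate a scaler shutdown hitting a network timeout
	fmt.Printf("closed cache entry (generation %d)\n", e.generation)
}

type handler struct {
	mu    sync.RWMutex
	cache map[string]*entry
}

// refresh mirrors the pattern from the patch: check the generation under a
// read lock, close the stale entry in a background goroutine, and take the
// write lock only for the cheap map assignment.
func (h *handler) refresh(ctx context.Context, key string, generation int64) *entry {
	h.mu.RLock()
	old, ok := h.cache[key]
	if ok && old.generation == generation {
		h.mu.RUnlock()
		return old // cache hit, nothing to rebuild
	}
	h.mu.RUnlock()

	newEntry := &entry{generation: generation} // built outside any lock

	if ok {
		// Close the stale entry asynchronously so a slow Close()
		// cannot block every other caller waiting on the mutex.
		go old.Close(ctx)
	}

	h.mu.Lock()
	defer h.mu.Unlock()
	h.cache[key] = newEntry
	return newEntry
}

func main() {
	h := &handler{cache: map[string]*entry{"demo": {generation: 1}}}
	ctx := context.Background()

	start := time.Now()
	h.refresh(ctx, "demo", 2) // stale generation -> rebuild without blocking on Close()
	fmt.Printf("refresh returned after %s; the old entry closes in the background\n", time.Since(start))

	time.Sleep(3 * time.Second) // give the background Close() time to finish
}

With the old code, cache.Close(ctx) ran while the write lock was held, so one broken ScaledObject whose scalers time out could stall every other ScaledObject waiting on the cache; with the pattern above, the write lock is held only for the map assignment.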