From d6a084001a6cb6de9edfc9583327bdbf85f7aec4 Mon Sep 17 00:00:00 2001 From: Chris Taylor Date: Fri, 25 Aug 2023 18:19:58 +0100 Subject: [PATCH] feat(aws-sqs): Support for scaling to include delayed messages (#4900) --- CHANGELOG.md | 2 +- pkg/scalers/aws_sqs_queue_scaler.go | 30 +++++++----- pkg/scalers/aws_sqs_queue_scaler_test.go | 61 ++++++++++++++++++++++-- 3 files changed, 75 insertions(+), 18 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b66d2a150aa..575cd37a074 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -49,7 +49,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio ### New -- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX)) +- **AWS SQS Scaler**: Support for scaling to include delayed messages. [#4377](https://github.com/kedacore/keda/issues/4377) ### Improvements diff --git a/pkg/scalers/aws_sqs_queue_scaler.go b/pkg/scalers/aws_sqs_queue_scaler.go index effc6fa607e..2bc663ed244 100644 --- a/pkg/scalers/aws_sqs_queue_scaler.go +++ b/pkg/scalers/aws_sqs_queue_scaler.go @@ -21,17 +21,9 @@ const ( targetQueueLengthDefault = 5 activationTargetQueueLengthDefault = 0 defaultScaleOnInFlight = true + defaultScaleOnDelayed = false ) -var awsSqsQueueMetricNamesForScalingInFlight = []string{ - "ApproximateNumberOfMessages", - "ApproximateNumberOfMessagesNotVisible", -} - -var awsSqsQueueMetricNamesForNotScalingInFlight = []string{ - "ApproximateNumberOfMessages", -} - type awsSqsQueueScaler struct { metricType v2.MetricTargetType metadata *awsSqsQueueMetadata @@ -49,6 +41,7 @@ type awsSqsQueueMetadata struct { awsAuthorization awsAuthorizationMetadata scalerIndex int scaleOnInFlight bool + scaleOnDelayed bool awsSqsQueueMetricNames []string } @@ -78,6 +71,7 @@ func parseAwsSqsQueueMetadata(config *ScalerConfig, logger logr.Logger) (*awsSqs meta := awsSqsQueueMetadata{} meta.targetQueueLength = defaultTargetQueueLength meta.scaleOnInFlight = defaultScaleOnInFlight + meta.scaleOnDelayed = defaultScaleOnDelayed if val, ok := config.TriggerMetadata["queueLength"]; ok && val != "" { queueLength, err := strconv.ParseInt(val, 10, 64) @@ -109,10 +103,22 @@ func parseAwsSqsQueueMetadata(config *ScalerConfig, logger logr.Logger) (*awsSqs } } + if val, ok := config.TriggerMetadata["scaleOnDelayed"]; ok && val != "" { + scaleOnDelayed, err := strconv.ParseBool(val) + if err != nil { + meta.scaleOnDelayed = defaultScaleOnDelayed + logger.Error(err, "Error parsing SQS queue metadata scaleOnDelayed, using default %n", defaultScaleOnDelayed) + } else { + meta.scaleOnDelayed = scaleOnDelayed + } + } + + meta.awsSqsQueueMetricNames = []string{"ApproximateNumberOfMessages"} if meta.scaleOnInFlight { - meta.awsSqsQueueMetricNames = awsSqsQueueMetricNamesForScalingInFlight - } else { - meta.awsSqsQueueMetricNames = awsSqsQueueMetricNamesForNotScalingInFlight + meta.awsSqsQueueMetricNames = append(meta.awsSqsQueueMetricNames, "ApproximateNumberOfMessagesNotVisible") + } + if meta.scaleOnDelayed { + meta.awsSqsQueueMetricNames = append(meta.awsSqsQueueMetricNames, "ApproximateNumberOfMessagesDelayed") } if val, ok := config.TriggerMetadata["queueURL"]; ok && val != "" { diff --git a/pkg/scalers/aws_sqs_queue_scaler_test.go b/pkg/scalers/aws_sqs_queue_scaler_test.go index fc7e4cdd971..94381c95a28 100644 --- a/pkg/scalers/aws_sqs_queue_scaler_test.go +++ b/pkg/scalers/aws_sqs_queue_scaler_test.go @@ -3,6 +3,7 @@ package scalers import ( "context" "errors" + "strconv" "testing" "github.com/aws/aws-sdk-go/aws" @@ -25,6 +26,10 @@ const ( testAWSSQSErrorQueueURL = "https://sqs.eu-west-1.amazonaws.com/account_id/Error" testAWSSQSBadDataQueueURL = "https://sqs.eu-west-1.amazonaws.com/account_id/BadData" + + testAWSSQSApproximateNumberOfMessagesVisible = 200 + testAWSSQSApproximateNumberOfMessagesNotVisible = 100 + testAWSSQSApproximateNumberOfMessagesDelayed = 50 ) var testAWSSQSEmptyResolvedEnv = map[string]string{} @@ -65,14 +70,16 @@ func (m *mockSqs) GetQueueAttributes(input *sqs.GetQueueAttributesInput) (*sqs.G Attributes: map[string]*string{ "ApproximateNumberOfMessages": aws.String("NotInt"), "ApproximateNumberOfMessagesNotVisible": aws.String("NotInt"), + "ApproximateNumberOfMessagesDelayed": aws.String("NotInt"), }, }, nil } return &sqs.GetQueueAttributesOutput{ Attributes: map[string]*string{ - "ApproximateNumberOfMessages": aws.String("200"), - "ApproximateNumberOfMessagesNotVisible": aws.String("100"), + "ApproximateNumberOfMessages": aws.String(strconv.Itoa(testAWSSQSApproximateNumberOfMessagesVisible)), + "ApproximateNumberOfMessagesNotVisible": aws.String(strconv.Itoa(testAWSSQSApproximateNumberOfMessagesNotVisible)), + "ApproximateNumberOfMessagesDelayed": aws.String(strconv.Itoa(testAWSSQSApproximateNumberOfMessagesDelayed)), }, }, nil } @@ -326,6 +333,44 @@ var awsSQSGetMetricTestData = []*parseAWSSQSMetadataTestData{ testAWSSQSEmptyResolvedEnv, false, "not error with scaleOnInFlight enabled"}, + {map[string]string{ + "queueURL": testAWSSQSProperQueueURL, + "queueLength": "1", + "awsRegion": "eu-west-1", + "scaleOnDelayed": "false"}, + testAWSSQSAuthentication, + testAWSSQSEmptyResolvedEnv, + false, + "not error with scaleOnDelayed disabled"}, + {map[string]string{ + "queueURL": testAWSSQSProperQueueURL, + "queueLength": "1", + "awsRegion": "eu-west-1", + "scaleOnDelayed": "true"}, + testAWSSQSAuthentication, + testAWSSQSEmptyResolvedEnv, + false, + "not error with scaleOnDelayed enabled"}, + {map[string]string{ + "queueURL": testAWSSQSProperQueueURL, + "queueLength": "1", + "awsRegion": "eu-west-1", + "scaleOnInFlight": "false", + "scaleOnDelayed": "false"}, + testAWSSQSAuthentication, + testAWSSQSEmptyResolvedEnv, + false, + "not error with scaledOnInFlight and scaleOnDelayed disabled"}, + {map[string]string{ + "queueURL": testAWSSQSProperQueueURL, + "queueLength": "1", + "awsRegion": "eu-west-1", + "scaleOnInFlight": "true", + "scaleOnDelayed": "true"}, + testAWSSQSAuthentication, + testAWSSQSEmptyResolvedEnv, + false, + "not error with scaledOnInFlight and scaleOnDelayed enabled"}, {map[string]string{ "queueURL": testAWSSQSErrorQueueURL, "queueLength": "1", @@ -390,11 +435,17 @@ func TestAWSSQSScalerGetMetrics(t *testing.T) { case testAWSSQSBadDataQueueURL: assert.Error(t, err, "expect error because of bad data return from sqs") default: + expectedMessages := testAWSSQSApproximateNumberOfMessagesVisible + if meta.scaleOnInFlight { - assert.EqualValues(t, int64(300.0), value[0].Value.Value()) - } else { - assert.EqualValues(t, int64(200.0), value[0].Value.Value()) + expectedMessages += testAWSSQSApproximateNumberOfMessagesNotVisible } + + if meta.scaleOnDelayed { + expectedMessages += testAWSSQSApproximateNumberOfMessagesDelayed + } + + assert.EqualValues(t, int64(expectedMessages), value[0].Value.Value()) } } }