From 5a9617cf7ee235e786e49a1f694f5817db135ce5 Mon Sep 17 00:00:00 2001 From: rickbrouwer Date: Mon, 23 Sep 2024 14:02:40 +0200 Subject: [PATCH] Support multiple queues at the IBMMQ scaler Signed-off-by: rickbrouwer --- CHANGELOG.md | 1 + pkg/scalers/ibmmq_scaler.go | 92 +++++++++++++++++--------------- pkg/scalers/ibmmq_scaler_test.go | 2 + 3 files changed, 53 insertions(+), 42 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 71cef1b99cf..c577eab4421 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -72,6 +72,7 @@ Here is an overview of all new **experimental** features: - **AWS CloudWatch Scaler**: Add support for ignoreNullValues ([#5352](https://github.com/kedacore/keda/issues/5352)) - **GCP Scalers**: Added custom time horizon in GCP scalers ([#5778](https://github.com/kedacore/keda/issues/5778)) - **GitHub Scaler**: Fixed pagination, fetching repository list ([#5738](https://github.com/kedacore/keda/issues/5738)) +- **IBMMQ Scaler**: Support multiple queues at the IBMMQ scaler ([#6181](https://github.com/kedacore/keda/issues/6181)) - **Kafka**: Fix logic to scale to zero on invalid offset even with earliest offsetResetPolicy ([#5689](https://github.com/kedacore/keda/issues/5689)) - **RabbitMQ Scaler**: Add connection name for AMQP ([#5958](https://github.com/kedacore/keda/issues/5958)) - TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX)) diff --git a/pkg/scalers/ibmmq_scaler.go b/pkg/scalers/ibmmq_scaler.go index 0b0b4c893f4..609ceb87bdf 100644 --- a/pkg/scalers/ibmmq_scaler.go +++ b/pkg/scalers/ibmmq_scaler.go @@ -26,18 +26,18 @@ type ibmmqScaler struct { } type ibmmqMetadata struct { - Host string `keda:"name=host, order=triggerMetadata"` - QueueName string `keda:"name=queueName, order=triggerMetadata"` - QueueDepth int64 `keda:"name=queueDepth, order=triggerMetadata, default=20"` - ActivationQueueDepth int64 `keda:"name=activationQueueDepth, order=triggerMetadata, default=0"` - Username string `keda:"name=username, order=authParams;resolvedEnv;triggerMetadata"` - Password string `keda:"name=password, order=authParams;resolvedEnv;triggerMetadata"` - UnsafeSsl bool `keda:"name=unsafeSsl, order=triggerMetadata, default=false"` - TLS bool `keda:"name=tls, order=triggerMetadata, default=false"` // , deprecated=use unsafeSsl instead - CA string `keda:"name=ca, order=authParams, optional"` - Cert string `keda:"name=cert, order=authParams, optional"` - Key string `keda:"name=key, order=authParams, optional"` - KeyPassword string `keda:"name=keyPassword, order=authParams, optional"` + Host string `keda:"name=host, order=triggerMetadata"` + QueueName []string `keda:"name=queueName, order=triggerMetadata"` + QueueDepth int64 `keda:"name=queueDepth, order=triggerMetadata, default=20"` + ActivationQueueDepth int64 `keda:"name=activationQueueDepth, order=triggerMetadata, default=0"` + Username string `keda:"name=username, order=authParams;resolvedEnv;triggerMetadata"` + Password string `keda:"name=password, order=authParams;resolvedEnv;triggerMetadata"` + UnsafeSsl bool `keda:"name=unsafeSsl, order=triggerMetadata, default=false"` + TLS bool `keda:"name=tls, order=triggerMetadata, default=false"` // , deprecated=use unsafeSsl instead + CA string `keda:"name=ca, order=authParams, optional"` + Cert string `keda:"name=cert, order=authParams, optional"` + Key string `keda:"name=key, order=authParams, optional"` + KeyPassword string `keda:"name=keyPassword, order=authParams, optional"` triggerIndex int } @@ -129,54 +129,62 @@ func parseIBMMQMetadata(config *scalersconfig.ScalerConfig) (ibmmqMetadata, erro } func (s *ibmmqScaler) getQueueDepthViaHTTP(ctx context.Context) (int64, error) { - queue := s.metadata.QueueName + maxDepth := int64(0) url := s.metadata.Host - var requestJSON = []byte(`{"type": "runCommandJSON", "command": "display", "qualifier": "qlocal", "name": "` + queue + `", "responseParameters" : ["CURDEPTH"]}`) - req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(requestJSON)) + req, err := http.NewRequestWithContext(ctx, "POST", url, nil) if err != nil { - return 0, fmt.Errorf("failed to request queue depth: %w", err) + return 0, fmt.Errorf("failed to create HTTP request: %w", err) } req.Header.Set("ibm-mq-rest-csrf-token", "value") req.Header.Set("Content-Type", "application/json") - req.SetBasicAuth(s.metadata.Username, s.metadata.Password) - resp, err := s.httpClient.Do(req) - if err != nil { - return 0, fmt.Errorf("failed to contact MQ via REST: %w", err) - } - defer resp.Body.Close() + for _, queueName := range s.metadata.QueueName { + requestJSON := []byte(`{"type": "runCommandJSON", "command": "display", "qualifier": "qlocal", "name": "` + queueName + `", "responseParameters" : ["CURDEPTH"]}`) + req.Body = io.NopCloser(bytes.NewBuffer(requestJSON)) - body, err := io.ReadAll(resp.Body) - if err != nil { - return 0, fmt.Errorf("failed to read body of request: %w", err) - } + resp, err := s.httpClient.Do(req) + if err != nil { + return 0, fmt.Errorf("failed to contact MQ via REST for queue %s: %w", queueName, err) + } + defer resp.Body.Close() - var response CommandResponse - err = json.Unmarshal(body, &response) - if err != nil { - return 0, fmt.Errorf("failed to parse JSON: %w", err) - } + body, err := io.ReadAll(resp.Body) + if err != nil { + return 0, fmt.Errorf("failed to read body of request for queue %s: %w", queueName, err) + } - if response.CommandResponse == nil || len(response.CommandResponse) == 0 { - return 0, fmt.Errorf("failed to parse response from REST call") - } + var response CommandResponse + err = json.Unmarshal(body, &response) + if err != nil { + return 0, fmt.Errorf("failed to parse JSON for queue %s: %w", queueName, err) + } + + if response.CommandResponse == nil || len(response.CommandResponse) == 0 { + return 0, fmt.Errorf("failed to parse response from REST call for queue %s", queueName) + } + + if response.CommandResponse[0].Parameters == nil { + var reason string + message := strings.Join(response.CommandResponse[0].Message, " ") + if message != "" { + reason = fmt.Sprintf(", reason: %s", message) + } + return 0, fmt.Errorf("failed to get the current queue depth parameter for queue %s%s", queueName, reason) + } - if response.CommandResponse[0].Parameters == nil { - var reason string - message := strings.Join(response.CommandResponse[0].Message, " ") - if message != "" { - reason = fmt.Sprintf(", reason: %s", message) + depth := int64(response.CommandResponse[0].Parameters.Curdepth) + if depth > maxDepth { + maxDepth = depth } - return 0, fmt.Errorf("failed to get the current queue depth parameter%s", reason) } - return int64(response.CommandResponse[0].Parameters.Curdepth), nil + return maxDepth, nil } func (s *ibmmqScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec { - metricName := kedautil.NormalizeString(fmt.Sprintf("ibmmq-%s", s.metadata.QueueName)) + metricName := kedautil.NormalizeString(fmt.Sprintf("ibmmq-%s", s.metadata.QueueName[0])) externalMetric := &v2.ExternalMetricSource{ Metric: v2.MetricIdentifier{ Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, metricName), diff --git a/pkg/scalers/ibmmq_scaler_test.go b/pkg/scalers/ibmmq_scaler_test.go index 30a7fc4b132..763882d4cd8 100644 --- a/pkg/scalers/ibmmq_scaler_test.go +++ b/pkg/scalers/ibmmq_scaler_test.go @@ -51,6 +51,8 @@ var testIBMMQMetadata = []parseIBMMQMetadataTestData{ {map[string]string{}, true, map[string]string{}}, // Properly formed metadata {map[string]string{"host": testValidMQQueueURL, "queueName": "testQueue", "queueDepth": "10"}, false, map[string]string{"username": "testUsername", "password": "Pass123"}}, + // Properly formed metadata with 2 queues + {map[string]string{"host": testValidMQQueueURL, "queueName": "testQueue1,testQueue2", "queueDepth": "10"}, false, map[string]string{"username": "testUsername", "password": "Pass123"}}, // Invalid queueDepth using a string {map[string]string{"host": testValidMQQueueURL, "queueName": "testQueue", "queueDepth": "AA"}, true, map[string]string{"username": "testUsername", "password": "Pass123"}}, // Invalid activationQueueDepth using a string