Skip to content

Commit

Permalink
Support multiple queues at the IBMMQ scaler
Browse files Browse the repository at this point in the history
Signed-off-by: rickbrouwer <[email protected]>
  • Loading branch information
rickbrouwer committed Sep 23, 2024
1 parent 4444aca commit 5a9617c
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 42 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ Here is an overview of all new **experimental** features:
- **AWS CloudWatch Scaler**: Add support for ignoreNullValues ([#5352](https://github.com/kedacore/keda/issues/5352))
- **GCP Scalers**: Added custom time horizon in GCP scalers ([#5778](https://github.com/kedacore/keda/issues/5778))
- **GitHub Scaler**: Fixed pagination, fetching repository list ([#5738](https://github.com/kedacore/keda/issues/5738))
- **IBMMQ Scaler**: Support multiple queues at the IBMMQ scaler ([#6181](https://github.com/kedacore/keda/issues/6181))
- **Kafka**: Fix logic to scale to zero on invalid offset even with earliest offsetResetPolicy ([#5689](https://github.com/kedacore/keda/issues/5689))
- **RabbitMQ Scaler**: Add connection name for AMQP ([#5958](https://github.com/kedacore/keda/issues/5958))
- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))
Expand Down
92 changes: 50 additions & 42 deletions pkg/scalers/ibmmq_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,18 @@ type ibmmqScaler struct {
}

type ibmmqMetadata struct {
Host string `keda:"name=host, order=triggerMetadata"`
QueueName string `keda:"name=queueName, order=triggerMetadata"`
QueueDepth int64 `keda:"name=queueDepth, order=triggerMetadata, default=20"`
ActivationQueueDepth int64 `keda:"name=activationQueueDepth, order=triggerMetadata, default=0"`
Username string `keda:"name=username, order=authParams;resolvedEnv;triggerMetadata"`
Password string `keda:"name=password, order=authParams;resolvedEnv;triggerMetadata"`
UnsafeSsl bool `keda:"name=unsafeSsl, order=triggerMetadata, default=false"`
TLS bool `keda:"name=tls, order=triggerMetadata, default=false"` // , deprecated=use unsafeSsl instead
CA string `keda:"name=ca, order=authParams, optional"`
Cert string `keda:"name=cert, order=authParams, optional"`
Key string `keda:"name=key, order=authParams, optional"`
KeyPassword string `keda:"name=keyPassword, order=authParams, optional"`
Host string `keda:"name=host, order=triggerMetadata"`
QueueName []string `keda:"name=queueName, order=triggerMetadata"`
QueueDepth int64 `keda:"name=queueDepth, order=triggerMetadata, default=20"`
ActivationQueueDepth int64 `keda:"name=activationQueueDepth, order=triggerMetadata, default=0"`
Username string `keda:"name=username, order=authParams;resolvedEnv;triggerMetadata"`
Password string `keda:"name=password, order=authParams;resolvedEnv;triggerMetadata"`
UnsafeSsl bool `keda:"name=unsafeSsl, order=triggerMetadata, default=false"`
TLS bool `keda:"name=tls, order=triggerMetadata, default=false"` // , deprecated=use unsafeSsl instead
CA string `keda:"name=ca, order=authParams, optional"`
Cert string `keda:"name=cert, order=authParams, optional"`
Key string `keda:"name=key, order=authParams, optional"`
KeyPassword string `keda:"name=keyPassword, order=authParams, optional"`

triggerIndex int
}
Expand Down Expand Up @@ -129,54 +129,62 @@ func parseIBMMQMetadata(config *scalersconfig.ScalerConfig) (ibmmqMetadata, erro
}

func (s *ibmmqScaler) getQueueDepthViaHTTP(ctx context.Context) (int64, error) {
queue := s.metadata.QueueName
maxDepth := int64(0)
url := s.metadata.Host

var requestJSON = []byte(`{"type": "runCommandJSON", "command": "display", "qualifier": "qlocal", "name": "` + queue + `", "responseParameters" : ["CURDEPTH"]}`)
req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(requestJSON))
req, err := http.NewRequestWithContext(ctx, "POST", url, nil)
if err != nil {
return 0, fmt.Errorf("failed to request queue depth: %w", err)
return 0, fmt.Errorf("failed to create HTTP request: %w", err)
}
req.Header.Set("ibm-mq-rest-csrf-token", "value")
req.Header.Set("Content-Type", "application/json")

req.SetBasicAuth(s.metadata.Username, s.metadata.Password)

resp, err := s.httpClient.Do(req)
if err != nil {
return 0, fmt.Errorf("failed to contact MQ via REST: %w", err)
}
defer resp.Body.Close()
for _, queueName := range s.metadata.QueueName {
requestJSON := []byte(`{"type": "runCommandJSON", "command": "display", "qualifier": "qlocal", "name": "` + queueName + `", "responseParameters" : ["CURDEPTH"]}`)
req.Body = io.NopCloser(bytes.NewBuffer(requestJSON))

body, err := io.ReadAll(resp.Body)
if err != nil {
return 0, fmt.Errorf("failed to read body of request: %w", err)
}
resp, err := s.httpClient.Do(req)
if err != nil {
return 0, fmt.Errorf("failed to contact MQ via REST for queue %s: %w", queueName, err)
}
defer resp.Body.Close()

var response CommandResponse
err = json.Unmarshal(body, &response)
if err != nil {
return 0, fmt.Errorf("failed to parse JSON: %w", err)
}
body, err := io.ReadAll(resp.Body)
if err != nil {
return 0, fmt.Errorf("failed to read body of request for queue %s: %w", queueName, err)
}

if response.CommandResponse == nil || len(response.CommandResponse) == 0 {
return 0, fmt.Errorf("failed to parse response from REST call")
}
var response CommandResponse
err = json.Unmarshal(body, &response)
if err != nil {
return 0, fmt.Errorf("failed to parse JSON for queue %s: %w", queueName, err)
}

if response.CommandResponse == nil || len(response.CommandResponse) == 0 {
return 0, fmt.Errorf("failed to parse response from REST call for queue %s", queueName)
}

if response.CommandResponse[0].Parameters == nil {
var reason string
message := strings.Join(response.CommandResponse[0].Message, " ")
if message != "" {
reason = fmt.Sprintf(", reason: %s", message)
}
return 0, fmt.Errorf("failed to get the current queue depth parameter for queue %s%s", queueName, reason)
}

if response.CommandResponse[0].Parameters == nil {
var reason string
message := strings.Join(response.CommandResponse[0].Message, " ")
if message != "" {
reason = fmt.Sprintf(", reason: %s", message)
depth := int64(response.CommandResponse[0].Parameters.Curdepth)
if depth > maxDepth {
maxDepth = depth
}
return 0, fmt.Errorf("failed to get the current queue depth parameter%s", reason)
}

return int64(response.CommandResponse[0].Parameters.Curdepth), nil
return maxDepth, nil
}

func (s *ibmmqScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
metricName := kedautil.NormalizeString(fmt.Sprintf("ibmmq-%s", s.metadata.QueueName))
metricName := kedautil.NormalizeString(fmt.Sprintf("ibmmq-%s", s.metadata.QueueName[0]))
externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, metricName),
Expand Down
2 changes: 2 additions & 0 deletions pkg/scalers/ibmmq_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ var testIBMMQMetadata = []parseIBMMQMetadataTestData{
{map[string]string{}, true, map[string]string{}},
// Properly formed metadata
{map[string]string{"host": testValidMQQueueURL, "queueName": "testQueue", "queueDepth": "10"}, false, map[string]string{"username": "testUsername", "password": "Pass123"}},
// Properly formed metadata with 2 queues
{map[string]string{"host": testValidMQQueueURL, "queueName": "testQueue1,testQueue2", "queueDepth": "10"}, false, map[string]string{"username": "testUsername", "password": "Pass123"}},
// Invalid queueDepth using a string
{map[string]string{"host": testValidMQQueueURL, "queueName": "testQueue", "queueDepth": "AA"}, true, map[string]string{"username": "testUsername", "password": "Pass123"}},
// Invalid activationQueueDepth using a string
Expand Down

0 comments on commit 5a9617c

Please sign in to comment.