Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support multiple queues at the IBMMQ scaler #6182

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ Here is an overview of all new **experimental** features:
- **GCP Scalers**: Added custom time horizon in GCP scalers ([#5778](https://github.com/kedacore/keda/issues/5778))
- **GitHub Scaler**: Fixed pagination, fetching repository list ([#5738](https://github.com/kedacore/keda/issues/5738))
- **Grafana dashboard**: Fix dashboard to handle wildcard scaledObject variables ([#6214](https://github.com/kedacore/keda/issues/6214))
- **IBMMQ Scaler**: Support multiple queues at the IBMMQ scaler ([#6181](https://github.com/kedacore/keda/issues/6181))
- **Kafka**: Fix logic to scale to zero on invalid offset even with earliest offsetResetPolicy ([#5689](https://github.com/kedacore/keda/issues/5689))
- **RabbitMQ Scaler**: Add connection name for AMQP ([#5958](https://github.com/kedacore/keda/issues/5958))
- **Selenium Scaler**: Add Support for Username and Password Authentication ([#6144](https://github.com/kedacore/keda/issues/6144))
Expand Down
126 changes: 87 additions & 39 deletions pkg/scalers/ibmmq_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,19 @@ type ibmmqScaler struct {
}

type ibmmqMetadata struct {
Host string `keda:"name=host, order=triggerMetadata"`
QueueName string `keda:"name=queueName, order=triggerMetadata"`
QueueDepth int64 `keda:"name=queueDepth, order=triggerMetadata, default=20"`
ActivationQueueDepth int64 `keda:"name=activationQueueDepth, order=triggerMetadata, default=0"`
Username string `keda:"name=username, order=authParams;resolvedEnv;triggerMetadata"`
Password string `keda:"name=password, order=authParams;resolvedEnv;triggerMetadata"`
UnsafeSsl bool `keda:"name=unsafeSsl, order=triggerMetadata, default=false"`
TLS bool `keda:"name=tls, order=triggerMetadata, default=false"` // , deprecated=use unsafeSsl instead
CA string `keda:"name=ca, order=authParams, optional"`
Cert string `keda:"name=cert, order=authParams, optional"`
Key string `keda:"name=key, order=authParams, optional"`
KeyPassword string `keda:"name=keyPassword, order=authParams, optional"`
Host string `keda:"name=host, order=triggerMetadata"`
QueueName []string `keda:"name=queueName;queueNames, order=triggerMetadata"`
QueueDepth int64 `keda:"name=queueDepth, order=triggerMetadata, default=20"`
ActivationQueueDepth int64 `keda:"name=activationQueueDepth, order=triggerMetadata, default=0"`
Operation string `keda:"name=operation, order=triggerMetadata, enum=max;avg;sum, default=max"`
Username string `keda:"name=username, order=authParams;resolvedEnv;triggerMetadata"`
Password string `keda:"name=password, order=authParams;resolvedEnv;triggerMetadata"`
UnsafeSsl bool `keda:"name=unsafeSsl, order=triggerMetadata, default=false"`
TLS bool `keda:"name=tls, order=triggerMetadata, default=false"` // , deprecated=use unsafeSsl instead
CA string `keda:"name=ca, order=authParams, optional"`
Cert string `keda:"name=cert, order=authParams, optional"`
Key string `keda:"name=key, order=authParams, optional"`
KeyPassword string `keda:"name=keyPassword, order=authParams, optional"`

triggerIndex int
}
Expand Down Expand Up @@ -129,54 +130,101 @@ func parseIBMMQMetadata(config *scalersconfig.ScalerConfig) (ibmmqMetadata, erro
}

func (s *ibmmqScaler) getQueueDepthViaHTTP(ctx context.Context) (int64, error) {
queue := s.metadata.QueueName
depths := make([]int64, 0, len(s.metadata.QueueName))
url := s.metadata.Host

var requestJSON = []byte(`{"type": "runCommandJSON", "command": "display", "qualifier": "qlocal", "name": "` + queue + `", "responseParameters" : ["CURDEPTH"]}`)
req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(requestJSON))
req, err := http.NewRequestWithContext(ctx, "POST", url, nil)
if err != nil {
return 0, fmt.Errorf("failed to request queue depth: %w", err)
return 0, fmt.Errorf("failed to create HTTP request: %w", err)
}
req.Header.Set("ibm-mq-rest-csrf-token", "value")
req.Header.Set("Content-Type", "application/json")

req.SetBasicAuth(s.metadata.Username, s.metadata.Password)

resp, err := s.httpClient.Do(req)
if err != nil {
return 0, fmt.Errorf("failed to contact MQ via REST: %w", err)
for _, queueName := range s.metadata.QueueName {
requestJSON := []byte(`{"type": "runCommandJSON", "command": "display", "qualifier": "qlocal", "name": "` + queueName + `", "responseParameters" : ["CURDEPTH"]}`)
req.Body = io.NopCloser(bytes.NewBuffer(requestJSON))

resp, err := s.httpClient.Do(req)
if err != nil {
return 0, fmt.Errorf("failed to contact MQ via REST for queue %s: %w", queueName, err)
}
defer resp.Body.Close()

if resp.StatusCode == http.StatusUnauthorized {
return 0, fmt.Errorf("authentication failed: incorrect username or password")
}

body, err := io.ReadAll(resp.Body)
if err != nil {
return 0, fmt.Errorf("failed to read body of request for queue %s: %w", queueName, err)
}

var response CommandResponse
err = json.Unmarshal(body, &response)
if err != nil {
return 0, fmt.Errorf("failed to parse JSON for queue %s: %w", queueName, err)
}

if response.CommandResponse == nil || len(response.CommandResponse) == 0 {
return 0, fmt.Errorf("failed to parse response from REST call for queue %s", queueName)
}

if response.CommandResponse[0].Parameters == nil {
var reason string
message := strings.Join(response.CommandResponse[0].Message, " ")
if message != "" {
reason = fmt.Sprintf(", reason: %s", message)
}
return 0, fmt.Errorf("failed to get the current queue depth parameter for queue %s%s", queueName, reason)
}

depth := int64(response.CommandResponse[0].Parameters.Curdepth)
depths = append(depths, depth)
}
defer resp.Body.Close()

body, err := io.ReadAll(resp.Body)
if err != nil {
return 0, fmt.Errorf("failed to read body of request: %w", err)
switch s.metadata.Operation {
case sumOperation:
return sumDepths(depths), nil
case avgOperation:
return avgDepths(depths), nil
case maxOperation:
return maxDepth(depths), nil
default:
return 0, nil
}
}

var response CommandResponse
err = json.Unmarshal(body, &response)
if err != nil {
return 0, fmt.Errorf("failed to parse JSON: %w", err)
func sumDepths(depths []int64) int64 {
var sum int64
for _, depth := range depths {
sum += depth
}
return sum
}

if response.CommandResponse == nil || len(response.CommandResponse) == 0 {
return 0, fmt.Errorf("failed to parse response from REST call")
func avgDepths(depths []int64) int64 {
if len(depths) == 0 {
return 0
}
return sumDepths(depths) / int64(len(depths))
}

if response.CommandResponse[0].Parameters == nil {
var reason string
message := strings.Join(response.CommandResponse[0].Message, " ")
if message != "" {
reason = fmt.Sprintf(", reason: %s", message)
func maxDepth(depths []int64) int64 {
if len(depths) == 0 {
return 0
}
max := depths[0]
for _, depth := range depths[1:] {
if depth > max {
max = depth
}
return 0, fmt.Errorf("failed to get the current queue depth parameter%s", reason)
}

return int64(response.CommandResponse[0].Parameters.Curdepth), nil
return max
}

func (s *ibmmqScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
metricName := kedautil.NormalizeString(fmt.Sprintf("ibmmq-%s", s.metadata.QueueName))
metricName := kedautil.NormalizeString(fmt.Sprintf("ibmmq-%s", s.metadata.QueueName[0]))
externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, metricName),
Expand Down
12 changes: 10 additions & 2 deletions pkg/scalers/ibmmq_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ var testIBMMQMetadata = []parseIBMMQMetadataTestData{
{map[string]string{}, true, map[string]string{}},
// Properly formed metadata
{map[string]string{"host": testValidMQQueueURL, "queueName": "testQueue", "queueDepth": "10"}, false, map[string]string{"username": "testUsername", "password": "Pass123"}},
// Properly formed metadata with 2 queues
{map[string]string{"host": testValidMQQueueURL, "queueName": "testQueue1, testQueue2", "queueDepth": "10"}, false, map[string]string{"username": "testUsername", "password": "Pass123"}},
// Properly formed metadata with 2 queues with param queueNames
{map[string]string{"host": testValidMQQueueURL, "queueNames": "testQueue1, testQueue2", "queueDepth": "10"}, false, map[string]string{"username": "testUsername", "password": "Pass123"}},
// Invalid operation
{map[string]string{"host": testValidMQQueueURL, "queueName": "testQueue1, testQueue2", "operation": "test", "queueDepth": "10"}, true, map[string]string{"username": "testUsername", "password": "Pass123"}},
// Invalid queueDepth using a string
{map[string]string{"host": testValidMQQueueURL, "queueName": "testQueue", "queueDepth": "AA"}, true, map[string]string{"username": "testUsername", "password": "Pass123"}},
// Invalid activationQueueDepth using a string
Expand Down Expand Up @@ -89,7 +95,7 @@ func TestIBMMQParseMetadata(t *testing.T) {
t.Error("Expected error but got success")
fmt.Println(testData)
}
if metadata != (ibmmqMetadata{}) && metadata.Password != "" && metadata.Password != testData.authParams["password"] {
if metadata.Password != "" && metadata.Password != testData.authParams["password"] {
t.Error("Expected password from configuration but found something else: ", metadata.Password)
fmt.Println(testData)
}
Expand Down Expand Up @@ -216,7 +222,9 @@ func TestIBMMQScalerGetQueueDepthViaHTTP(t *testing.T) {

scaler := ibmmqScaler{
metadata: ibmmqMetadata{
Host: server.URL,
Host: server.URL,
QueueName: []string{"TEST.QUEUE"},
Operation: "max",
},
httpClient: server.Client(),
}
Expand Down
Loading