Skip to content

Commit

Permalink
Refactor IBM MQ scaler and remove and deprecate variables (kedacore#6034
Browse files Browse the repository at this point in the history
)

Signed-off-by: Rick Brouwer <[email protected]>
Signed-off-by: Fira Curie <[email protected]>
  • Loading branch information
rickbrouwer authored and fira42073 committed Aug 25, 2024
1 parent 32ea028 commit 9c08822
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 178 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,13 @@ Here is an overview of all new **experimental** features:

### Deprecations

- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))

You can find all deprecations in [this overview](https://github.com/kedacore/keda/issues?q=is%3Aissue+is%3Aopen+sort%3Aupdated-desc+label%3Abreaking-change) and [join the discussion here](https://github.com/kedacore/keda/discussions/categories/deprecations).

New deprecation(s):

- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))
- IBM MQ Scaler: Remove and deprecate unused variables in IBM MQ scaler ([#6033](https://github.com/kedacore/keda/issues/6033))

### Breaking Changes

Expand Down
217 changes: 70 additions & 147 deletions pkg/scalers/ibmmq_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@ import (
"io"
"net/http"
"net/url"
"strconv"
"strings"
"time"

"github.com/go-logr/logr"
v2 "k8s.io/api/autoscaling/v2"
Expand All @@ -20,39 +18,28 @@ import (
kedautil "github.com/kedacore/keda/v2/pkg/util"
)

// Default variables and settings
const (
defaultTargetQueueDepth = 20
defaultTLSDisabled = false
)

// IBMMQScaler assigns struct data pointer to metadata variable
type IBMMQScaler struct {
metricType v2.MetricTargetType
metadata *IBMMQMetadata
defaultHTTPTimeout time.Duration
httpClient *http.Client
logger logr.Logger
type ibmmqScaler struct {
metricType v2.MetricTargetType
metadata ibmmqMetadata
httpClient *http.Client
logger logr.Logger
}

// IBMMQMetadata Metadata used by KEDA to query IBM MQ queue depth and scale
type IBMMQMetadata struct {
host string
queueManager string
queueName string
username string
password string
queueDepth int64
activationQueueDepth int64
tlsDisabled bool
triggerIndex int

// TLS
ca string
cert string
key string
keyPassword string
unsafeSsl bool
type ibmmqMetadata struct {
Host string `keda:"name=host, order=triggerMetadata"`
QueueName string `keda:"name=queueName, order=triggerMetadata"`
QueueDepth int64 `keda:"name=queueDepth, order=triggerMetadata, default=20"`
ActivationQueueDepth int64 `keda:"name=activationQueueDepth, order=triggerMetadata, default=0"`
Username string `keda:"name=username, order=authParams;resolvedEnv;triggerMetadata"`
Password string `keda:"name=password, order=authParams;resolvedEnv;triggerMetadata"`
UnsafeSsl bool `keda:"name=unsafeSsl, order=triggerMetadata, default=false"`
TLS bool `keda:"name=tls, order=triggerMetadata, default=false"` // , deprecated=use unsafeSsl instead
CA string `keda:"name=ca, order=authParams, optional"`
Cert string `keda:"name=cert, order=authParams, optional"`
Key string `keda:"name=key, order=authParams, optional"`
KeyPassword string `keda:"name=keyPassword, order=authParams, optional"`

triggerIndex int
}

// CommandResponse Full structured response from MQ admin REST query
Expand All @@ -71,142 +58,79 @@ type Parameters struct {
Curdepth int `json:"curdepth"`
}

// NewIBMMQScaler creates a new IBM MQ scaler
func (m *ibmmqMetadata) Validate() error {
_, err := url.ParseRequestURI(m.Host)
if err != nil {
return fmt.Errorf("invalid URL: %w", err)
}

if (m.Cert == "") != (m.Key == "") {
return fmt.Errorf("both cert and key must be provided when using TLS")
}

// TODO: DEPRECATED to be removed in v2.18
if m.TLS && m.UnsafeSsl {
return fmt.Errorf("'tls' and 'unsafeSsl' are both specified. Please use only 'unsafeSsl'")
}

return nil
}

func NewIBMMQScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
metricType, err := GetMetricTargetType(config)
if err != nil {
return nil, fmt.Errorf("error getting scaler metric type: %w", err)
}

logger := InitializeLogger(config, "ibm_mq_scaler")

meta, err := parseIBMMQMetadata(config)
if err != nil {
return nil, fmt.Errorf("error parsing IBM MQ metadata: %w", err)
}

httpClient := kedautil.CreateHTTPClient(config.GlobalHTTPTimeout, meta.tlsDisabled)
// TODO: DEPRECATED to be removed in v2.18
if meta.TLS {
logger.Info("The 'tls' setting is DEPRECATED and will be removed in v2.18 - Use 'unsafeSsl' instead")
meta.UnsafeSsl = meta.TLS
}

httpClient := kedautil.CreateHTTPClient(config.GlobalHTTPTimeout, meta.UnsafeSsl)

// Configure TLS if cert and key are specified
if meta.cert != "" && meta.key != "" {
tlsConfig, err := kedautil.NewTLSConfigWithPassword(meta.cert, meta.key, meta.keyPassword, meta.ca, meta.unsafeSsl)
if meta.Cert != "" && meta.Key != "" {
tlsConfig, err := kedautil.NewTLSConfigWithPassword(meta.Cert, meta.Key, meta.KeyPassword, meta.CA, meta.UnsafeSsl)
if err != nil {
return nil, err
}
httpClient.Transport = kedautil.CreateHTTPTransportWithTLSConfig(tlsConfig)
}

return &IBMMQScaler{
metricType: metricType,
metadata: meta,
defaultHTTPTimeout: config.GlobalHTTPTimeout,
httpClient: httpClient,
logger: InitializeLogger(config, "ibm_mq_scaler"),
return &ibmmqScaler{
metricType: metricType,
metadata: meta,
httpClient: httpClient,
logger: logger,
}, nil
}

// Close closes and returns nil
func (s *IBMMQScaler) Close(context.Context) error {
func (s *ibmmqScaler) Close(context.Context) error {
if s.httpClient != nil {
s.httpClient.CloseIdleConnections()
}
return nil
}

// parseIBMMQMetadata checks the existence of and validates the MQ connection data provided
func parseIBMMQMetadata(config *scalersconfig.ScalerConfig) (*IBMMQMetadata, error) {
meta := IBMMQMetadata{}

if val, ok := config.TriggerMetadata["host"]; ok {
_, err := url.ParseRequestURI(val)
if err != nil {
return nil, fmt.Errorf("invalid URL: %w", err)
}
meta.host = val
} else {
return nil, fmt.Errorf("no host URI given")
}

if val, ok := config.TriggerMetadata["queueManager"]; ok {
meta.queueManager = val
} else {
return nil, fmt.Errorf("no queue manager given")
}

if val, ok := config.TriggerMetadata["queueName"]; ok {
meta.queueName = val
} else {
return nil, fmt.Errorf("no queue name given")
func parseIBMMQMetadata(config *scalersconfig.ScalerConfig) (ibmmqMetadata, error) {
meta := ibmmqMetadata{triggerIndex: config.TriggerIndex}
if err := config.TypedConfig(&meta); err != nil {
return meta, err
}

if val, ok := config.TriggerMetadata["queueDepth"]; ok && val != "" {
queueDepth, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid queueDepth - must be an integer")
}
meta.queueDepth = queueDepth
} else {
fmt.Println("No target depth defined - setting default")
meta.queueDepth = defaultTargetQueueDepth
}

meta.activationQueueDepth = 0
if val, ok := config.TriggerMetadata["activationQueueDepth"]; ok && val != "" {
activationQueueDepth, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid activationQueueDepth - must be an integer")
}
meta.activationQueueDepth = activationQueueDepth
}

if val, ok := config.TriggerMetadata["tls"]; ok {
tlsDisabled, err := strconv.ParseBool(val)
if err != nil {
return nil, fmt.Errorf("invalid tls setting: %w", err)
}
meta.tlsDisabled = tlsDisabled
} else {
fmt.Println("No tls setting defined - setting default")
meta.tlsDisabled = defaultTLSDisabled
}

if val, ok := config.AuthParams["username"]; ok && val != "" {
meta.username = val
} else if val, ok := config.TriggerMetadata["usernameFromEnv"]; ok && val != "" {
meta.username = config.ResolvedEnv[val]
} else {
return nil, fmt.Errorf("no username given")
}

if val, ok := config.AuthParams["password"]; ok && val != "" {
meta.password = val
} else if val, ok := config.TriggerMetadata["passwordFromEnv"]; ok && val != "" {
meta.password = config.ResolvedEnv[val]
} else {
return nil, fmt.Errorf("no password given")
}

// TLS config (optional)
meta.ca = config.AuthParams["ca"]
meta.cert = config.AuthParams["cert"]
meta.key = config.AuthParams["key"]
meta.keyPassword = config.AuthParams["keyPassword"]

meta.unsafeSsl = false
if val, ok := config.TriggerMetadata["unsafeSsl"]; ok {
boolVal, err := strconv.ParseBool(val)
if err != nil {
return nil, fmt.Errorf("failed to parse unsafeSsl value. Must be either true or false")
}
meta.unsafeSsl = boolVal
}

meta.triggerIndex = config.TriggerIndex
return &meta, nil
return meta, nil
}

// getQueueDepthViaHTTP returns the depth of the MQ Queue from the Admin endpoint
func (s *IBMMQScaler) getQueueDepthViaHTTP(ctx context.Context) (int64, error) {
queue := s.metadata.queueName
url := s.metadata.host
func (s *ibmmqScaler) getQueueDepthViaHTTP(ctx context.Context) (int64, error) {
queue := s.metadata.QueueName
url := s.metadata.Host

var requestJSON = []byte(`{"type": "runCommandJSON", "command": "display", "qualifier": "qlocal", "name": "` + queue + `", "responseParameters" : ["CURDEPTH"]}`)
req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(requestJSON))
Expand All @@ -216,7 +140,7 @@ func (s *IBMMQScaler) getQueueDepthViaHTTP(ctx context.Context) (int64, error) {
req.Header.Set("ibm-mq-rest-csrf-token", "value")
req.Header.Set("Content-Type", "application/json")

req.SetBasicAuth(s.metadata.username, s.metadata.password)
req.SetBasicAuth(s.metadata.Username, s.metadata.Password)

resp, err := s.httpClient.Do(req)
if err != nil {
Expand Down Expand Up @@ -251,26 +175,25 @@ func (s *IBMMQScaler) getQueueDepthViaHTTP(ctx context.Context) (int64, error) {
return int64(response.CommandResponse[0].Parameters.Curdepth), nil
}

// GetMetricSpecForScaling returns the MetricSpec for the Horizontal Pod Autoscaler
func (s *IBMMQScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
func (s *ibmmqScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
metricName := kedautil.NormalizeString(fmt.Sprintf("ibmmq-%s", s.metadata.QueueName))
externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(fmt.Sprintf("ibmmq-%s", s.metadata.queueName))),
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, metricName),
},
Target: GetMetricTarget(s.metricType, s.metadata.queueDepth),
Target: GetMetricTarget(s.metricType, s.metadata.QueueDepth),
}
metricSpec := v2.MetricSpec{External: externalMetric, Type: externalMetricType}
return []v2.MetricSpec{metricSpec}
}

// GetMetricsAndActivity returns value for a supported metric and an error if there is a problem getting the metric
func (s *IBMMQScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
func (s *ibmmqScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
queueDepth, err := s.getQueueDepthViaHTTP(ctx)
if err != nil {
return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("error inspecting IBM MQ queue depth: %w", err)
}

metric := GenerateMetricInMili(metricName, float64(queueDepth))

return []external_metrics.ExternalMetricValue{metric}, queueDepth > s.metadata.activationQueueDepth, nil
return []external_metrics.ExternalMetricValue{metric}, queueDepth > s.metadata.ActivationQueueDepth, nil
}
Loading

0 comments on commit 9c08822

Please sign in to comment.