Skip to content

Commit

Permalink
Kafka Scaler: Add UnsafeSsl flag (kedacore#4911)
Browse files Browse the repository at this point in the history
Signed-off-by: Bojan Zelic <[email protected]>
Signed-off-by: anton.lysina <[email protected]>
  • Loading branch information
BojanZelic authored and toniiiik committed Jan 15, 2024
1 parent 423426e commit 463d4a5
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 17 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ Here is an overview of all new **experimental** features:
- **General**: Introduce annotation `autoscaling.keda.sh/paused: true` for ScaledObject to pause autoscaling ([#3304](https://github.com/kedacore/keda/issues/3304))
- **General**: Updated AWS SDK and updated all the aws scalers ([#4905](https://github.com/kedacore/keda/issues/4905))
- **Azure Pod Identity**: Introduce validation to prevent usage of empty identity ID for Azure identity providers ([#4528](https://github.com/kedacore/keda/issues/4528))
- **Kafka Scaler**: Add UnsafeSSL flag for kafka scaler ([#4977](https://github.com/kedacore/keda/issues/4977))
- **Prometheus Scaler**: Remove trailing whitespaces in customAuthHeader and customAuthValue ([#4960](https://github.com/kedacore/keda/issues/4960))
- **Pulsar Scaler**: Add support for OAuth extensions ([#4700](https://github.com/kedacore/keda/issues/4700))
- **Redis Scalers**: Add TLS authentication support for Redis and Redis stream scalers ([#4917](https://github.com/kedacore/keda/issues/4917))
Expand Down
51 changes: 34 additions & 17 deletions pkg/scalers/kafka_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ type kafkaMetadata struct {
key string
keyPassword string
ca string
unsafeSsl bool

scalerIndex int
}
Expand Down Expand Up @@ -230,24 +231,40 @@ func parseKafkaAuthParams(config *ScalerConfig, meta *kafkaMetadata) error {
}

if enableTLS {
certGiven := config.AuthParams["cert"] != ""
keyGiven := config.AuthParams["key"] != ""
if certGiven && !keyGiven {
return errors.New("key must be provided with cert")
}
if keyGiven && !certGiven {
return errors.New("cert must be provided with key")
}
meta.ca = config.AuthParams["ca"]
meta.cert = config.AuthParams["cert"]
meta.key = config.AuthParams["key"]
if value, found := config.AuthParams["keyPassword"]; found {
meta.keyPassword = value
} else {
meta.keyPassword = ""
return parseTLS(config, meta)
}

return nil
}

func parseTLS(config *ScalerConfig, meta *kafkaMetadata) error {
certGiven := config.AuthParams["cert"] != ""
keyGiven := config.AuthParams["key"] != ""
if certGiven && !keyGiven {
return errors.New("key must be provided with cert")
}
if keyGiven && !certGiven {
return errors.New("cert must be provided with key")
}
meta.ca = config.AuthParams["ca"]
meta.cert = config.AuthParams["cert"]
meta.key = config.AuthParams["key"]
meta.unsafeSsl = defaultUnsafeSsl

if val, ok := config.TriggerMetadata["unsafeSsl"]; ok {
unsafeSsl, err := strconv.ParseBool(val)
if err != nil {
return fmt.Errorf("error parsing unsafeSsl: %w", err)
}
meta.enableTLS = true
meta.unsafeSsl = unsafeSsl
}

if value, found := config.AuthParams["keyPassword"]; found {
meta.keyPassword = value
} else {
meta.keyPassword = ""
}
meta.enableTLS = true

return nil
}
Expand Down Expand Up @@ -391,7 +408,7 @@ func getKafkaClients(metadata kafkaMetadata) (sarama.Client, sarama.ClusterAdmin

if metadata.enableTLS {
config.Net.TLS.Enable = true
tlsConfig, err := kedautil.NewTLSConfigWithPassword(metadata.cert, metadata.key, metadata.keyPassword, metadata.ca, false)
tlsConfig, err := kedautil.NewTLSConfigWithPassword(metadata.cert, metadata.key, metadata.keyPassword, metadata.ca, metadata.unsafeSsl)
if err != nil {
return nil, nil, err
}
Expand Down
12 changes: 12 additions & 0 deletions pkg/scalers/kafka_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package scalers
import (
"context"
"reflect"
"strconv"
"strings"
"testing"

Expand Down Expand Up @@ -181,6 +182,8 @@ var parseAuthParamsTestDataset = []parseAuthParamsTestDataSecondAuthMethod{
{map[string]string{"tls": "enable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"cert": "ceert", "key": "keey", "keyPassword": "keeyPassword"}, false, true},
// success, TLS CA only
{map[string]string{"tls": "enable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"ca": "caaa"}, false, true},
// success, TLS CA only and unsafeSSL
{map[string]string{"tls": "enable", "unsafeSsl": "true", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"ca": "caaa"}, false, true},
// success, SASL + TLS
{map[string]string{"sasl": "plaintext", "tls": "enable", "bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, map[string]string{"username": "admin", "password": "admin", "ca": "caaa", "cert": "ceert", "key": "keey"}, false, true},
// success, SASL + TLS explicitly disabled
Expand Down Expand Up @@ -382,6 +385,15 @@ func TestKafkaAuthParams(t *testing.T) {
if meta.keyPassword != testData.authParams["keyPassword"] {
t.Errorf("Test case: %v. Expected key to be set to %v but got %v\n", id, testData.authParams["keyPassword"], meta.keyPassword)
}
if val, ok := testData.authParams["unsafeSsl"]; ok && err == nil {
boolVal, err := strconv.ParseBool(val)
if err != nil && !testData.isError {
t.Errorf("Expect error but got success in test case %s", meta.key)
}
if boolVal != meta.unsafeSsl {
t.Errorf("Expected unsafeSsl key to be set to %v but got %v\n", boolVal, meta.unsafeSsl)
}
}
}
}
}
Expand Down

0 comments on commit 463d4a5

Please sign in to comment.