From 063ca3ddde315c00679e5218a6ddfe6a88d45ecd Mon Sep 17 00:00:00 2001 From: Matt Ulmer <25484774+Ulminator@users.noreply.github.com> Date: Thu, 10 Oct 2024 16:24:05 -0700 Subject: [PATCH 1/8] feat: add nsq scaler Signed-off-by: Matt Ulmer <25484774+Ulminator@users.noreply.github.com> --- CHANGELOG.md | 2 + pkg/scalers/nsq_scaler.go | 358 ++++++++++++++++++ pkg/scalers/nsq_scaler_test.go | 658 +++++++++++++++++++++++++++++++++ pkg/scaling/scalers_builder.go | 2 + tests/scalers/nsq/nsq_test.go | 214 +++++++++++ 5 files changed, 1234 insertions(+) create mode 100644 pkg/scalers/nsq_scaler.go create mode 100644 pkg/scalers/nsq_scaler_test.go create mode 100644 tests/scalers/nsq/nsq_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index deab8a04db4..af74a7d80b3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -55,6 +55,8 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio ## Unreleased +- **General:** Introduce new NSQ Scaler ([#3281](https://github.com/kedacore/keda/issues/3281)) + ### New - **General**: Cache miss fallback in validating webhook for ScaledObjects with direct kubernetes client ([#5973](https://github.com/kedacore/keda/issues/5973)) diff --git a/pkg/scalers/nsq_scaler.go b/pkg/scalers/nsq_scaler.go new file mode 100644 index 00000000000..7a99d95baa8 --- /dev/null +++ b/pkg/scalers/nsq_scaler.go @@ -0,0 +1,358 @@ +package scalers + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net" + "net/http" + "net/url" + "strconv" + "sync" + + "github.com/go-logr/logr" + v2 "k8s.io/api/autoscaling/v2" + "k8s.io/metrics/pkg/apis/external_metrics" + + "github.com/kedacore/keda/v2/pkg/scalers/scalersconfig" + kedautil "github.com/kedacore/keda/v2/pkg/util" +) + +type nsqScaler struct { + metricType v2.MetricTargetType + metadata nsqMetadata + httpClient *http.Client + logger logr.Logger +} + +type nsqMetadata struct { + NSQLookupdHTTPAddresses []string `keda:"name=nsqLookupdHTTPAddresses, order=triggerMetadata;resolvedEnv"` + Topic string `keda:"name=topic, order=triggerMetadata;resolvedEnv"` + Channel string `keda:"name=channel, order=triggerMetadata;resolvedEnv"` + DepthThreshold int64 `keda:"name=depthThreshold, order=triggerMetadata;resolvedEnv, default=10"` + ActivationDepthThreshold int64 `keda:"name=activationDepthThreshold, order=triggerMetadata;resolvedEnv, default=0"` + + triggerIndex int +} + +const ( + nsqMetricType = "External" +) + +func NewNSQScaler(config *scalersconfig.ScalerConfig) (Scaler, error) { + metricType, err := GetMetricTargetType(config) + if err != nil { + return nil, fmt.Errorf("error getting scaler metric type: %w", err) + } + + logger := InitializeLogger(config, "nsq_scaler") + + nsqMetadata, err := parseNSQMetadata(config) + if err != nil { + return nil, fmt.Errorf("error parsing NSQ metadata: %w", err) + } + + return &nsqScaler{ + metricType: metricType, + metadata: nsqMetadata, + httpClient: kedautil.CreateHTTPClient(config.GlobalHTTPTimeout, true), + logger: logger, + }, nil +} + +func (m nsqMetadata) Validate() error { + if len(m.NSQLookupdHTTPAddresses) == 0 { + return fmt.Errorf("no nsqLookupdHTTPAddresses given") + } + + if m.Topic == "" { + return fmt.Errorf("no topic given") + } + + if m.Channel == "" { + return fmt.Errorf("no channel given") + } + + if m.DepthThreshold <= 0 { + return fmt.Errorf("depthThreshold must be a positive integer") + } + + if m.ActivationDepthThreshold < 0 { + return fmt.Errorf("activationDepthThreshold must be greater than or equal to 0") + } + + return nil +} + +func parseNSQMetadata(config *scalersconfig.ScalerConfig) (nsqMetadata, error) { + meta := nsqMetadata{triggerIndex: config.TriggerIndex} + if err := config.TypedConfig(&meta); err != nil { + return meta, fmt.Errorf("error parsing nsq metadata: %w", err) + } + + return meta, nil +} + +func (s nsqScaler) GetMetricsAndActivity(_ context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) { + depth, err := s.getTopicChannelDepth() + + if err != nil { + return []external_metrics.ExternalMetricValue{}, false, err + } + + s.logger.Info("GetMetricsAndActivity", "metricName", metricName, "depth", depth) + + metric := GenerateMetricInMili(metricName, float64(depth)) + + return []external_metrics.ExternalMetricValue{metric}, depth > s.metadata.ActivationDepthThreshold, nil +} + +func (s nsqScaler) getTopicChannelDepth() (int64, error) { + nsqdHosts, err := s.getTopicProducers(s.metadata.Topic) + if err != nil { + return -1, fmt.Errorf("error getting nsqd hosts: %w", err) + } + + if len(nsqdHosts) == 0 { + s.logger.Info("no nsqd hosts found for topic", "topic", s.metadata.Topic) + return 0, nil + } + + depth, err := s.aggregateDepth(nsqdHosts, s.metadata.Topic, s.metadata.Channel) + if err != nil { + return -1, fmt.Errorf("error getting topic/channel depth: %w", err) + } + + return depth, nil +} + +func (s nsqScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec { + metricName := fmt.Sprintf("nsq-%s-%s", s.metadata.Topic, s.metadata.Channel) + + externalMetric := &v2.ExternalMetricSource{ + Metric: v2.MetricIdentifier{ + Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(metricName)), + }, + Target: GetMetricTarget(s.metricType, s.metadata.DepthThreshold), + } + metricSpec := v2.MetricSpec{External: externalMetric, Type: nsqMetricType} + return []v2.MetricSpec{metricSpec} +} + +func (s nsqScaler) Close(context.Context) error { + if s.httpClient != nil { + s.httpClient.CloseIdleConnections() + } + return nil +} + +type lookupResponse struct { + Producers []struct { + HTTPPort int `json:"http_port"` + BroadcastAddress string `json:"broadcast_address"` + } +} + +type lookupResult struct { + host string + lookupResponse *lookupResponse + err error +} + +func (s *nsqScaler) getTopicProducers(topic string) ([]string, error) { + var wg sync.WaitGroup + resultCh := make(chan lookupResult, len(s.metadata.NSQLookupdHTTPAddresses)) + + for _, host := range s.metadata.NSQLookupdHTTPAddresses { + wg.Add(1) + go func(host string, topic string) { + defer wg.Done() + resp, err := s.getLookup(host, topic) + resultCh <- lookupResult{host, resp, err} + }(host, topic) + } + + wg.Wait() + close(resultCh) + + var nsqdHostMap = make(map[string]bool) + for result := range resultCh { + if result.err != nil { + return nil, fmt.Errorf("error getting lookup from host '%s': %w", result.host, result.err) + } + + if result.lookupResponse == nil { + // topic is not found on a single nsqlookupd host, it may exist on another + continue + } + + for _, producer := range result.lookupResponse.Producers { + nsqdHost := net.JoinHostPort(producer.BroadcastAddress, strconv.Itoa(producer.HTTPPort)) + nsqdHostMap[nsqdHost] = true + } + } + + var nsqdHosts []string + for nsqdHost := range nsqdHostMap { + nsqdHosts = append(nsqdHosts, nsqdHost) + } + + return nsqdHosts, nil +} + +func (s *nsqScaler) getLookup(host string, topic string) (*lookupResponse, error) { + req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/%s", host, "lookup"), nil) + if err != nil { + return nil, err + } + req.Header.Set("Accept", "application/json; charset=utf-8") + + params := url.Values{"topic": {topic}} + req.URL.RawQuery = params.Encode() + + resp, err := s.httpClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode == http.StatusNotFound { + return nil, nil + } + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("unexpected status code '%s'", resp.Status) + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + var lookupResponse lookupResponse + err = json.Unmarshal(body, &lookupResponse) + if err != nil { + return nil, err + } + + return &lookupResponse, nil +} + +type statsResponse struct { + Topics []struct { + TopicName string `json:"topic_name"` + Depth int64 `json:"depth"` + Channels []struct { + ChannelName string `json:"channel_name"` + Depth int64 `json:"depth"` // num messages in the queue (mem + disk) + Paused bool `json:"paused"` // if paused, consumers will not receive messages + } + } +} + +type statsResult struct { + host string + statsResponse *statsResponse + err error +} + +func (s *nsqScaler) aggregateDepth(nsqdHosts []string, topic string, channel string) (int64, error) { + wg := sync.WaitGroup{} + resultCh := make(chan statsResult, len(nsqdHosts)) + + for _, host := range nsqdHosts { + wg.Add(1) + go func(host string, topic string) { + defer wg.Done() + resp, err := s.getStats(host, topic) + resultCh <- statsResult{host, resp, err} + }(host, topic) + } + + wg.Wait() + close(resultCh) + + var depth int64 + for result := range resultCh { + if result.err != nil { + return -1, fmt.Errorf("error getting stats from host '%s': %w", result.host, result.err) + } + + for _, t := range result.statsResponse.Topics { + if t.TopicName != topic { + // this should never happen as we make the /stats call with the "topic" param + continue + } + + if len(t.Channels) == 0 { + // topic exists with no channels, but there are messages in the topic -> we should still scale to bootstrap + s.logger.Info("no channels exist for topic", "topic", topic, "channel", channel, "host", result.host) + depth += t.Depth + continue + } + + channelExists := false + for _, ch := range t.Channels { + if ch.ChannelName != channel { + continue + } + channelExists = true + if ch.Paused { + // if it's paused on a single nsqd host, it's depth should not go into the aggregate + // meaning if paused on all nsqd hosts => depth == 0 + s.logger.Info("channel is paused", "topic", topic, "channel", channel, "host", result.host) + continue + } + depth += ch.Depth + } + if !channelExists { + // topic exists with channels, but not the one in question - fallback to topic depth + s.logger.Info("channel does not exist for topic", "topic", topic, "channel", channel, "host", result.host) + depth += t.Depth + } + } + } + + return depth, nil +} + +func (s *nsqScaler) getStats(host string, topic string) (*statsResponse, error) { + req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/%s", host, "stats"), nil) + if err != nil { + return nil, err + } + + // "channel" is a query param as well, but if used and the channel does not exist + // we do not receive any stats for the existing topic + params := url.Values{ + "format": {"json"}, + "include_clients": {"false"}, + "include_mem": {"false"}, + "topic": {topic}, + } + req.URL.RawQuery = params.Encode() + + resp, err := s.httpClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("unexpected status code '%s'", resp.Status) + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + var statsResponse statsResponse + err = json.Unmarshal(body, &statsResponse) + if err != nil { + return nil, err + } + + return &statsResponse, nil +} diff --git a/pkg/scalers/nsq_scaler_test.go b/pkg/scalers/nsq_scaler_test.go new file mode 100644 index 00000000000..d97d167c44c --- /dev/null +++ b/pkg/scalers/nsq_scaler_test.go @@ -0,0 +1,658 @@ +package scalers + +import ( + "context" + "fmt" + "net" + "net/http" + "net/http/httptest" + "net/url" + "testing" + + "github.com/go-logr/logr" + "github.com/stretchr/testify/assert" + "go.uber.org/atomic" + v2 "k8s.io/api/autoscaling/v2" + + "github.com/kedacore/keda/v2/pkg/scalers/scalersconfig" +) + +type nsqMetadataTestData struct { + metadata map[string]string + numNSQLookupdHTTPAddresses int + nsqLookupdHTTPAddresses []string + topic string + channel string + depthThreshold int64 + activationDepthThreshold int64 + isError bool + description string +} + +type nsqMetricIdentifier struct { + metadataTestData *nsqMetadataTestData + triggerIndex int + name string + metricType string +} + +var parseNSQMetadataTestDataset = []nsqMetadataTestData{ + { + metadata: map[string]string{"nsqLookupdHTTPAddresses": "nsqlookupd-0:4161", "topic": "topic", "channel": "channel"}, + numNSQLookupdHTTPAddresses: 1, + nsqLookupdHTTPAddresses: []string{"nsqlookupd-0:4161"}, + topic: "topic", + channel: "channel", + depthThreshold: 10, + activationDepthThreshold: 0, + isError: false, + description: "Success", + }, + { + metadata: map[string]string{"nsqLookupdHTTPAddresses": "nsqlookupd-0:4161,nsqlookupd-1:4161", "topic": "topic", "channel": "channel"}, + numNSQLookupdHTTPAddresses: 2, + nsqLookupdHTTPAddresses: []string{"nsqlookupd-0:4161", "nsqlookupd-1:4161"}, + topic: "topic", + channel: "channel", + depthThreshold: 10, + activationDepthThreshold: 0, + isError: false, + description: "Success, multiple nsqlookupd addresses", + }, + { + metadata: map[string]string{"nsqLookupdHTTPAddresses": "nsqlookupd-0:4161", "topic": "topic", "channel": "channel", "depthThreshold": "100", "activationDepthThreshold": "1"}, + numNSQLookupdHTTPAddresses: 1, + nsqLookupdHTTPAddresses: []string{"nsqlookupd-0:4161"}, + topic: "topic", + channel: "channel", + depthThreshold: 100, + activationDepthThreshold: 1, + isError: false, + description: "Success - setting optional fields", + }, + { + metadata: map[string]string{"topic": "topic", "channel": "channel"}, + isError: true, + description: "Error, no nsqlookupd addresses", + }, + { + metadata: map[string]string{"nsqLookupdHTTPAddresses": "nsqlookupd-0:4161", "channel": "channel"}, + isError: true, + description: "Error, no topic", + }, + { + metadata: map[string]string{"nsqLookupdHTTPAddresses": "nsqlookupd-0:4161", "topic": "topic"}, + isError: true, + description: "Error, no channel", + }, + { + metadata: map[string]string{"nsqLookupdHTTPAddresses": "nsqlookupd-0:4161", "topic": "topic", "channel": "channel", "depthThreshold": "0"}, + isError: true, + description: "Error, depthThreshold is <=0", + }, + { + metadata: map[string]string{"nsqLookupdHTTPAddresses": "nsqlookupd-0:4161", "topic": "topic", "channel": "channel", "activationDepthThreshold": "-1"}, + isError: true, + description: "Error, activationDepthThreshold is <0", + }, +} + +var nsqMetricIdentifiers = []nsqMetricIdentifier{ + {&parseNSQMetadataTestDataset[0], 0, "s0-nsq-topic-channel", "Value"}, + {&parseNSQMetadataTestDataset[0], 1, "s1-nsq-topic-channel", "AverageValue"}, +} + +func TestNSQParseMetadata(t *testing.T) { + for _, testData := range parseNSQMetadataTestDataset { + config := scalersconfig.ScalerConfig{TriggerMetadata: testData.metadata} + + meta, err := parseNSQMetadata(&config) + if err != nil { + if testData.isError { + continue + } + t.Error("Expected success, got error", err, testData.description) + } + if err == nil && testData.isError { + t.Error("Expected error, got success", testData.description) + } + + assert.Equal(t, testData.numNSQLookupdHTTPAddresses, len(meta.NSQLookupdHTTPAddresses), testData.description) + assert.Equal(t, testData.nsqLookupdHTTPAddresses, meta.NSQLookupdHTTPAddresses, testData.description) + assert.Equal(t, testData.topic, meta.Topic, testData.description) + assert.Equal(t, testData.channel, meta.Channel, testData.description) + assert.Equal(t, testData.depthThreshold, meta.DepthThreshold, testData.description) + assert.Equal(t, testData.activationDepthThreshold, meta.ActivationDepthThreshold, testData.description) + } +} + +func TestNSQGetMetricsAndActivity(t *testing.T) { + type testCase struct { + lookupError bool + statsError bool + expectedDepth int64 + expectedActive bool + activationdDepthThreshold int64 + } + testCases := []testCase{ + { + lookupError: true, + }, + { + statsError: true, + }, + { + expectedDepth: 100, + expectedActive: true, + }, + { + expectedDepth: 0, + expectedActive: false, + }, + { + expectedDepth: 9, + activationdDepthThreshold: 10, + expectedActive: false, + }, + } + for _, tc := range testCases { + mockNSQdServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + // nosemgrep: no-fprintf-to-responsewriter + fmt.Fprintf(w, `{"topics":[{"topic_name":"topic","channels":[{"channel_name":"channel","depth":%d}]}]}`, tc.expectedDepth) + })) + defer mockNSQdServer.Close() + + parsedNSQdURL, err := url.Parse(mockNSQdServer.URL) + assert.Nil(t, err) + + mockNSQLookupdServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + // nosemgrep: no-fprintf-to-responsewriter + fmt.Fprintf(w, `{"producers":[{"broadcast_address":"%s","http_port":%s}]}`, parsedNSQdURL.Hostname(), parsedNSQdURL.Port()) + })) + defer mockNSQLookupdServer.Close() + + parsedNSQLookupdURL, err := url.Parse(mockNSQLookupdServer.URL) + assert.Nil(t, err) + + nsqlookupdHost := net.JoinHostPort(parsedNSQLookupdURL.Hostname(), parsedNSQLookupdURL.Port()) + + config := scalersconfig.ScalerConfig{TriggerMetadata: map[string]string{ + "nsqLookupdHTTPAddresses": nsqlookupdHost, + "topic": "topic", + "channel": "channel", + "activationDepthThreshold": fmt.Sprintf("%d", tc.activationdDepthThreshold), + }} + meta, err := parseNSQMetadata(&config) + assert.Nil(t, err) + + s := nsqScaler{v2.AverageValueMetricType, meta, http.DefaultClient, logr.Discard()} + + metricName := "s0-nsq-topic-channel" + metrics, activity, err := s.GetMetricsAndActivity(context.Background(), metricName) + + if err != nil && (tc.lookupError || tc.statsError) { + assert.Equal(t, 0, len(metrics)) + assert.False(t, activity) + continue + } + + assert.Nil(t, err) + assert.Equal(t, 1, len(metrics)) + assert.Equal(t, metricName, metrics[0].MetricName) + assert.Equal(t, tc.expectedDepth, metrics[0].Value.Value()) + if tc.expectedActive { + assert.True(t, activity) + } else { + assert.False(t, activity) + } + } +} + +func TestNSQGetMetricSpecForScaling(t *testing.T) { + for _, testData := range nsqMetricIdentifiers { + meta, err := parseNSQMetadata(&scalersconfig.ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, TriggerIndex: testData.triggerIndex}) + if err != nil { + t.Fatal("Could not parse metadata:", err) + } + + metricType := v2.MetricTargetType(testData.metricType) + mockNSQScaler := nsqScaler{metricType, meta, nil, logr.Discard()} + + metricSpec := mockNSQScaler.GetMetricSpecForScaling(context.Background()) + metricName := metricSpec[0].External.Metric.Name + assert.Equal(t, testData.name, metricName) + assert.Equal(t, 1, len(metricSpec)) + assert.Equal(t, metricType, metricSpec[0].External.Target.Type) + depthThreshold := meta.DepthThreshold + if metricType == v2.AverageValueMetricType { + assert.Equal(t, depthThreshold, metricSpec[0].External.Target.AverageValue.Value()) + } else { + assert.Equal(t, depthThreshold, metricSpec[0].External.Target.Value.Value()) + } + } +} + +func TestNSQGetTopicChannelDepth(t *testing.T) { + type testCase struct { + lookupError bool + topicNotExist bool + producersNotExist bool + statsError bool + channelPaused bool + expectedDepth int64 + description string + } + testCases := []testCase{ + { + lookupError: true, + description: "nsqlookupd call failed", + }, + { + topicNotExist: true, + expectedDepth: 0, + description: "Topic does not exist", + }, + { + producersNotExist: true, + expectedDepth: 0, + description: "No producers for topic", + }, + { + statsError: true, + description: "nsqd call failed", + }, + { + channelPaused: true, + expectedDepth: 0, + description: "Channel is paused", + }, + { + expectedDepth: 100, + description: "successfully retrieved depth", + }, + } + + for _, tc := range testCases { + mockNSQdServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if tc.statsError { + w.WriteHeader(http.StatusInternalServerError) + return + } + if tc.channelPaused { + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, `{"topics":[{"topic_name":"topic", "depth":250, "channels":[{"channel_name":"channel", "depth":100, "paused":true}]}]}`) + return + } + + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, `{"topics":[{"topic_name":"topic", "depth":250, "channels":[{"channel_name":"channel", "depth":100}]}]}`) + })) + defer mockNSQdServer.Close() + + parsedNSQdURL, err := url.Parse(mockNSQdServer.URL) + assert.Nil(t, err) + + mockNSQLookupdServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if tc.lookupError { + w.WriteHeader(http.StatusInternalServerError) + return + } + if tc.topicNotExist { + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, `{"message": "TOPIC_NOT_FOUND"}`) + return + } + if tc.producersNotExist { + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, `{"producers":[]}`) + return + } + + w.WriteHeader(http.StatusOK) + // nosemgrep: no-fprintf-to-responsewriter + fmt.Fprintf(w, `{"producers":[{"broadcast_address":"%s","http_port":%s}]}`, parsedNSQdURL.Hostname(), parsedNSQdURL.Port()) + })) + defer mockNSQLookupdServer.Close() + + parsedNSQLookupdURL, err := url.Parse(mockNSQLookupdServer.URL) + assert.Nil(t, err) + + nsqLookupdHosts := []string{net.JoinHostPort(parsedNSQLookupdURL.Hostname(), parsedNSQLookupdURL.Port())} + + s := nsqScaler{httpClient: http.DefaultClient, metadata: nsqMetadata{NSQLookupdHTTPAddresses: nsqLookupdHosts, Topic: "topic", Channel: "channel"}} + + depth, err := s.getTopicChannelDepth() + + if err != nil && (tc.lookupError || tc.statsError) { + continue + } + + assert.Nil(t, err) + assert.Equal(t, tc.expectedDepth, depth) + } +} + +func TestNSQGetTopicProducers(t *testing.T) { + type statusAndResponse struct { + status int + response string + } + type testCase struct { + statusAndResponses []statusAndResponse + expectedNSQdHosts []string + isError bool + description string + } + testCases := []testCase{ + { + statusAndResponses: []statusAndResponse{ + {http.StatusOK, `{"producers":[], "channels":[]}`}, + }, + expectedNSQdHosts: []string{}, + description: "No producers or channels", + }, + { + statusAndResponses: []statusAndResponse{ + {http.StatusOK, `{"producers":[{"broadcast_address":"nsqd-0","http_port":4161}]}`}, + }, + expectedNSQdHosts: []string{"nsqd-0:4161"}, + description: "Single nsqd host", + }, + { + statusAndResponses: []statusAndResponse{ + {http.StatusOK, `{"producers":[{"broadcast_address":"nsqd-0","http_port":4161}, {"broadcast_address":"nsqd-1","http_port":4161}]}`}, + {http.StatusOK, `{"producers":[{"broadcast_address":"nsqd-2","http_port":8161}]}`}, + }, + expectedNSQdHosts: []string{"nsqd-0:4161", "nsqd-1:4161", "nsqd-2:8161"}, + description: "Multiple nsqd hosts", + }, + { + statusAndResponses: []statusAndResponse{ + {http.StatusOK, `{"producers":[{"broadcast_address":"nsqd-0","http_port":4161}]}`}, + {http.StatusOK, `{"producers":[{"broadcast_address":"nsqd-0","http_port":4161}]}`}, + }, + expectedNSQdHosts: []string{"nsqd-0:4161"}, + description: "De-dupe nsqd hosts", + }, + { + statusAndResponses: []statusAndResponse{ + {http.StatusOK, `{"producers":[{"broadcast_address":"nsqd-0","http_port":4161}]}`}, + {http.StatusInternalServerError, ""}, + }, + isError: true, + description: "At least one host responded with error", + }, + } + + for _, tc := range testCases { + callCount := atomic.NewInt32(-1) + mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + callCount.Inc() + w.WriteHeader(tc.statusAndResponses[callCount.Load()].status) + // nosemgrep: no-fprintf-to-responsewriter + fmt.Fprint(w, tc.statusAndResponses[callCount.Load()].response) + })) + defer mockServer.Close() + + parsedURL, err := url.Parse(mockServer.URL) + assert.Nil(t, err) + + var nsqLookupdHosts []string + nsqLookupdHost := net.JoinHostPort(parsedURL.Hostname(), parsedURL.Port()) + for i := 0; i < len(tc.statusAndResponses); i++ { + nsqLookupdHosts = append(nsqLookupdHosts, nsqLookupdHost) + } + + s := nsqScaler{httpClient: http.DefaultClient, metadata: nsqMetadata{NSQLookupdHTTPAddresses: nsqLookupdHosts}} + + nsqdHosts, err := s.getTopicProducers("topic") + + if err != nil && tc.isError { + continue + } + + assert.Nil(t, err) + assert.ElementsMatch(t, tc.expectedNSQdHosts, nsqdHosts) + } +} + +func TestNSQGetLookup(t *testing.T) { + type testCase struct { + serverStatus int + serverResponse string + isError bool + description string + } + testCases := []testCase{ + { + serverStatus: http.StatusNotFound, + serverResponse: `{"message": "TOPIC_NOT_FOUND"}`, + isError: false, + description: "Topic does not exist", + }, + { + serverStatus: http.StatusOK, + serverResponse: `{"producers":[{"broadcast_address":"nsqd-0","http_port":4151}], "channels":[]}`, + isError: false, + description: "Channel does not exist", + }, + { + serverStatus: http.StatusNotFound, + serverResponse: `{"producers":[], "channels":["channel"]}`, + isError: false, + description: "No nsqd producers exist", + }, + { + serverStatus: http.StatusOK, + serverResponse: `{"producers":[{"broadcast_address":"nsqd-0", "http_port":4151}], "channels":["channel"]}`, + isError: false, + description: "Topic and channel exist with nsqd producers", + }, + { + serverStatus: http.StatusInternalServerError, + isError: true, + description: "Host responds with error", + }, + } + + s := nsqScaler{httpClient: http.DefaultClient} + for _, tc := range testCases { + mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(tc.serverStatus) + // nosemgrep: no-fprintf-to-responsewriter + fmt.Fprint(w, tc.serverResponse) + })) + defer mockServer.Close() + + parsedURL, err := url.Parse(mockServer.URL) + assert.Nil(t, err) + + host := net.JoinHostPort(parsedURL.Hostname(), parsedURL.Port()) + + resp, err := s.getLookup(host, "topic") + + if err != nil && tc.isError { + continue + } + + assert.Nil(t, err, tc.description) + + if tc.serverStatus != http.StatusNotFound { + assert.NotNil(t, resp, tc.description) + } else { + assert.Nil(t, resp, tc.description) + } + } +} + +func TestNSQAggregateDepth(t *testing.T) { + type statusAndResponse struct { + status int + response string + } + type testCase struct { + statusAndResponses []statusAndResponse + expectedDepth int64 + isError bool + description string + } + testCases := []testCase{ + { + statusAndResponses: []statusAndResponse{ + {http.StatusOK, `{"topics":null}`}, + }, + expectedDepth: 0, + isError: false, + description: "Topic does not exist", + }, + { + statusAndResponses: []statusAndResponse{ + {http.StatusOK, `{"topics":[{"topic_name":"topic", "depth":250, "channels":[]}]}`}, + }, + expectedDepth: 250, + isError: false, + description: "Topic exists with no channels", + }, + { + statusAndResponses: []statusAndResponse{ + {http.StatusOK, `{"topics":[{"topic_name":"topic", "depth":250, "channels":[{"channel_name":"other_channel", "depth":100}]}]}`}, + }, + expectedDepth: 250, + isError: false, + description: "Topic exists with different channels", + }, + { + statusAndResponses: []statusAndResponse{ + {http.StatusOK, `{"topics":[{"topic_name":"topic", "depth":250, "channels":[{"channel_name":"channel", "depth":100}]}]}`}, + }, + expectedDepth: 100, + isError: false, + description: "Topic and channel exist", + }, + { + statusAndResponses: []statusAndResponse{ + {http.StatusOK, `{"topics":[{"topic_name":"topic", "depth":250, "channels":[{"channel_name":"channel", "depth":100, "paused":true}]}]}`}, + }, + expectedDepth: 0, + isError: false, + description: "Channel is paused", + }, + { + statusAndResponses: []statusAndResponse{ + {http.StatusOK, `{"topics":[{"topic_name":"topic", "depth":250, "channels":[{"channel_name":"channel", "depth":100}]}]}`}, + {http.StatusOK, `{"topics":[{"topic_name":"topic", "depth":250, "channels":[{"channel_name":"channel", "depth":50}]}]}`}, + }, + expectedDepth: 150, + isError: false, + description: "Sum multiple depth values", + }, + { + statusAndResponses: []statusAndResponse{ + {http.StatusOK, `{"topics":[{"topic_name":"topic", "depth":500, "channels":[]}]}`}, + {http.StatusOK, `{"topics":[{"topic_name":"topic", "depth":400, "channels":[{"channel_name":"other_channel", "depth":300}]}]}`}, + {http.StatusOK, `{"topics":[{"topic_name":"topic", "depth":200, "channels":[{"channel_name":"channel", "depth":100}]}]}`}, + }, + expectedDepth: 1000, + isError: false, + description: "Channel doesn't exist on all nsqd hosts", + }, + { + statusAndResponses: []statusAndResponse{ + {http.StatusOK, `{"topics":[{"topic_name":"topic", "depth":250, "channels":[{"channel_name":"channel", "depth":100}]}]}`}, + {http.StatusInternalServerError, ""}, + }, + expectedDepth: -1, + isError: true, + description: "At least one host responded with error", + }, + } + + s := nsqScaler{httpClient: http.DefaultClient} + for _, tc := range testCases { + callCount := atomic.NewInt32(-1) + mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + callCount.Inc() + w.WriteHeader(tc.statusAndResponses[callCount.Load()].status) + // nosemgrep: no-fprintf-to-responsewriter + fmt.Fprint(w, tc.statusAndResponses[callCount.Load()].response) + })) + defer mockServer.Close() + + parsedURL, err := url.Parse(mockServer.URL) + assert.Nil(t, err) + + var nsqdHosts []string + nsqdHost := net.JoinHostPort(parsedURL.Hostname(), parsedURL.Port()) + for i := 0; i < len(tc.statusAndResponses); i++ { + nsqdHosts = append(nsqdHosts, nsqdHost) + } + + depth, err := s.aggregateDepth(nsqdHosts, "topic", "channel") + + if err != nil && tc.isError { + continue + } + + assert.Nil(t, err, tc.description) + assert.Equal(t, tc.expectedDepth, depth, tc.description) + } +} + +func TestNSQGetStats(t *testing.T) { + type testCase struct { + serverStatus int + serverResponse string + isError bool + description string + } + testCases := []testCase{ + { + serverStatus: http.StatusOK, + serverResponse: `{"topics":null}`, + isError: false, + description: "Topic does not exist", + }, + { + serverStatus: http.StatusOK, + serverResponse: `{"topics":[{"topic_name":"topic", "depth":250, "channels":[]}]}`, + isError: false, + description: "Channel does not exist", + }, + { + serverStatus: http.StatusOK, + serverResponse: `{"topics":[{"topic_name":"topic", "depth":250, "channels":[{"channel_name":"channel", "depth":250}]}]}`, + isError: false, + description: "Topic and channel exist", + }, + { + serverStatus: http.StatusInternalServerError, + isError: true, + description: "Host responds with error", + }, + } + + s := nsqScaler{httpClient: http.DefaultClient} + for _, tc := range testCases { + mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(tc.serverStatus) + // nosemgrep: no-fprintf-to-responsewriter + fmt.Fprint(w, tc.serverResponse) + })) + defer mockServer.Close() + + parsedURL, err := url.Parse(mockServer.URL) + assert.Nil(t, err) + + host := net.JoinHostPort(parsedURL.Hostname(), parsedURL.Port()) + resp, err := s.getStats(host, "topic") + + if err != nil && tc.isError { + continue + } + + assert.Nil(t, err, tc.description) + assert.NotNil(t, resp, tc.description) + } +} diff --git a/pkg/scaling/scalers_builder.go b/pkg/scaling/scalers_builder.go index 1f4549c7ffa..31fac07949b 100644 --- a/pkg/scaling/scalers_builder.go +++ b/pkg/scaling/scalers_builder.go @@ -215,6 +215,8 @@ func buildScaler(ctx context.Context, client client.Client, triggerType string, return scalers.NewNATSJetStreamScaler(config) case "new-relic": return scalers.NewNewRelicScaler(config) + case "nsq": + return scalers.NewNSQScaler(config) case "openstack-metric": return scalers.NewOpenstackMetricScaler(ctx, config) case "openstack-swift": diff --git a/tests/scalers/nsq/nsq_test.go b/tests/scalers/nsq/nsq_test.go new file mode 100644 index 00000000000..7ef808a33e9 --- /dev/null +++ b/tests/scalers/nsq/nsq_test.go @@ -0,0 +1,214 @@ +//go:build e2e +// +build e2e + +package nsq_test + +import ( + "fmt" + "testing" + + "github.com/joho/godotenv" + "github.com/stretchr/testify/require" + "k8s.io/client-go/kubernetes" + + . "github.com/kedacore/keda/v2/tests/helper" +) + +var _ = godotenv.Load("../../.env") + +const ( + testName = "nsq-test" +) + +var ( + testNamespace = fmt.Sprintf("%s-ns", testName) + deploymentName = fmt.Sprintf("%s-consumer-deployment", testName) + jobName = fmt.Sprintf("%s-producer-job", testName) + scaledObjectName = fmt.Sprintf("%s-so", testName) + nsqNamespace = "nsq" + nsqHelmRepoURL = "https://nsqio.github.io/helm-chart" + minReplicas = 1 + maxReplicas = 10 + topicName = "test_topic" + channelName = "test_channel" +) + +const ( + deploymentTemplate = ` +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{.DeploymentName}} + namespace: {{.TestNamespace}} + labels: + app: {{.DeploymentName}} +spec: + selector: + matchLabels: + app: {{.DeploymentName}} + template: + metadata: + labels: + app: {{.DeploymentName}} + spec: + containers: + - image: ghcr.io/kedacore/tests-nsq:latest + name: {{.DeploymentName}} + args: + - "--mode=consumer" + - "--topic={{.TopicName}}" + - "--channel={{.ChannelName}}" + - "--nsqlookupd-http-address=nsq-nsqlookupd.{{.NSQNamespace}}.svc.cluster.local:4161" + imagePullPolicy: Always +` + + scaledObjectTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{.ScaledObjectName}} + namespace: {{.TestNamespace}} + labels: + app: {{.DeploymentName}} +spec: + pollingInterval: 5 + cooldownPeriod: 10 + idleReplicaCount: 0 + maxReplicaCount: {{.MaxReplicas}} + minReplicaCount: {{.MinReplicas}} + scaleTargetRef: + apiVersion: "apps/v1" + kind: "Deployment" + name: {{.DeploymentName}} + triggers: + - type: nsq + metricType: "AverageValue" + metadata: + nsqLookupdHTTPAddresses: "nsq-nsqlookupd.{{.NSQNamespace}}.svc.cluster.local:4161" + topic: "{{.TopicName}}" + channel: "{{.ChannelName}}" + depthThreshold: "10" + activationDepthThreshold: "5" +` + + jobTemplate = ` +apiVersion: batch/v1 +kind: Job +metadata: + name: {{.JobName}} + namespace: {{.TestNamespace}} +spec: + template: + spec: + containers: + - image: docker.io/ulminator/tests-nsq:latest + name: {{.JobName}} + args: + - "--mode=producer" + - "--topic={{.TopicName}}" + - "--nsqd-tcp-address=nsq-nsqd.{{.NSQNamespace}}.svc.cluster.local:4150" + - "--message-count={{.MessageCount}}" + imagePullPolicy: Always + restartPolicy: Never +` +) + +type templateData struct { + TestNamespace string + NSQNamespace string + DeploymentName string + ScaledObjectName string + JobName string + MinReplicas int + MaxReplicas int + TopicName string + ChannelName string + MessageCount int +} + +func TestNSQScaler(t *testing.T) { + kc := GetKubernetesClient(t) + + t.Cleanup(func() { + data, templates := getTemplateData() + uninstallNSQ(t) + KubectlDeleteWithTemplate(t, data, "jobTemplate", jobTemplate) + DeleteKubernetesResources(t, testNamespace, data, templates) + }) + + installNSQ(t, kc) + + data, templates := getTemplateData() + CreateKubernetesResources(t, kc, testNamespace, data, templates) + + require.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 0, 60, 1), + "Replica count should start out as 0") + + testActivation(t, kc, data) + testScaleOut(t, kc, data) + testScaleIn(t, kc) +} + +func installNSQ(t *testing.T, kc *kubernetes.Clientset) { + t.Log("--- installing NSQ ---") + CreateNamespace(t, kc, nsqNamespace) + + _, err := ExecuteCommand("which helm") + require.NoErrorf(t, err, "nsq test requires helm - %s", err) + + _, err = ExecuteCommand(fmt.Sprintf("helm repo add nsqio %s", nsqHelmRepoURL)) + require.NoErrorf(t, err, "error while adding nsqio helm repo - %s", err) + + _, err = ExecuteCommand(fmt.Sprintf("helm install nsq nsqio/nsq --namespace %s --set nsqadmin.enabled=false --wait", nsqNamespace)) + require.NoErrorf(t, err, "error while installing nsq - %s", err) +} + +func uninstallNSQ(t *testing.T) { + t.Log("--- uninstalling NSQ ---") + _, err := ExecuteCommand(fmt.Sprintf("helm uninstall nsq --namespace %s", nsqNamespace)) + require.NoErrorf(t, err, "error while uninstalling nsq - %s", err) + DeleteNamespace(t, nsqNamespace) +} + +func getTemplateData() (templateData, []Template) { + return templateData{ + TestNamespace: testNamespace, + NSQNamespace: nsqNamespace, + DeploymentName: deploymentName, + JobName: jobName, + ScaledObjectName: scaledObjectName, + MinReplicas: minReplicas, + MaxReplicas: maxReplicas, + TopicName: topicName, + ChannelName: channelName, + }, []Template{ + {Name: "deploymentTemplate", Config: deploymentTemplate}, + {Name: "scaledObjectTemplate", Config: scaledObjectTemplate}, + } +} + +func testActivation(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing activation ---") + + data.MessageCount = 5 + KubectlReplaceWithTemplate(t, data, "jobTemplate", jobTemplate) + + AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 60) +} + +func testScaleOut(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing scale out ---") + + data.MessageCount = 1 // 5 already published + 1 > activationDepthThreshold + KubectlReplaceWithTemplate(t, data, "jobTemplate", jobTemplate) + + require.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 1), + "replica count should be 1 after 1 minute") +} + +func testScaleIn(t *testing.T, kc *kubernetes.Clientset) { + t.Log("--- testing scale in ---") + + require.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 0, 60, 1), + "replica count should be 0 after 1 minute") +} From 64f31ceae3840970fac4db23d398cc4f514ab467 Mon Sep 17 00:00:00 2001 From: Matt Ulmer <25484774+Ulminator@users.noreply.github.com> Date: Tue, 15 Oct 2024 09:54:55 -0700 Subject: [PATCH 2/8] fix changelog and image ref Signed-off-by: Matt Ulmer <25484774+Ulminator@users.noreply.github.com> --- CHANGELOG.md | 3 +-- tests/scalers/nsq/nsq_test.go | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index af74a7d80b3..a71233d1d6a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -55,14 +55,13 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio ## Unreleased -- **General:** Introduce new NSQ Scaler ([#3281](https://github.com/kedacore/keda/issues/3281)) - ### New - **General**: Cache miss fallback in validating webhook for ScaledObjects with direct kubernetes client ([#5973](https://github.com/kedacore/keda/issues/5973)) - **CloudEventSource**: Introduce ClusterCloudEventSource ([#3533](https://github.com/kedacore/keda/issues/3533)) - **CloudEventSource**: Provide ClusterCloudEventSource around the management of ScaledJobs resources ([#3523](https://github.com/kedacore/keda/issues/3523)) - **CloudEventSource**: Provide ClusterCloudEventSource around the management of TriggerAuthentication/ClusterTriggerAuthentication resources ([#3524](https://github.com/kedacore/keda/issues/3524)) +- **General:** Introduce new NSQ Scaler ([#3281](https://github.com/kedacore/keda/issues/3281)) #### Experimental diff --git a/tests/scalers/nsq/nsq_test.go b/tests/scalers/nsq/nsq_test.go index 7ef808a33e9..a75acc99f78 100644 --- a/tests/scalers/nsq/nsq_test.go +++ b/tests/scalers/nsq/nsq_test.go @@ -101,7 +101,7 @@ spec: template: spec: containers: - - image: docker.io/ulminator/tests-nsq:latest + - image: ghcr.io/kedacore/tests-nsq:latest name: {{.JobName}} args: - "--mode=producer" From 87b8815b97812378bcd63ae4b3aae51a4e2c523d Mon Sep 17 00:00:00 2001 From: Matt Ulmer <25484774+Ulminator@users.noreply.github.com> Date: Thu, 17 Oct 2024 11:09:03 -0700 Subject: [PATCH 3/8] fix changelog formatting Signed-off-by: Matt Ulmer <25484774+Ulminator@users.noreply.github.com> --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a71233d1d6a..2c242db117d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -58,10 +58,10 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio ### New - **General**: Cache miss fallback in validating webhook for ScaledObjects with direct kubernetes client ([#5973](https://github.com/kedacore/keda/issues/5973)) +- **General**: Introduce new NSQ Scaler ([#3281](https://github.com/kedacore/keda/issues/3281)) - **CloudEventSource**: Introduce ClusterCloudEventSource ([#3533](https://github.com/kedacore/keda/issues/3533)) - **CloudEventSource**: Provide ClusterCloudEventSource around the management of ScaledJobs resources ([#3523](https://github.com/kedacore/keda/issues/3523)) - **CloudEventSource**: Provide ClusterCloudEventSource around the management of TriggerAuthentication/ClusterTriggerAuthentication resources ([#3524](https://github.com/kedacore/keda/issues/3524)) -- **General:** Introduce new NSQ Scaler ([#3281](https://github.com/kedacore/keda/issues/3281)) #### Experimental From 54afb54ba4cd74de0a549de91bc10b56e6078555 Mon Sep 17 00:00:00 2001 From: Matt Ulmer <25484774+Ulminator@users.noreply.github.com> Date: Mon, 4 Nov 2024 17:14:44 -0800 Subject: [PATCH 4/8] fix: address comments Signed-off-by: Matt Ulmer <25484774+Ulminator@users.noreply.github.com> --- pkg/scalers/nsq_scaler.go | 18 ++----- tests/scalers/nsq/nsq_test.go | 89 ++++++++++++++++++++--------------- 2 files changed, 55 insertions(+), 52 deletions(-) diff --git a/pkg/scalers/nsq_scaler.go b/pkg/scalers/nsq_scaler.go index 7a99d95baa8..ab5ccb015fe 100644 --- a/pkg/scalers/nsq_scaler.go +++ b/pkg/scalers/nsq_scaler.go @@ -66,14 +66,6 @@ func (m nsqMetadata) Validate() error { return fmt.Errorf("no nsqLookupdHTTPAddresses given") } - if m.Topic == "" { - return fmt.Errorf("no topic given") - } - - if m.Channel == "" { - return fmt.Errorf("no channel given") - } - if m.DepthThreshold <= 0 { return fmt.Errorf("depthThreshold must be a positive integer") } @@ -101,7 +93,7 @@ func (s nsqScaler) GetMetricsAndActivity(_ context.Context, metricName string) ( return []external_metrics.ExternalMetricValue{}, false, err } - s.logger.Info("GetMetricsAndActivity", "metricName", metricName, "depth", depth) + s.logger.V(1).Info("GetMetricsAndActivity", "metricName", metricName, "depth", depth) metric := GenerateMetricInMili(metricName, float64(depth)) @@ -115,7 +107,7 @@ func (s nsqScaler) getTopicChannelDepth() (int64, error) { } if len(nsqdHosts) == 0 { - s.logger.Info("no nsqd hosts found for topic", "topic", s.metadata.Topic) + s.logger.V(1).Info("no nsqd hosts found for topic", "topic", s.metadata.Topic) return 0, nil } @@ -287,7 +279,7 @@ func (s *nsqScaler) aggregateDepth(nsqdHosts []string, topic string, channel str if len(t.Channels) == 0 { // topic exists with no channels, but there are messages in the topic -> we should still scale to bootstrap - s.logger.Info("no channels exist for topic", "topic", topic, "channel", channel, "host", result.host) + s.logger.V(1).Info("no channels exist for topic", "topic", topic, "channel", channel, "host", result.host) depth += t.Depth continue } @@ -301,14 +293,14 @@ func (s *nsqScaler) aggregateDepth(nsqdHosts []string, topic string, channel str if ch.Paused { // if it's paused on a single nsqd host, it's depth should not go into the aggregate // meaning if paused on all nsqd hosts => depth == 0 - s.logger.Info("channel is paused", "topic", topic, "channel", channel, "host", result.host) + s.logger.V(1).Info("channel is paused", "topic", topic, "channel", channel, "host", result.host) continue } depth += ch.Depth } if !channelExists { // topic exists with channels, but not the one in question - fallback to topic depth - s.logger.Info("channel does not exist for topic", "topic", topic, "channel", channel, "host", result.host) + s.logger.V(1).Info("channel does not exist for topic", "topic", topic, "channel", channel, "host", result.host) depth += t.Depth } } diff --git a/tests/scalers/nsq/nsq_test.go b/tests/scalers/nsq/nsq_test.go index a75acc99f78..48b7a60b282 100644 --- a/tests/scalers/nsq/nsq_test.go +++ b/tests/scalers/nsq/nsq_test.go @@ -21,16 +21,18 @@ const ( ) var ( - testNamespace = fmt.Sprintf("%s-ns", testName) - deploymentName = fmt.Sprintf("%s-consumer-deployment", testName) - jobName = fmt.Sprintf("%s-producer-job", testName) - scaledObjectName = fmt.Sprintf("%s-so", testName) - nsqNamespace = "nsq" - nsqHelmRepoURL = "https://nsqio.github.io/helm-chart" - minReplicas = 1 - maxReplicas = 10 - topicName = "test_topic" - channelName = "test_channel" + testNamespace = fmt.Sprintf("%s-ns", testName) + deploymentName = fmt.Sprintf("%s-consumer-deployment", testName) + jobName = fmt.Sprintf("%s-producer-job", testName) + scaledObjectName = fmt.Sprintf("%s-so", testName) + nsqNamespace = "nsq" + nsqHelmRepoURL = "https://nsqio.github.io/helm-chart" + minReplicaCount = 0 + maxReplicaCount = 2 + depthThreshold = 10 + activationDepthThreshold = 5 + topicName = "test_topic" + channelName = "test_channel" ) const ( @@ -58,6 +60,7 @@ spec: - "--mode=consumer" - "--topic={{.TopicName}}" - "--channel={{.ChannelName}}" + - "--sleep-duration=1s" - "--nsqlookupd-http-address=nsq-nsqlookupd.{{.NSQNamespace}}.svc.cluster.local:4161" imagePullPolicy: Always ` @@ -73,9 +76,8 @@ metadata: spec: pollingInterval: 5 cooldownPeriod: 10 - idleReplicaCount: 0 - maxReplicaCount: {{.MaxReplicas}} - minReplicaCount: {{.MinReplicas}} + maxReplicaCount: {{.MaxReplicaCount}} + minReplicaCount: {{.MinReplicaCount}} scaleTargetRef: apiVersion: "apps/v1" kind: "Deployment" @@ -87,8 +89,8 @@ spec: nsqLookupdHTTPAddresses: "nsq-nsqlookupd.{{.NSQNamespace}}.svc.cluster.local:4161" topic: "{{.TopicName}}" channel: "{{.ChannelName}}" - depthThreshold: "10" - activationDepthThreshold: "5" + depthThreshold: "{{.DepthThreshold}}" + activationDepthThreshold: "{{.ActivationDepthThreshold}}" ` jobTemplate = ` @@ -114,16 +116,18 @@ spec: ) type templateData struct { - TestNamespace string - NSQNamespace string - DeploymentName string - ScaledObjectName string - JobName string - MinReplicas int - MaxReplicas int - TopicName string - ChannelName string - MessageCount int + TestNamespace string + NSQNamespace string + DeploymentName string + ScaledObjectName string + JobName string + MinReplicaCount int + MaxReplicaCount int + DepthThreshold int + ActivationDepthThreshold int + TopicName string + ChannelName string + MessageCount int } func TestNSQScaler(t *testing.T) { @@ -172,15 +176,17 @@ func uninstallNSQ(t *testing.T) { func getTemplateData() (templateData, []Template) { return templateData{ - TestNamespace: testNamespace, - NSQNamespace: nsqNamespace, - DeploymentName: deploymentName, - JobName: jobName, - ScaledObjectName: scaledObjectName, - MinReplicas: minReplicas, - MaxReplicas: maxReplicas, - TopicName: topicName, - ChannelName: channelName, + TestNamespace: testNamespace, + NSQNamespace: nsqNamespace, + DeploymentName: deploymentName, + JobName: jobName, + ScaledObjectName: scaledObjectName, + MinReplicaCount: minReplicaCount, + MaxReplicaCount: maxReplicaCount, + DepthThreshold: depthThreshold, + ActivationDepthThreshold: activationDepthThreshold, + TopicName: topicName, + ChannelName: channelName, }, []Template{ {Name: "deploymentTemplate", Config: deploymentTemplate}, {Name: "scaledObjectTemplate", Config: scaledObjectTemplate}, @@ -190,20 +196,25 @@ func getTemplateData() (templateData, []Template) { func testActivation(t *testing.T, kc *kubernetes.Clientset, data templateData) { t.Log("--- testing activation ---") - data.MessageCount = 5 + data.MessageCount = activationDepthThreshold KubectlReplaceWithTemplate(t, data, "jobTemplate", jobTemplate) - AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 60) + + data.MessageCount = 1 // total message count > activationDepthThreshold + KubectlReplaceWithTemplate(t, data, "jobTemplate", jobTemplate) + require.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 1), + "replica count should reach 1 in under 1 minute") } func testScaleOut(t *testing.T, kc *kubernetes.Clientset, data templateData) { t.Log("--- testing scale out ---") - data.MessageCount = 1 // 5 already published + 1 > activationDepthThreshold + // can handle depthThreshold messages per replica - using maxReplicaCount + 1 to ensure scaling to maxReplicaCount + data.MessageCount = depthThreshold * (maxReplicaCount + 1) KubectlReplaceWithTemplate(t, data, "jobTemplate", jobTemplate) - require.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 1), - "replica count should be 1 after 1 minute") + require.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, maxReplicaCount, 60, 1), + "replica count should reach 2 in under 1 minute") } func testScaleIn(t *testing.T, kc *kubernetes.Clientset) { From 56787db1f1050f2eb5bf5188bf4fed28bc8e21d3 Mon Sep 17 00:00:00 2001 From: Matt Ulmer <25484774+Ulminator@users.noreply.github.com> Date: Mon, 4 Nov 2024 20:33:31 -0800 Subject: [PATCH 5/8] feat: add useHttps/unsafeSsl to nsqScaler Signed-off-by: Matt Ulmer <25484774+Ulminator@users.noreply.github.com> --- pkg/scalers/nsq_scaler.go | 47 ++++++++++++++++++++++++---------- pkg/scalers/nsq_scaler_test.go | 32 +++++++++++++---------- tests/scalers/nsq/nsq_test.go | 12 ++++----- 3 files changed, 58 insertions(+), 33 deletions(-) diff --git a/pkg/scalers/nsq_scaler.go b/pkg/scalers/nsq_scaler.go index ab5ccb015fe..651f1947d92 100644 --- a/pkg/scalers/nsq_scaler.go +++ b/pkg/scalers/nsq_scaler.go @@ -23,6 +23,7 @@ type nsqScaler struct { metricType v2.MetricTargetType metadata nsqMetadata httpClient *http.Client + scheme string logger logr.Logger } @@ -32,6 +33,8 @@ type nsqMetadata struct { Channel string `keda:"name=channel, order=triggerMetadata;resolvedEnv"` DepthThreshold int64 `keda:"name=depthThreshold, order=triggerMetadata;resolvedEnv, default=10"` ActivationDepthThreshold int64 `keda:"name=activationDepthThreshold, order=triggerMetadata;resolvedEnv, default=0"` + UseHTTPS bool `keda:"name=useHttps, order=triggerMetadata;resolvedEnv, default=false"` + UnsafeSSL bool `keda:"name=unsafeSsl, order=triggerMetadata;resolvedEnv, default=false"` triggerIndex int } @@ -53,10 +56,16 @@ func NewNSQScaler(config *scalersconfig.ScalerConfig) (Scaler, error) { return nil, fmt.Errorf("error parsing NSQ metadata: %w", err) } + scheme := "http" + if nsqMetadata.UseHTTPS { + scheme = "https" + } + return &nsqScaler{ metricType: metricType, metadata: nsqMetadata, - httpClient: kedautil.CreateHTTPClient(config.GlobalHTTPTimeout, true), + httpClient: kedautil.CreateHTTPClient(config.GlobalHTTPTimeout, nsqMetadata.UnsafeSSL), + scheme: scheme, logger: logger, }, nil } @@ -86,8 +95,8 @@ func parseNSQMetadata(config *scalersconfig.ScalerConfig) (nsqMetadata, error) { return meta, nil } -func (s nsqScaler) GetMetricsAndActivity(_ context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) { - depth, err := s.getTopicChannelDepth() +func (s nsqScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) { + depth, err := s.getTopicChannelDepth(ctx) if err != nil { return []external_metrics.ExternalMetricValue{}, false, err @@ -100,8 +109,8 @@ func (s nsqScaler) GetMetricsAndActivity(_ context.Context, metricName string) ( return []external_metrics.ExternalMetricValue{metric}, depth > s.metadata.ActivationDepthThreshold, nil } -func (s nsqScaler) getTopicChannelDepth() (int64, error) { - nsqdHosts, err := s.getTopicProducers(s.metadata.Topic) +func (s nsqScaler) getTopicChannelDepth(ctx context.Context) (int64, error) { + nsqdHosts, err := s.getTopicProducers(ctx, s.metadata.Topic) if err != nil { return -1, fmt.Errorf("error getting nsqd hosts: %w", err) } @@ -111,7 +120,7 @@ func (s nsqScaler) getTopicChannelDepth() (int64, error) { return 0, nil } - depth, err := s.aggregateDepth(nsqdHosts, s.metadata.Topic, s.metadata.Channel) + depth, err := s.aggregateDepth(ctx, nsqdHosts, s.metadata.Topic, s.metadata.Channel) if err != nil { return -1, fmt.Errorf("error getting topic/channel depth: %w", err) } @@ -152,7 +161,7 @@ type lookupResult struct { err error } -func (s *nsqScaler) getTopicProducers(topic string) ([]string, error) { +func (s *nsqScaler) getTopicProducers(ctx context.Context, topic string) ([]string, error) { var wg sync.WaitGroup resultCh := make(chan lookupResult, len(s.metadata.NSQLookupdHTTPAddresses)) @@ -160,7 +169,7 @@ func (s *nsqScaler) getTopicProducers(topic string) ([]string, error) { wg.Add(1) go func(host string, topic string) { defer wg.Done() - resp, err := s.getLookup(host, topic) + resp, err := s.getLookup(ctx, host, topic) resultCh <- lookupResult{host, resp, err} }(host, topic) } @@ -193,8 +202,13 @@ func (s *nsqScaler) getTopicProducers(topic string) ([]string, error) { return nsqdHosts, nil } -func (s *nsqScaler) getLookup(host string, topic string) (*lookupResponse, error) { - req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/%s", host, "lookup"), nil) +func (s *nsqScaler) getLookup(ctx context.Context, host string, topic string) (*lookupResponse, error) { + lookupURL := url.URL{ + Scheme: s.scheme, + Host: host, + Path: "lookup", + } + req, err := http.NewRequestWithContext(ctx, "GET", lookupURL.String(), nil) if err != nil { return nil, err } @@ -249,7 +263,7 @@ type statsResult struct { err error } -func (s *nsqScaler) aggregateDepth(nsqdHosts []string, topic string, channel string) (int64, error) { +func (s *nsqScaler) aggregateDepth(ctx context.Context, nsqdHosts []string, topic string, channel string) (int64, error) { wg := sync.WaitGroup{} resultCh := make(chan statsResult, len(nsqdHosts)) @@ -257,7 +271,7 @@ func (s *nsqScaler) aggregateDepth(nsqdHosts []string, topic string, channel str wg.Add(1) go func(host string, topic string) { defer wg.Done() - resp, err := s.getStats(host, topic) + resp, err := s.getStats(ctx, host, topic) resultCh <- statsResult{host, resp, err} }(host, topic) } @@ -309,8 +323,13 @@ func (s *nsqScaler) aggregateDepth(nsqdHosts []string, topic string, channel str return depth, nil } -func (s *nsqScaler) getStats(host string, topic string) (*statsResponse, error) { - req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/%s", host, "stats"), nil) +func (s *nsqScaler) getStats(ctx context.Context, host string, topic string) (*statsResponse, error) { + statsURL := url.URL{ + Scheme: s.scheme, + Host: host, + Path: "stats", + } + req, err := http.NewRequestWithContext(ctx, "GET", statsURL.String(), nil) if err != nil { return nil, err } diff --git a/pkg/scalers/nsq_scaler_test.go b/pkg/scalers/nsq_scaler_test.go index d97d167c44c..956f9f1621e 100644 --- a/pkg/scalers/nsq_scaler_test.go +++ b/pkg/scalers/nsq_scaler_test.go @@ -25,6 +25,8 @@ type nsqMetadataTestData struct { channel string depthThreshold int64 activationDepthThreshold int64 + useHTTPS bool + unsafeSsl bool isError bool description string } @@ -60,13 +62,15 @@ var parseNSQMetadataTestDataset = []nsqMetadataTestData{ description: "Success, multiple nsqlookupd addresses", }, { - metadata: map[string]string{"nsqLookupdHTTPAddresses": "nsqlookupd-0:4161", "topic": "topic", "channel": "channel", "depthThreshold": "100", "activationDepthThreshold": "1"}, + metadata: map[string]string{"nsqLookupdHTTPAddresses": "nsqlookupd-0:4161", "topic": "topic", "channel": "channel", "depthThreshold": "100", "activationDepthThreshold": "1", "useHttps": "true", "unsafeSsl": "true"}, numNSQLookupdHTTPAddresses: 1, nsqLookupdHTTPAddresses: []string{"nsqlookupd-0:4161"}, topic: "topic", channel: "channel", depthThreshold: 100, activationDepthThreshold: 1, + useHTTPS: true, + unsafeSsl: true, isError: false, description: "Success - setting optional fields", }, @@ -123,6 +127,8 @@ func TestNSQParseMetadata(t *testing.T) { assert.Equal(t, testData.channel, meta.Channel, testData.description) assert.Equal(t, testData.depthThreshold, meta.DepthThreshold, testData.description) assert.Equal(t, testData.activationDepthThreshold, meta.ActivationDepthThreshold, testData.description) + assert.Equal(t, testData.useHTTPS, meta.UseHTTPS, testData.description) + assert.Equal(t, testData.unsafeSsl, meta.UnsafeSSL, testData.description) } } @@ -187,7 +193,7 @@ func TestNSQGetMetricsAndActivity(t *testing.T) { meta, err := parseNSQMetadata(&config) assert.Nil(t, err) - s := nsqScaler{v2.AverageValueMetricType, meta, http.DefaultClient, logr.Discard()} + s := nsqScaler{v2.AverageValueMetricType, meta, http.DefaultClient, "http", logr.Discard()} metricName := "s0-nsq-topic-channel" metrics, activity, err := s.GetMetricsAndActivity(context.Background(), metricName) @@ -218,7 +224,7 @@ func TestNSQGetMetricSpecForScaling(t *testing.T) { } metricType := v2.MetricTargetType(testData.metricType) - mockNSQScaler := nsqScaler{metricType, meta, nil, logr.Discard()} + mockNSQScaler := nsqScaler{metricType, meta, nil, "http", logr.Discard()} metricSpec := mockNSQScaler.GetMetricSpecForScaling(context.Background()) metricName := metricSpec[0].External.Metric.Name @@ -321,9 +327,9 @@ func TestNSQGetTopicChannelDepth(t *testing.T) { nsqLookupdHosts := []string{net.JoinHostPort(parsedNSQLookupdURL.Hostname(), parsedNSQLookupdURL.Port())} - s := nsqScaler{httpClient: http.DefaultClient, metadata: nsqMetadata{NSQLookupdHTTPAddresses: nsqLookupdHosts, Topic: "topic", Channel: "channel"}} + s := nsqScaler{httpClient: http.DefaultClient, scheme: "http", metadata: nsqMetadata{NSQLookupdHTTPAddresses: nsqLookupdHosts, Topic: "topic", Channel: "channel"}} - depth, err := s.getTopicChannelDepth() + depth, err := s.getTopicChannelDepth(context.Background()) if err != nil && (tc.lookupError || tc.statsError) { continue @@ -405,9 +411,9 @@ func TestNSQGetTopicProducers(t *testing.T) { nsqLookupdHosts = append(nsqLookupdHosts, nsqLookupdHost) } - s := nsqScaler{httpClient: http.DefaultClient, metadata: nsqMetadata{NSQLookupdHTTPAddresses: nsqLookupdHosts}} + s := nsqScaler{httpClient: http.DefaultClient, scheme: "http", metadata: nsqMetadata{NSQLookupdHTTPAddresses: nsqLookupdHosts}} - nsqdHosts, err := s.getTopicProducers("topic") + nsqdHosts, err := s.getTopicProducers(context.Background(), "topic") if err != nil && tc.isError { continue @@ -457,7 +463,7 @@ func TestNSQGetLookup(t *testing.T) { }, } - s := nsqScaler{httpClient: http.DefaultClient} + s := nsqScaler{httpClient: http.DefaultClient, scheme: "http"} for _, tc := range testCases { mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(tc.serverStatus) @@ -471,7 +477,7 @@ func TestNSQGetLookup(t *testing.T) { host := net.JoinHostPort(parsedURL.Hostname(), parsedURL.Port()) - resp, err := s.getLookup(host, "topic") + resp, err := s.getLookup(context.Background(), host, "topic") if err != nil && tc.isError { continue @@ -569,7 +575,7 @@ func TestNSQAggregateDepth(t *testing.T) { }, } - s := nsqScaler{httpClient: http.DefaultClient} + s := nsqScaler{httpClient: http.DefaultClient, scheme: "http"} for _, tc := range testCases { callCount := atomic.NewInt32(-1) mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -589,7 +595,7 @@ func TestNSQAggregateDepth(t *testing.T) { nsqdHosts = append(nsqdHosts, nsqdHost) } - depth, err := s.aggregateDepth(nsqdHosts, "topic", "channel") + depth, err := s.aggregateDepth(context.Background(), nsqdHosts, "topic", "channel") if err != nil && tc.isError { continue @@ -633,7 +639,7 @@ func TestNSQGetStats(t *testing.T) { }, } - s := nsqScaler{httpClient: http.DefaultClient} + s := nsqScaler{httpClient: http.DefaultClient, scheme: "http"} for _, tc := range testCases { mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(tc.serverStatus) @@ -646,7 +652,7 @@ func TestNSQGetStats(t *testing.T) { assert.Nil(t, err) host := net.JoinHostPort(parsedURL.Hostname(), parsedURL.Port()) - resp, err := s.getStats(host, "topic") + resp, err := s.getStats(context.Background(), host, "topic") if err != nil && tc.isError { continue diff --git a/tests/scalers/nsq/nsq_test.go b/tests/scalers/nsq/nsq_test.go index 48b7a60b282..85028c9a9ed 100644 --- a/tests/scalers/nsq/nsq_test.go +++ b/tests/scalers/nsq/nsq_test.go @@ -29,10 +29,10 @@ var ( nsqHelmRepoURL = "https://nsqio.github.io/helm-chart" minReplicaCount = 0 maxReplicaCount = 2 - depthThreshold = 10 - activationDepthThreshold = 5 topicName = "test_topic" channelName = "test_channel" + depthThreshold = 10 + activationDepthThreshold = 5 ) const ( @@ -123,10 +123,10 @@ type templateData struct { JobName string MinReplicaCount int MaxReplicaCount int - DepthThreshold int - ActivationDepthThreshold int TopicName string ChannelName string + DepthThreshold int + ActivationDepthThreshold int MessageCount int } @@ -183,10 +183,10 @@ func getTemplateData() (templateData, []Template) { ScaledObjectName: scaledObjectName, MinReplicaCount: minReplicaCount, MaxReplicaCount: maxReplicaCount, - DepthThreshold: depthThreshold, - ActivationDepthThreshold: activationDepthThreshold, TopicName: topicName, ChannelName: channelName, + DepthThreshold: depthThreshold, + ActivationDepthThreshold: activationDepthThreshold, }, []Template{ {Name: "deploymentTemplate", Config: deploymentTemplate}, {Name: "scaledObjectTemplate", Config: scaledObjectTemplate}, From 5ebd84acd859aaacfe611fd2a303eac5e92657ea Mon Sep 17 00:00:00 2001 From: Matt Ulmer <25484774+Ulminator@users.noreply.github.com> Date: Tue, 5 Nov 2024 13:02:50 -0800 Subject: [PATCH 6/8] fix: make nsq e2e tests more reliable Signed-off-by: Matt Ulmer <25484774+Ulminator@users.noreply.github.com> --- tests/scalers/nsq/nsq_test.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/tests/scalers/nsq/nsq_test.go b/tests/scalers/nsq/nsq_test.go index 85028c9a9ed..ca556ecf5e1 100644 --- a/tests/scalers/nsq/nsq_test.go +++ b/tests/scalers/nsq/nsq_test.go @@ -31,7 +31,7 @@ var ( maxReplicaCount = 2 topicName = "test_topic" channelName = "test_channel" - depthThreshold = 10 + depthThreshold = 1 activationDepthThreshold = 5 ) @@ -145,7 +145,7 @@ func TestNSQScaler(t *testing.T) { data, templates := getTemplateData() CreateKubernetesResources(t, kc, testNamespace, data, templates) - require.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 0, 60, 1), + require.True(t, WaitForPodsTerminated(t, kc, fmt.Sprintf("app=%s", deploymentName), testNamespace, 60, 1), "Replica count should start out as 0") testActivation(t, kc, data) @@ -163,7 +163,7 @@ func installNSQ(t *testing.T, kc *kubernetes.Clientset) { _, err = ExecuteCommand(fmt.Sprintf("helm repo add nsqio %s", nsqHelmRepoURL)) require.NoErrorf(t, err, "error while adding nsqio helm repo - %s", err) - _, err = ExecuteCommand(fmt.Sprintf("helm install nsq nsqio/nsq --namespace %s --set nsqadmin.enabled=false --wait", nsqNamespace)) + _, err = ExecuteCommand(fmt.Sprintf("helm install nsq nsqio/nsq --namespace %s --set nsqd.replicaCount=1 --set nsqlookupd.replicaCount=1 --set nsqadmin.enabled=false --wait", nsqNamespace)) require.NoErrorf(t, err, "error while installing nsq - %s", err) } @@ -198,7 +198,7 @@ func testActivation(t *testing.T, kc *kubernetes.Clientset, data templateData) { data.MessageCount = activationDepthThreshold KubectlReplaceWithTemplate(t, data, "jobTemplate", jobTemplate) - AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 60) + AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 20) data.MessageCount = 1 // total message count > activationDepthThreshold KubectlReplaceWithTemplate(t, data, "jobTemplate", jobTemplate) @@ -209,8 +209,7 @@ func testActivation(t *testing.T, kc *kubernetes.Clientset, data templateData) { func testScaleOut(t *testing.T, kc *kubernetes.Clientset, data templateData) { t.Log("--- testing scale out ---") - // can handle depthThreshold messages per replica - using maxReplicaCount + 1 to ensure scaling to maxReplicaCount - data.MessageCount = depthThreshold * (maxReplicaCount + 1) + data.MessageCount = 80 KubectlReplaceWithTemplate(t, data, "jobTemplate", jobTemplate) require.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, maxReplicaCount, 60, 1), From 4a6a4a72f678fe6bfc5181c823defc3a8b2b7e4b Mon Sep 17 00:00:00 2001 From: Matt Ulmer <25484774+Ulminator@users.noreply.github.com> Date: Tue, 5 Nov 2024 13:48:02 -0800 Subject: [PATCH 7/8] Update tests/scalers/nsq/nsq_test.go Co-authored-by: Jorge Turrado Ferrero Signed-off-by: Matt Ulmer <25484774+Ulminator@users.noreply.github.com> --- tests/scalers/nsq/nsq_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/scalers/nsq/nsq_test.go b/tests/scalers/nsq/nsq_test.go index ca556ecf5e1..cb81c506bdd 100644 --- a/tests/scalers/nsq/nsq_test.go +++ b/tests/scalers/nsq/nsq_test.go @@ -200,7 +200,7 @@ func testActivation(t *testing.T, kc *kubernetes.Clientset, data templateData) { KubectlReplaceWithTemplate(t, data, "jobTemplate", jobTemplate) AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 20) - data.MessageCount = 1 // total message count > activationDepthThreshold + data.MessageCount = 1 KubectlReplaceWithTemplate(t, data, "jobTemplate", jobTemplate) require.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 1), "replica count should reach 1 in under 1 minute") From b0d2bb5edc96bfe8e1bb5fc2680cc30d03c2412c Mon Sep 17 00:00:00 2001 From: Matt Ulmer <25484774+Ulminator@users.noreply.github.com> Date: Thu, 14 Nov 2024 16:30:40 -0800 Subject: [PATCH 8/8] fix: update changelog Signed-off-by: Matt Ulmer <25484774+Ulminator@users.noreply.github.com> --- CHANGELOG.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f363c1e5f24..cdc424eafe5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -58,7 +58,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio ### New -- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX)) +- **General**: Introduce new NSQ scaler ([#3281](https://github.com/kedacore/keda/issues/3281)) #### Experimental @@ -98,7 +98,6 @@ New deprecation(s): - **General**: Add the generateEmbeddedObjectMeta flag to generate meta properties of JobTargetRef in ScaledJob ([#5908](https://github.com/kedacore/keda/issues/5908)) - **General**: Cache miss fallback in validating webhook for ScaledObjects with direct kubernetes client ([#5973](https://github.com/kedacore/keda/issues/5973)) - **General**: Introduce new Beanstalkd scaler ([#5901](https://github.com/kedacore/keda/issues/5901)) -- **General**: Introduce new NSQ Scaler ([#3281](https://github.com/kedacore/keda/issues/3281)) - **General**: Replace wildcards in RBAC objects with explicit resources and verbs ([#6129](https://github.com/kedacore/keda/pull/6129)) - **Azure Pipelines Scalar**: Print warning to log when Azure DevOps API Rate Limits are (nearly) reached ([#6284](https://github.com/kedacore/keda/issues/6284)) - **CloudEventSource**: Introduce ClusterCloudEventSource ([#3533](https://github.com/kedacore/keda/issues/3533))