Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add nsq scaler #6230

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
### New

- **General**: Cache miss fallback in validating webhook for ScaledObjects with direct kubernetes client ([#5973](https://github.com/kedacore/keda/issues/5973))
- **General**: Introduce new NSQ Scaler ([#3281](https://github.com/kedacore/keda/issues/3281))
- **CloudEventSource**: Introduce ClusterCloudEventSource ([#3533](https://github.com/kedacore/keda/issues/3533))
- **CloudEventSource**: Provide ClusterCloudEventSource around the management of ScaledJobs resources ([#3523](https://github.com/kedacore/keda/issues/3523))
- **CloudEventSource**: Provide ClusterCloudEventSource around the management of TriggerAuthentication/ClusterTriggerAuthentication resources ([#3524](https://github.com/kedacore/keda/issues/3524))
Expand Down
358 changes: 358 additions & 0 deletions pkg/scalers/nsq_scaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,358 @@
package scalers

import (
"context"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"net/url"
"strconv"
"sync"

"github.com/go-logr/logr"
v2 "k8s.io/api/autoscaling/v2"
"k8s.io/metrics/pkg/apis/external_metrics"

"github.com/kedacore/keda/v2/pkg/scalers/scalersconfig"
kedautil "github.com/kedacore/keda/v2/pkg/util"
)

type nsqScaler struct {
metricType v2.MetricTargetType
metadata nsqMetadata
httpClient *http.Client
logger logr.Logger
}

type nsqMetadata struct {
NSQLookupdHTTPAddresses []string `keda:"name=nsqLookupdHTTPAddresses, order=triggerMetadata;resolvedEnv"`
Topic string `keda:"name=topic, order=triggerMetadata;resolvedEnv"`
Channel string `keda:"name=channel, order=triggerMetadata;resolvedEnv"`
DepthThreshold int64 `keda:"name=depthThreshold, order=triggerMetadata;resolvedEnv, default=10"`
ActivationDepthThreshold int64 `keda:"name=activationDepthThreshold, order=triggerMetadata;resolvedEnv, default=0"`

triggerIndex int
}

const (
nsqMetricType = "External"
)

func NewNSQScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
metricType, err := GetMetricTargetType(config)
if err != nil {
return nil, fmt.Errorf("error getting scaler metric type: %w", err)
}

logger := InitializeLogger(config, "nsq_scaler")

nsqMetadata, err := parseNSQMetadata(config)
if err != nil {
return nil, fmt.Errorf("error parsing NSQ metadata: %w", err)
}

return &nsqScaler{
metricType: metricType,
metadata: nsqMetadata,
httpClient: kedautil.CreateHTTPClient(config.GlobalHTTPTimeout, true),
wozniakjan marked this conversation as resolved.
Show resolved Hide resolved
logger: logger,
}, nil
}

func (m nsqMetadata) Validate() error {
if len(m.NSQLookupdHTTPAddresses) == 0 {
return fmt.Errorf("no nsqLookupdHTTPAddresses given")
}

if m.Topic == "" {
return fmt.Errorf("no topic given")
}

if m.Channel == "" {
return fmt.Errorf("no channel given")
}
wozniakjan marked this conversation as resolved.
Show resolved Hide resolved

if m.DepthThreshold <= 0 {
return fmt.Errorf("depthThreshold must be a positive integer")
}

if m.ActivationDepthThreshold < 0 {
return fmt.Errorf("activationDepthThreshold must be greater than or equal to 0")
}

return nil
}

func parseNSQMetadata(config *scalersconfig.ScalerConfig) (nsqMetadata, error) {
meta := nsqMetadata{triggerIndex: config.TriggerIndex}
if err := config.TypedConfig(&meta); err != nil {
return meta, fmt.Errorf("error parsing nsq metadata: %w", err)
}

return meta, nil
}

func (s nsqScaler) GetMetricsAndActivity(_ context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
depth, err := s.getTopicChannelDepth()

if err != nil {
return []external_metrics.ExternalMetricValue{}, false, err
}

s.logger.Info("GetMetricsAndActivity", "metricName", metricName, "depth", depth)
Ulminator marked this conversation as resolved.
Show resolved Hide resolved

metric := GenerateMetricInMili(metricName, float64(depth))

return []external_metrics.ExternalMetricValue{metric}, depth > s.metadata.ActivationDepthThreshold, nil
}

func (s nsqScaler) getTopicChannelDepth() (int64, error) {
nsqdHosts, err := s.getTopicProducers(s.metadata.Topic)
if err != nil {
return -1, fmt.Errorf("error getting nsqd hosts: %w", err)
}

if len(nsqdHosts) == 0 {
s.logger.Info("no nsqd hosts found for topic", "topic", s.metadata.Topic)
Ulminator marked this conversation as resolved.
Show resolved Hide resolved
return 0, nil
}

depth, err := s.aggregateDepth(nsqdHosts, s.metadata.Topic, s.metadata.Channel)
if err != nil {
return -1, fmt.Errorf("error getting topic/channel depth: %w", err)
}

return depth, nil
}

func (s nsqScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
metricName := fmt.Sprintf("nsq-%s-%s", s.metadata.Topic, s.metadata.Channel)

externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, kedautil.NormalizeString(metricName)),
},
Target: GetMetricTarget(s.metricType, s.metadata.DepthThreshold),
}
metricSpec := v2.MetricSpec{External: externalMetric, Type: nsqMetricType}
return []v2.MetricSpec{metricSpec}
}

func (s nsqScaler) Close(context.Context) error {
if s.httpClient != nil {
s.httpClient.CloseIdleConnections()
}
return nil
}

type lookupResponse struct {
Producers []struct {
HTTPPort int `json:"http_port"`
BroadcastAddress string `json:"broadcast_address"`
}
}

type lookupResult struct {
host string
lookupResponse *lookupResponse
err error
}

func (s *nsqScaler) getTopicProducers(topic string) ([]string, error) {
var wg sync.WaitGroup
resultCh := make(chan lookupResult, len(s.metadata.NSQLookupdHTTPAddresses))

for _, host := range s.metadata.NSQLookupdHTTPAddresses {
wg.Add(1)
go func(host string, topic string) {
defer wg.Done()
resp, err := s.getLookup(host, topic)
resultCh <- lookupResult{host, resp, err}
}(host, topic)
}

wg.Wait()
close(resultCh)

var nsqdHostMap = make(map[string]bool)
for result := range resultCh {
if result.err != nil {
return nil, fmt.Errorf("error getting lookup from host '%s': %w", result.host, result.err)
}

if result.lookupResponse == nil {
// topic is not found on a single nsqlookupd host, it may exist on another
continue
}

for _, producer := range result.lookupResponse.Producers {
nsqdHost := net.JoinHostPort(producer.BroadcastAddress, strconv.Itoa(producer.HTTPPort))
nsqdHostMap[nsqdHost] = true
}
}

var nsqdHosts []string
for nsqdHost := range nsqdHostMap {
nsqdHosts = append(nsqdHosts, nsqdHost)
}

return nsqdHosts, nil
}

func (s *nsqScaler) getLookup(host string, topic string) (*lookupResponse, error) {
req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/%s", host, "lookup"), nil)
if err != nil {
return nil, err
}
req.Header.Set("Accept", "application/json; charset=utf-8")

params := url.Values{"topic": {topic}}
req.URL.RawQuery = params.Encode()

resp, err := s.httpClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()

if resp.StatusCode == http.StatusNotFound {
return nil, nil
}

if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected status code '%s'", resp.Status)
}

body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}

var lookupResponse lookupResponse
err = json.Unmarshal(body, &lookupResponse)
if err != nil {
return nil, err
}

return &lookupResponse, nil
}

type statsResponse struct {
Topics []struct {
TopicName string `json:"topic_name"`
Depth int64 `json:"depth"`
Channels []struct {
ChannelName string `json:"channel_name"`
Depth int64 `json:"depth"` // num messages in the queue (mem + disk)
Paused bool `json:"paused"` // if paused, consumers will not receive messages
}
}
}

type statsResult struct {
host string
statsResponse *statsResponse
err error
}

func (s *nsqScaler) aggregateDepth(nsqdHosts []string, topic string, channel string) (int64, error) {
wg := sync.WaitGroup{}
resultCh := make(chan statsResult, len(nsqdHosts))

for _, host := range nsqdHosts {
wg.Add(1)
go func(host string, topic string) {
defer wg.Done()
resp, err := s.getStats(host, topic)
resultCh <- statsResult{host, resp, err}
}(host, topic)
}

wg.Wait()
close(resultCh)

var depth int64
for result := range resultCh {
if result.err != nil {
return -1, fmt.Errorf("error getting stats from host '%s': %w", result.host, result.err)
}

for _, t := range result.statsResponse.Topics {
if t.TopicName != topic {
// this should never happen as we make the /stats call with the "topic" param
continue
}

if len(t.Channels) == 0 {
// topic exists with no channels, but there are messages in the topic -> we should still scale to bootstrap
s.logger.Info("no channels exist for topic", "topic", topic, "channel", channel, "host", result.host)
depth += t.Depth
continue
}

channelExists := false
for _, ch := range t.Channels {
if ch.ChannelName != channel {
continue
}
channelExists = true
if ch.Paused {
// if it's paused on a single nsqd host, it's depth should not go into the aggregate
// meaning if paused on all nsqd hosts => depth == 0
s.logger.Info("channel is paused", "topic", topic, "channel", channel, "host", result.host)
Ulminator marked this conversation as resolved.
Show resolved Hide resolved
continue
}
depth += ch.Depth
}
if !channelExists {
// topic exists with channels, but not the one in question - fallback to topic depth
s.logger.Info("channel does not exist for topic", "topic", topic, "channel", channel, "host", result.host)
Ulminator marked this conversation as resolved.
Show resolved Hide resolved
depth += t.Depth
}
}
}

return depth, nil
}

func (s *nsqScaler) getStats(host string, topic string) (*statsResponse, error) {
req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/%s", host, "stats"), nil)
if err != nil {
return nil, err
}

// "channel" is a query param as well, but if used and the channel does not exist
// we do not receive any stats for the existing topic
params := url.Values{
"format": {"json"},
"include_clients": {"false"},
"include_mem": {"false"},
"topic": {topic},
}
req.URL.RawQuery = params.Encode()

resp, err := s.httpClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected status code '%s'", resp.Status)
}

body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}

var statsResponse statsResponse
err = json.Unmarshal(body, &statsResponse)
if err != nil {
return nil, err
}

return &statsResponse, nil
}
Loading
Loading