Skip to content

Commit

Permalink
fix: address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Ulmer <[email protected]>
  • Loading branch information
Ulminator committed Nov 5, 2024
1 parent 5d32d68 commit 1dbec8b
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 52 deletions.
18 changes: 5 additions & 13 deletions pkg/scalers/nsq_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,6 @@ func (m nsqMetadata) Validate() error {
return fmt.Errorf("no nsqLookupdHTTPAddresses given")
}

if m.Topic == "" {
return fmt.Errorf("no topic given")
}

if m.Channel == "" {
return fmt.Errorf("no channel given")
}

if m.DepthThreshold <= 0 {
return fmt.Errorf("depthThreshold must be a positive integer")
}
Expand Down Expand Up @@ -101,7 +93,7 @@ func (s nsqScaler) GetMetricsAndActivity(_ context.Context, metricName string) (
return []external_metrics.ExternalMetricValue{}, false, err
}

s.logger.Info("GetMetricsAndActivity", "metricName", metricName, "depth", depth)
s.logger.V(1).Info("GetMetricsAndActivity", "metricName", metricName, "depth", depth)

metric := GenerateMetricInMili(metricName, float64(depth))

Expand All @@ -115,7 +107,7 @@ func (s nsqScaler) getTopicChannelDepth() (int64, error) {
}

if len(nsqdHosts) == 0 {
s.logger.Info("no nsqd hosts found for topic", "topic", s.metadata.Topic)
s.logger.V(1).Info("no nsqd hosts found for topic", "topic", s.metadata.Topic)
return 0, nil
}

Expand Down Expand Up @@ -287,7 +279,7 @@ func (s *nsqScaler) aggregateDepth(nsqdHosts []string, topic string, channel str

if len(t.Channels) == 0 {
// topic exists with no channels, but there are messages in the topic -> we should still scale to bootstrap
s.logger.Info("no channels exist for topic", "topic", topic, "channel", channel, "host", result.host)
s.logger.V(1).Info("no channels exist for topic", "topic", topic, "channel", channel, "host", result.host)
depth += t.Depth
continue
}
Expand All @@ -301,14 +293,14 @@ func (s *nsqScaler) aggregateDepth(nsqdHosts []string, topic string, channel str
if ch.Paused {
// if it's paused on a single nsqd host, it's depth should not go into the aggregate
// meaning if paused on all nsqd hosts => depth == 0
s.logger.Info("channel is paused", "topic", topic, "channel", channel, "host", result.host)
s.logger.V(1).Info("channel is paused", "topic", topic, "channel", channel, "host", result.host)
continue
}
depth += ch.Depth
}
if !channelExists {
// topic exists with channels, but not the one in question - fallback to topic depth
s.logger.Info("channel does not exist for topic", "topic", topic, "channel", channel, "host", result.host)
s.logger.V(1).Info("channel does not exist for topic", "topic", topic, "channel", channel, "host", result.host)
depth += t.Depth
}
}
Expand Down
89 changes: 50 additions & 39 deletions tests/scalers/nsq/nsq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,18 @@ const (
)

var (
testNamespace = fmt.Sprintf("%s-ns", testName)
deploymentName = fmt.Sprintf("%s-consumer-deployment", testName)
jobName = fmt.Sprintf("%s-producer-job", testName)
scaledObjectName = fmt.Sprintf("%s-so", testName)
nsqNamespace = "nsq"
nsqHelmRepoURL = "https://nsqio.github.io/helm-chart"
minReplicas = 1
maxReplicas = 10
topicName = "test_topic"
channelName = "test_channel"
testNamespace = fmt.Sprintf("%s-ns", testName)
deploymentName = fmt.Sprintf("%s-consumer-deployment", testName)
jobName = fmt.Sprintf("%s-producer-job", testName)
scaledObjectName = fmt.Sprintf("%s-so", testName)
nsqNamespace = "nsq"
nsqHelmRepoURL = "https://nsqio.github.io/helm-chart"
minReplicaCount = 0
maxReplicaCount = 2
depthThreshold = 10
activationDepthThreshold = 5
topicName = "test_topic"
channelName = "test_channel"
)

const (
Expand Down Expand Up @@ -58,6 +60,7 @@ spec:
- "--mode=consumer"
- "--topic={{.TopicName}}"
- "--channel={{.ChannelName}}"
- "--sleep-duration=1s"
- "--nsqlookupd-http-address=nsq-nsqlookupd.{{.NSQNamespace}}.svc.cluster.local:4161"
imagePullPolicy: Always
`
Expand All @@ -73,9 +76,8 @@ metadata:
spec:
pollingInterval: 5
cooldownPeriod: 10
idleReplicaCount: 0
maxReplicaCount: {{.MaxReplicas}}
minReplicaCount: {{.MinReplicas}}
maxReplicaCount: {{.MaxReplicaCount}}
minReplicaCount: {{.MinReplicaCount}}
scaleTargetRef:
apiVersion: "apps/v1"
kind: "Deployment"
Expand All @@ -87,8 +89,8 @@ spec:
nsqLookupdHTTPAddresses: "nsq-nsqlookupd.{{.NSQNamespace}}.svc.cluster.local:4161"
topic: "{{.TopicName}}"
channel: "{{.ChannelName}}"
depthThreshold: "10"
activationDepthThreshold: "5"
depthThreshold: "{{.DepthThreshold}}"
activationDepthThreshold: "{{.ActivationDepthThreshold}}"
`

jobTemplate = `
Expand All @@ -114,16 +116,18 @@ spec:
)

type templateData struct {
TestNamespace string
NSQNamespace string
DeploymentName string
ScaledObjectName string
JobName string
MinReplicas int
MaxReplicas int
TopicName string
ChannelName string
MessageCount int
TestNamespace string
NSQNamespace string
DeploymentName string
ScaledObjectName string
JobName string
MinReplicaCount int
MaxReplicaCount int
DepthThreshold int
ActivationDepthThreshold int
TopicName string
ChannelName string
MessageCount int
}

func TestNSQScaler(t *testing.T) {
Expand Down Expand Up @@ -172,15 +176,17 @@ func uninstallNSQ(t *testing.T) {

func getTemplateData() (templateData, []Template) {
return templateData{
TestNamespace: testNamespace,
NSQNamespace: nsqNamespace,
DeploymentName: deploymentName,
JobName: jobName,
ScaledObjectName: scaledObjectName,
MinReplicas: minReplicas,
MaxReplicas: maxReplicas,
TopicName: topicName,
ChannelName: channelName,
TestNamespace: testNamespace,
NSQNamespace: nsqNamespace,
DeploymentName: deploymentName,
JobName: jobName,
ScaledObjectName: scaledObjectName,
MinReplicaCount: minReplicaCount,
MaxReplicaCount: maxReplicaCount,
TopicName: topicName,
ChannelName: channelName,
DepthThreshold: depthThreshold,
ActivationDepthThreshold: activationDepthThreshold,
}, []Template{
{Name: "deploymentTemplate", Config: deploymentTemplate},
{Name: "scaledObjectTemplate", Config: scaledObjectTemplate},
Expand All @@ -190,20 +196,25 @@ func getTemplateData() (templateData, []Template) {
func testActivation(t *testing.T, kc *kubernetes.Clientset, data templateData) {
t.Log("--- testing activation ---")

data.MessageCount = 5
data.MessageCount = activationDepthThreshold
KubectlReplaceWithTemplate(t, data, "jobTemplate", jobTemplate)

AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 60)

data.MessageCount = 1 // total message count > activationDepthThreshold
KubectlReplaceWithTemplate(t, data, "jobTemplate", jobTemplate)
require.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 1),
"replica count should reach 1 in under 1 minute")
}

func testScaleOut(t *testing.T, kc *kubernetes.Clientset, data templateData) {
t.Log("--- testing scale out ---")

data.MessageCount = 1 // 5 already published + 1 > activationDepthThreshold
// can handle depthThreshold messages per replica - using maxReplicaCount + 1 to ensure scaling to maxReplicaCount
data.MessageCount = depthThreshold * (maxReplicaCount + 1)
KubectlReplaceWithTemplate(t, data, "jobTemplate", jobTemplate)

require.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 1),
"replica count should be 1 after 1 minute")
require.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, maxReplicaCount, 60, 1),
"replica count should reach 2 in under 1 minute")
}

func testScaleIn(t *testing.T, kc *kubernetes.Clientset) {
Expand Down

0 comments on commit 1dbec8b

Please sign in to comment.