Skip to content

Commit

Permalink
Configuration options for hardcoded values (#111)
Browse files Browse the repository at this point in the history
* Add `MESSENGER_ERROR_MAX_BACKOFF` config env var
* Add `AUTOSCALING_TIME_WINDOW` config env var
* Add `AUTOSCALING_INTERVAL` config env var

For time window calc see: https://go.dev/play/p/b2GdRzMJPUb
  • Loading branch information
nstogner authored Jul 6, 2024
1 parent 55f1122 commit 1d8de27
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 16 deletions.
12 changes: 9 additions & 3 deletions cmd/lingo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,11 @@ func run() error {
// Rabbit MQ: "rabbit://myqueue|rabbit://myexchange"
// NATS: "nats://example.mysubject1|nats://example.mysubject2"
// Kafka: "kafka://my-group?topic=my-topic1|kafka://my-topic2"
MessengerURLs []string `env:"MESSENGER_URLS"`
MessengerURLs []string `env:"MESSENGER_URLS"`
MessengerErrorMaxBackoff time.Duration `env:"MESSENGER_ERROR_MAX_BACKOFF, default=3m"`

AutoscalingInterval time.Duration `env:"AUTOSCALING_INTERVAL, default=3s"`
AutoscalingTimeWindow time.Duration `env:"AUTOSCALING_TIME_WINDOW, default=30s"`

MetricsBindAddress string `env:"METRICS_BIND_ADDRESS, default=:8082"`
HealthProbeBindAddress string `env:"HEALTH_PROBE_BIND_ADDRESS, default=:8081"`
Expand Down Expand Up @@ -184,8 +188,9 @@ func run() error {
if err != nil {
return fmt.Errorf("setting up autoscaler: %w", err)
}
autoscaler.Interval = 3 * time.Second
autoscaler.AverageCount = 10 // 10 * 3 seconds = 30 sec avg
autoscaler.Interval = cfg.AutoscalingInterval
// 10 average count = 30 sec window / 3 sec interval
autoscaler.AverageCount = int(cfg.AutoscalingTimeWindow / cfg.AutoscalingInterval)
autoscaler.LeaderElection = le
autoscaler.Deployments = deploymentManager
autoscaler.ConcurrencyPerReplica = cfg.Concurrency
Expand Down Expand Up @@ -213,6 +218,7 @@ func run() error {
msgURL.requests,
msgURL.responses,
msgURL.maxHandlers,
cfg.MessengerErrorMaxBackoff,
deploymentManager,
endpointManager,
queueManager,
Expand Down
28 changes: 15 additions & 13 deletions pkg/messenger/messager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ type Messenger struct {

HTTPC *http.Client

MaxHandlers int
MaxHandlers int
ErrorMaxBackoff time.Duration

requests *pubsub.Subscription
responses *pubsub.Topic
Expand All @@ -42,6 +43,7 @@ func NewMessenger(
requestsURL string,
responsesURL string,
maxHandlers int,
errorMaxBackoff time.Duration,
deployments DeploymentManager,
endpoints EndpointManager,
queues QueueManager,
Expand All @@ -58,13 +60,14 @@ func NewMessenger(
}

return &Messenger{
Deployments: deployments,
Endpoints: endpoints,
Queues: queues,
HTTPC: httpClient,
requests: requests,
responses: responses,
MaxHandlers: maxHandlers,
Deployments: deployments,
Endpoints: endpoints,
Queues: queues,
HTTPC: httpClient,
requests: requests,
responses: responses,
MaxHandlers: maxHandlers,
ErrorMaxBackoff: errorMaxBackoff,
}, nil
}

Expand Down Expand Up @@ -103,7 +106,7 @@ recvLoop:
// * Some request-generation job sending a million malformed requests into a topic.
// (Slow until an admin can intervene)
if consecutiveErrors := m.getConsecutiveErrors(); consecutiveErrors > 0 {
wait := consecutiveErrBackoff(consecutiveErrors)
wait := consecutiveErrBackoff(consecutiveErrors, m.ErrorMaxBackoff)
log.Printf("after %d consecutive errors, waiting %v before processing next message", consecutiveErrors, wait)
time.Sleep(wait)
}
Expand All @@ -118,11 +121,10 @@ recvLoop:
return nil
}

func consecutiveErrBackoff(n int) time.Duration {
func consecutiveErrBackoff(n int, max time.Duration) time.Duration {
d := time.Duration(n) * time.Second
const maxBackoff = 3 * time.Minute
if d > maxBackoff {
return maxBackoff
if d > max {
return max
}
return d
}
Expand Down
1 change: 1 addition & 0 deletions tests/integration/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ func TestMain(m *testing.M) {
memRequestsURL,
memResponsesURL,
1000,
time.Minute,
deploymentManager,
endpointManager,
queueManager,
Expand Down

0 comments on commit 1d8de27

Please sign in to comment.