
Commit 8ff57b5

feat(exporter/loadbalancing/dnsresolver): add quarantine mechanism for unhealthy endpoints
- Added a quarantine feature for unhealthy endpoints: retries to a failed endpoint are delayed until a configurable period (default: 30s) has elapsed.
- Quarantine settings are configurable via the DNS resolver's `quarantine` section.
- The load balancer avoids sending data to endpoints marked as unhealthy until their quarantine period expires, using the healthy endpoints in the hash ring without triggering unnecessary ring updates.
- This increases resilience by reducing the risk of exporters being stuck in a degraded state with repeated failed attempts.
- This feature currently applies only to the DNS resolver.

Refs #43644
1 parent 5aa2746 commit 8ff57b5

15 files changed, +870 -66 lines changed
Lines changed: 32 additions & 0 deletions
@@ -0,0 +1,32 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
+component: exporter/loadbalancing
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Quarantine mechanism for unhealthy endpoints in the DNS resolver.
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [43644]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext: |
+  - Added a quarantine feature for unhealthy endpoints, delaying retries to those endpoints until a configurable period (default: 30s) has elapsed.
+  - Quarantine settings are configurable via the DNS resolver's `quarantine` section.
+  - The load balancer will avoid sending data to endpoints marked as unhealthy until their quarantine period expires, using healthy endpoints in the hash ring without triggering unnecessary ring updates.
+  - This increases resilience by reducing the risk of exporters being stuck in degraded states with repeated failed attempts.
+  - This feature currently applies only to the DNS resolver.
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: []

exporter/loadbalancingexporter/README.md

Lines changed: 3 additions & 0 deletions
@@ -95,6 +95,9 @@ Refer to [config.yaml](./testdata/config.yaml) for detailed examples on using th
     * `port` port to be used for exporting the traces to the IP addresses resolved from `hostname`. If `port` is not specified, the default port 4317 is used.
     * `interval` resolver interval in go-Duration format, e.g. `5s`, `1h`, `30m`. If not specified, `5s` will be used.
     * `timeout` resolver timeout in go-Duration format, e.g. `5s`, `1h`, `30m`. If not specified, `1s` will be used.
+    * `quarantine` node: enables a quarantine mechanism that prevents the exporter from sending data to endpoints that have previously failed (i.e., are marked as unhealthy) until a defined quarantine period passes. While an endpoint is quarantined, only healthy endpoints in the hash ring are used to dispatch data, avoiding unnecessary hash ring updates and reducing the risk of repeatedly targeting unhealthy endpoints.
+      * `enabled` toggle to activate endpoint quarantine logic. Default is `false`.
+      * `duration` how long, in go-Duration format, an unhealthy endpoint remains in quarantine before the exporter retries it, e.g. `30s`, `1m`. If not specified, `30s` will be used.
   * The `k8s` node accepts the following optional properties:
     * `service` Kubernetes service to resolve, e.g. `lb-svc.lb-ns`. If no namespace is specified, an attempt will be made to infer the namespace for this collector, and if this fails it will fall back to the `default` namespace.
     * `ports` port to be used for exporting the traces to the addresses resolved from `service`. If `ports` is not specified, the default port 4317 is used. When multiple ports are specified, two backends are added to the load balancer as if they were at different pods.

exporter/loadbalancingexporter/config.go

Lines changed: 16 additions & 4 deletions
@@ -78,10 +78,11 @@ type StaticResolver struct {
 
 // DNSResolver defines the configuration for the DNS resolver
 type DNSResolver struct {
-	Hostname string        `mapstructure:"hostname"`
-	Port     string        `mapstructure:"port"`
-	Interval time.Duration `mapstructure:"interval"`
-	Timeout  time.Duration `mapstructure:"timeout"`
+	Hostname   string             `mapstructure:"hostname"`
+	Port       string             `mapstructure:"port"`
+	Interval   time.Duration      `mapstructure:"interval"`
+	Timeout    time.Duration      `mapstructure:"timeout"`
+	Quarantine QuarantineSettings `mapstructure:"quarantine"`
 	// prevent unkeyed literal initialization
 	_ struct{}
 }
@@ -96,6 +97,17 @@ type K8sSvcResolver struct {
 	_ struct{}
 }
 
+// QuarantineSettings defines the configuration for endpoint quarantine behavior
+type QuarantineSettings struct {
+	// Duration specifies how long an unhealthy endpoint should be excluded from load balancing
+	// after a failure. After this duration, the endpoint will be eligible for retry.
+	// Default: 30s
+	Duration time.Duration `mapstructure:"duration"`
+	Enabled  bool          `mapstructure:"enabled"`
+	// prevent unkeyed literal initialization
+	_ struct{}
+}
+
 type AWSCloudMapResolver struct {
 	NamespaceName string `mapstructure:"namespace"`
 	ServiceName   string `mapstructure:"service_name"`

exporter/loadbalancingexporter/consistent_hashing.go

Lines changed: 10 additions & 7 deletions
@@ -39,18 +39,21 @@ func newHashRing(endpoints []string) *hashRing {
 	}
 }
 
-// endpointFor calculates which backend is responsible for the given traceID
-func (h *hashRing) endpointFor(identifier []byte) string {
-	if h == nil {
-		// perhaps the ring itself couldn't get initialized yet?
-		return ""
-	}
+// getPosition calculates the position in the ring for the given identifier
+func getPosition(identifier []byte) position {
 	hasher := crc32.NewIEEE()
 	hasher.Write(identifier)
 	hash := hasher.Sum32()
 	pos := hash % maxPositions
+	return position(pos)
+}
 
-	return h.findEndpoint(position(pos))
+// endpointFor calculates which backend is responsible for the given traceID
+func (h *hashRing) endpointFor(identifier []byte) string {
+	if h == nil {
+		return ""
+	}
+	return h.findEndpoint(getPosition(identifier))
 }
 
 // findEndpoint returns the "next" endpoint starting from the given position, or an empty string in case no endpoints are available
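
The extracted `getPosition` maps an identifier to a ring position by taking its CRC-32 (IEEE) checksum modulo `maxPositions`. The standalone sketch below shows that the streaming hasher used in the diff and the one-shot `crc32.ChecksumIEEE` used in the new test give the same position. The value `36000` for `maxPositions` is an assumption made for this sketch, since the constant is defined outside the hunks shown, and `getPositionStreaming` is an illustrative name.

```go
package main

import "hash/crc32"

// maxPositions is assumed to be 36000 for this sketch; the real constant is
// defined elsewhere in the package and is not part of the diff.
const maxPositions uint32 = 36000

type position uint32

// getPosition uses the one-shot checksum helper.
func getPosition(identifier []byte) position {
	return position(crc32.ChecksumIEEE(identifier) % maxPositions)
}

// getPositionStreaming mirrors the hasher-based form shown in the diff.
func getPositionStreaming(identifier []byte) position {
	hasher := crc32.NewIEEE()
	hasher.Write(identifier)
	return position(hasher.Sum32() % maxPositions)
}
```

Because the position depends only on the identifier, the retry logic can recompute the starting point of a ring walk without holding a reference to the ring itself.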

exporter/loadbalancingexporter/consistent_hashing_test.go

Lines changed: 34 additions & 0 deletions
@@ -5,6 +5,7 @@ package loadbalancingexporter
 
 import (
 	"fmt"
+	"hash/crc32"
 	"testing"
 
 	"github.com/stretchr/testify/assert"
@@ -46,6 +47,39 @@ func TestEndpointFor(t *testing.T) {
 	}
 }
 
+func TestGetPosition(t *testing.T) {
+	tests := []struct {
+		name       string
+		identifier []byte
+		want       position
+	}{
+		{
+			name:       "simple case",
+			identifier: []byte("example"),
+			want:       position(crc32.ChecksumIEEE([]byte("example")) % maxPositions),
+		},
+		{
+			name:       "different input",
+			identifier: []byte("another"),
+			want:       position(crc32.ChecksumIEEE([]byte("another")) % maxPositions),
+		},
+		{
+			name:       "empty identifier",
+			identifier: []byte(""),
+			want:       position(crc32.ChecksumIEEE([]byte("")) % maxPositions),
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			got := getPosition(tt.identifier)
+			if got != tt.want {
+				t.Errorf("getPosition(%q) = %v, want %v", tt.identifier, got, tt.want)
+			}
+		})
+	}
+}
+
 func TestPositionsFor(t *testing.T) {
 	// prepare
 	endpoint := "host1"

exporter/loadbalancingexporter/loadbalancer.go

Lines changed: 130 additions & 4 deletions
@@ -10,6 +10,7 @@ import (
 	"slices"
 	"strings"
 	"sync"
+	"time"
 
 	"go.opentelemetry.io/collector/component"
 	"go.uber.org/zap"
@@ -38,6 +39,10 @@ type loadBalancer struct {
 	componentFactory componentFactory
 	exporters        map[string]*wrappedExporter
 
+	// Track unhealthy endpoints across all signal types
+	unhealthyEndpoints map[string]time.Time
+	healthLock         sync.RWMutex
+
 	stopped    bool
 	updateLock sync.RWMutex
 }
@@ -79,12 +84,14 @@ func newLoadBalancer(logger *zap.Logger, cfg component.Config, factory component
 
 		var err error
 		dnsResolver := oCfg.Resolver.DNS.Get()
+
 		res, err = newDNSResolver(
 			dnsLogger,
 			dnsResolver.Hostname,
 			dnsResolver.Port,
 			dnsResolver.Interval,
 			dnsResolver.Timeout,
+			&dnsResolver.Quarantine,
 			telemetry,
 		)
 		if err != nil {
@@ -137,10 +144,11 @@ func newLoadBalancer(logger *zap.Logger, cfg component.Config, factory component
 	}
 
 	return &loadBalancer{
-		logger:           logger,
-		res:              res,
-		componentFactory: factory,
-		exporters:        map[string]*wrappedExporter{},
+		logger:             logger,
+		res:                res,
+		componentFactory:   factory,
+		exporters:          map[string]*wrappedExporter{},
+		unhealthyEndpoints: make(map[string]time.Time),
 	}, nil
 }
 
@@ -212,6 +220,48 @@ func (lb *loadBalancer) removeExtraExporters(ctx context.Context, endpoints []st
 	}
 }
 
+// markUnhealthy marks an endpoint as unhealthy
+func (lb *loadBalancer) markUnhealthy(endpoint string) {
+	lb.healthLock.Lock()
+	defer lb.healthLock.Unlock()
+
+	if _, exists := lb.unhealthyEndpoints[endpoint]; !exists {
+		lb.unhealthyEndpoints[endpoint] = time.Now()
+	}
+}
+
+// isHealthy checks if an endpoint is healthy or if it has been quarantined long enough to retry
+func (lb *loadBalancer) isHealthy(endpoint string) bool {
+	lb.healthLock.RLock()
+	timestamp, exists := lb.unhealthyEndpoints[endpoint]
+	lb.healthLock.RUnlock()
+
+	if !exists {
+		return true
+	}
+
+	// If quarantine period has passed, remove from unhealthy list and allow retry
+	if dnsRes, ok := lb.res.(*dnsResolver); ok && dnsRes.quarantine != nil {
+		lb.logger.Debug("isHealthy", zap.String("endpoint", endpoint), zap.Time("timestamp", timestamp), zap.Duration("quarantineDuration", dnsRes.quarantine.Duration))
+		if time.Since(timestamp) > dnsRes.quarantine.Duration {
+			lb.healthLock.Lock()
+			delete(lb.unhealthyEndpoints, endpoint)
+			lb.healthLock.Unlock()
+			lb.logger.Debug("isHealthy - quarantine period passed", zap.String("endpoint", endpoint))
+			return true
+		}
+	}
+
+	return false
+}
+
+// isQuarantineEnabled checks if the resolver supports quarantine logic and if it's enabled.
+// Quarantine logic is supported for DNS resolvers only.
+func (lb *loadBalancer) isQuarantineEnabled() bool {
+	dnsRes, ok := lb.res.(*dnsResolver)
+	return ok && dnsRes.quarantine != nil && dnsRes.quarantine.Enabled
+}
+
 func (lb *loadBalancer) Shutdown(ctx context.Context) error {
 	err := lb.res.shutdown(ctx)
 	lb.stopped = true
@@ -238,3 +288,79 @@ func (lb *loadBalancer) exporterAndEndpoint(identifier []byte) (*wrappedExporter
 
 	return exp, endpoint, nil
 }
+
+// nextExporterAndEndpoint returns the next exporter and endpoint in the ring after the given position.
+func (lb *loadBalancer) nextExporterAndEndpoint(pos position) (*wrappedExporter, string, error) {
+	lb.updateLock.RLock()
+	defer lb.updateLock.RUnlock()
+
+	endpoint := lb.ring.findEndpoint(pos)
+	exp, found := lb.exporters[endpointWithPort(endpoint)]
+	if !found {
+		return nil, "", fmt.Errorf("couldn't find the exporter for the endpoint %q", endpoint)
+	}
+
+	return exp, endpoint, nil
+}
+
+// consumeWithRetryAndQuarantine executes the consume operation with the initial assignment of the exporter and endpoint.
+// If the consume operation fails, it will retry with the next endpoint in its associated ring.
+// It will try subsequent endpoints until either one succeeds or all available endpoints have been tried.
+// Once an unhealthy endpoint is found, it will be marked as unhealthy and not tried again until the quarantine period has passed.
+func (lb *loadBalancer) consumeWithRetryAndQuarantine(identifier []byte, exp *wrappedExporter, endpoint string, consume func(*wrappedExporter, string) error) error {
+	var err error
+
+	// Try the first endpoint if it's healthy or quarantine period has passed
+	if lb.isHealthy(endpoint) {
+		if err = consume(exp, endpoint); err == nil {
+			return nil
+		}
+		// Mark as unhealthy if the consume failed
+		lb.markUnhealthy(endpoint)
+	}
+
+	// If consume failed, try with subsequent endpoints
+	// Keep track of tried endpoints to avoid infinite loop
+	tried := map[string]bool{endpoint: true}
+	currentPos := getPosition(identifier)
+
+	// Try until we've used all available endpoints
+	for len(tried) < len(lb.exporters) {
+		// retryExp, retryEndpoint, retryErr := lb.exporterAndEndpoint(identifier)
+		retryExp, retryEndpoint, retryErr := lb.nextExporterAndEndpoint(currentPos)
+		if retryErr != nil {
+			// Return original error if we can't get a new endpoint
+			return err
+		}
+
+		// If we've already tried this endpoint in this cycle, move to next position
+		if tried[retryEndpoint] {
+			currentPos = (currentPos + 1) % position(maxPositions)
+			continue
+		}
+
+		// Skip unhealthy endpoints that are still in quarantine
+		if !lb.isHealthy(retryEndpoint) {
+			tried[retryEndpoint] = true
+			// If we've exhausted all endpoints and they're all unhealthy, stop
+			if len(tried) == len(lb.exporters) {
+				break
+			}
+			currentPos = (currentPos + 1) % position(maxPositions)
+			continue
+		}
+
+		tried[retryEndpoint] = true
+
+		if retryErr = consume(retryExp, retryEndpoint); retryErr == nil {
+			return nil
+		}
+		// Mark as unhealthy if the consume failed
+		lb.markUnhealthy(retryEndpoint)
+
+		// Move to next position for next iteration
+		currentPos = (currentPos + 1) % position(maxPositions)
+	}
+
+	return fmt.Errorf("all endpoints were tried and failed: %v", tried)
+}
