Skip to content

Commit

Permalink
test: add additional e2e tests for check reconfiguration
Browse files Browse the repository at this point in the history
* test: add additional e2e tests for check reconfiguration
* fix: race conditions in all checks when reconfiguring while running checks
* refactor: add check base constructor
* fix: add missing json tags for dns check result struct
* chore: use time ticker instead of time.After for check intervals
* fix: only update check config if it has changed
* chore: improve naming in checks
* chore: unexpose private functions & structs
* refactor: simplify check config & e2e test builder

Signed-off-by: lvlcn-t <[email protected]>
  • Loading branch information
lvlcn-t committed Nov 17, 2024
1 parent 5937af4 commit 9e4526f
Show file tree
Hide file tree
Showing 19 changed files with 1,176 additions and 502 deletions.
3 changes: 2 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"-race",
"-cover",
"-count=1",
"-timeout=120s",
"-timeout=240s",
"-v"
]
}
45 changes: 29 additions & 16 deletions pkg/checks/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,42 +42,55 @@ type Check interface {
// run until the context is canceled and handle problems itself.
// Returning a non-nil error will cause the shutdown of the check.
Run(ctx context.Context, cResult chan ResultDTO) error
// Shutdown is called once when the check is unregistered or sparrow shuts down
// Shutdown is called once when the check is unregistered or sparrow shuts down.
Shutdown()
// UpdateConfig is called once when the check is registered
// This is also called while the check is running, if the remote config is updated
// This should return an error if the config is invalid
// UpdateConfig updates the configuration of the check.
// It is called when the runtime configuration is updated.
// The check should handle the update itself.
// Returns an error if the configuration is invalid.
UpdateConfig(config Runtime) error
// GetConfig returns the current configuration of the check
// GetConfig returns the current configuration of the check.
GetConfig() Runtime
// Name returns the name of the check
// Name returns the name of the check.
Name() string
// Schema returns an openapi3.SchemaRef of the result type returned by the check
// Schema returns an openapi3.SchemaRef of the result type returned by the check.
Schema() (*openapi3.SchemaRef, error)
// GetMetricCollectors allows the check to provide prometheus metric collectors
// GetMetricCollectors allows the check to provide prometheus metric collectors.
GetMetricCollectors() []prometheus.Collector
// RemoveLabelledMetrics allows the check to remove the prometheus metrics
// of the check whose `target` label matches the passed value
// of the check whose `target` label matches the passed value.
RemoveLabelledMetrics(target string) error
}

// Base is a struct providing common fields and methods used by implementations of the [Check] interface.
// It serves as a foundational structure that should be embedded in specific check implementations.
type Base struct {
// Mutex for thread-safe access to shared resources within the check implementation
Mu sync.Mutex
// Signal channel used to notify about shutdown of a check
DoneChan chan struct{}
// Mutex for thread-safe access to shared resources within the check implementation.
Mutex sync.Mutex
// Done channel is used to notify about shutdown of a check.
Done chan struct{}
// Update is a channel used to notify about configuration updates.
Update chan struct{}
// closed is a flag indicating if the check has been shut down.
closed bool
}

// NewBase returns a [Base] that is ready to be embedded in a check.
// The Done channel is created with a buffer of 1 and the Update channel with
// a buffer of 3; the mutex and the closed flag keep their zero values
// (unlocked, not closed).
func NewBase() Base {
	var (
		done   = make(chan struct{}, 1)
		update = make(chan struct{}, 3)
	)
	return Base{Done: done, Update: update}
}

// Shutdown closes the Done channel to signal the check to stop running.
func (b *Base) Shutdown() {
b.Mu.Lock()
defer b.Mu.Unlock()
b.Mutex.Lock()
defer b.Mutex.Unlock()
if !b.closed {
close(b.DoneChan)
close(b.Done)
b.closed = true
}
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/checks/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,21 @@ func TestBase_Shutdown(t *testing.T) {
{
name: "shutdown",
b: &Base{
DoneChan: make(chan struct{}, 1),
Done: make(chan struct{}, 1),
},
},
{
name: "already shutdown",
b: &Base{
DoneChan: make(chan struct{}, 1),
closed: true,
Done: make(chan struct{}, 1),
closed: true,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.b.closed {
close(tt.b.DoneChan)
close(tt.b.Done)
}
tt.b.Shutdown()

Expand All @@ -37,7 +37,7 @@ func TestBase_Shutdown(t *testing.T) {
}

assert.Panics(t, func() {
tt.b.DoneChan <- struct{}{}
tt.b.Done <- struct{}{}
}, "Base.DoneChan should be closed")
})
}
Expand Down
107 changes: 60 additions & 47 deletions pkg/checks/dns/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package dns
import (
"context"
"net"
"reflect"
"slices"
"sync"
"time"
Expand All @@ -33,96 +34,104 @@ import (
)

var (
_ checks.Check = (*DNS)(nil)
_ checks.Check = (*check)(nil)
_ checks.Runtime = (*Config)(nil)
)

const CheckName = "dns"

// DNS is a check that resolves the names and addresses
type DNS struct {
// check is the implementation of the dns check.
// It resolves DNS names and IP addresses for a list of targets.
type check struct {
checks.Base
config Config
metrics metrics
client Resolver
}

func (d *DNS) GetConfig() checks.Runtime {
d.Mu.Lock()
defer d.Mu.Unlock()
return &d.config
func (ch *check) GetConfig() checks.Runtime {
ch.Mutex.Lock()
defer ch.Mutex.Unlock()
return &ch.config
}

func (d *DNS) Name() string {
func (*check) Name() string {
return CheckName
}

// NewCheck creates a new instance of the dns check
func NewCheck() checks.Check {
return &DNS{
Base: checks.Base{
Mu: sync.Mutex{},
DoneChan: make(chan struct{}, 1),
},
return &check{
Base: checks.NewBase(),
config: Config{
Retry: checks.DefaultRetry,
},
metrics: newMetrics(),
client: NewResolver(),
client: newResolver(),
}
}

// result represents the result of a single DNS check for a specific target
type result struct {
Resolved []string
Error *string
Total float64
Resolved []string `json:"resolved"`
Error *string `json:"error"`
Total float64 `json:"total"`
}

// Run starts the dns check.
// It loops until either ctx is canceled, in which case ctx.Err() is returned,
// or the Done channel yields, in which case nil is returned.
// On every interval tick it runs a dns check and sends the outcome on cResult
// as a checks.ResultDTO carrying the check name, the result data and the
// current timestamp. A signal on the Update channel replaces the ticker with
// a new one built from the current config interval.
func (d *DNS) Run(ctx context.Context, cResult chan checks.ResultDTO) error {
func (ch *check) Run(ctx context.Context, cResult chan checks.ResultDTO) error {
ctx, cancel := logger.NewContextWithLogger(ctx)
defer cancel()
log := logger.FromContext(ctx)

log.Info("Starting dns check", "interval", d.config.Interval.String())
// NOTE(review): ch.config.Interval is read here without holding ch.Mutex,
// unlike in the Update case below. No ticker.Stop() is visible on the return
// paths either — confirm whether a deferred Stop is wanted.
// NOTE(review): time.NewTicker panics for a non-positive duration; presumably
// the interval is validated before Run is called — confirm.
ticker := time.NewTicker(ch.config.Interval)
log.InfoContext(ctx, "Starting dns check", "interval", ch.config.Interval.String())
for {
select {
// Cancellation of the context ends the check with the context's error.
case <-ctx.Done():
log.Error("Context canceled", "err", ctx.Err())
log.ErrorContext(ctx, "Context canceled", "err", ctx.Err())
return ctx.Err()
// A receive on the done channel ends the check without an error.
case <-d.DoneChan:
case <-ch.Done:
return nil
case <-time.After(d.config.Interval):
res := d.check(ctx)

// Configuration changed: swap the ticker for one using the new interval.
// The mutex is held so the interval is read consistently with UpdateConfig.
// NOTE(review): UpdateConfig sends on ch.Update while holding ch.Mutex; if
// the channel buffer is full and this loop is not receiving, that send would
// block with the mutex held — confirm this cannot happen in practice.
case <-ch.Update:
ch.Mutex.Lock()
ticker.Stop()
ticker = time.NewTicker(ch.config.Interval)
log.DebugContext(ctx, "Interval of dns check updated", "interval", ch.config.Interval.String())
ch.Mutex.Unlock()
// Interval elapsed: run one check and publish its result.
case <-ticker.C:
res := ch.check(ctx)
// NOTE(review): this send is a plain statement outside the select, so neither
// ctx cancellation nor Done is observed while it waits for cResult to accept
// the value.
cResult <- checks.ResultDTO{
Name: d.Name(),
Name: ch.Name(),
Result: &checks.Result{
Data: res,
Timestamp: time.Now(),
},
}
log.Debug("Successfully finished dns check run")
log.DebugContext(ctx, "Successfully finished dns check run")
}
}
}

func (d *DNS) UpdateConfig(cfg checks.Runtime) error {
func (ch *check) UpdateConfig(cfg checks.Runtime) error {
if c, ok := cfg.(*Config); ok {
d.Mu.Lock()
defer d.Mu.Unlock()
ch.Mutex.Lock()
defer ch.Mutex.Unlock()
if reflect.DeepEqual(ch.config, *c) {
return nil
}

for _, target := range d.config.Targets {
for _, target := range ch.config.Targets {
if !slices.Contains(c.Targets, target) {
err := d.metrics.Remove(target)
err := ch.metrics.Remove(target)
if err != nil {
return err
}
}
}

d.config = *c
ch.config = *c
ch.Update <- struct{}{}
return nil
}

Expand All @@ -134,27 +143,31 @@ func (d *DNS) UpdateConfig(cfg checks.Runtime) error {

// Schema provides the schema of the data that will be provided
// by the dns check
func (d *DNS) Schema() (*openapi3.SchemaRef, error) {
return checks.OpenapiFromPerfData(make(map[string]result))
func (ch *check) Schema() (*openapi3.SchemaRef, error) {
return checks.OpenapiFromPerfData(map[string]result{})
}

// GetMetricCollectors returns all metric collectors of the check
func (d *DNS) GetMetricCollectors() []prometheus.Collector {
return d.metrics.GetCollectors()
func (ch *check) GetMetricCollectors() []prometheus.Collector {
return ch.metrics.GetCollectors()
}

// RemoveLabelledMetrics removes the metrics which have the passed
// target as a label
func (d *DNS) RemoveLabelledMetrics(target string) error {
return d.metrics.Remove(target)
func (ch *check) RemoveLabelledMetrics(target string) error {
return ch.metrics.Remove(target)
}

// check performs DNS checks for all configured targets using a custom net.Resolver.
// Returns a map where each target is associated with its DNS check result.
func (d *DNS) check(ctx context.Context) map[string]result {
func (ch *check) check(ctx context.Context) map[string]result {
log := logger.FromContext(ctx)
log.Debug("Checking dns")
if len(d.config.Targets) == 0 {
ch.Mutex.Lock()
cfg := ch.config
ch.Mutex.Unlock()

if len(cfg.Targets) == 0 {
log.Debug("No targets defined")
return map[string]result{}
}
Expand All @@ -163,26 +176,26 @@ func (d *DNS) check(ctx context.Context) map[string]result {
var wg sync.WaitGroup
results := map[string]result{}

d.client.SetDialer(&net.Dialer{
Timeout: d.config.Timeout,
ch.client.SetDialer(&net.Dialer{
Timeout: cfg.Timeout,
})

log.Debug("Getting dns status for each target in separate routine", "amount", len(d.config.Targets))
for _, t := range d.config.Targets {
log.Debug("Getting dns status for each target in separate routine", "amount", len(cfg.Targets))
for _, t := range cfg.Targets {
target := t
wg.Add(1)
lo := log.With("target", target)

getDNSRetry := helper.Retry(func(ctx context.Context) error {
res, err := getDNS(ctx, d.client, target)
res, err := getDNS(ctx, ch.client, target)
mu.Lock()
defer mu.Unlock()
results[target] = res
if err != nil {
return err
}
return nil
}, d.config.Retry)
}, cfg.Retry)

go func() {
defer wg.Done()
Expand All @@ -197,7 +210,7 @@ func (d *DNS) check(ctx context.Context) map[string]result {

mu.Lock()
defer mu.Unlock()
d.metrics.Set(target, results, float64(status))
ch.metrics.Set(target, results, float64(status))
}()
}
wg.Wait()
Expand Down
Loading

0 comments on commit 9e4526f

Please sign in to comment.