From b2bced804629292671bd632fdec392acb4cd8c7a Mon Sep 17 00:00:00 2001 From: Laurence Date: Wed, 30 Apr 2025 11:04:07 +0100 Subject: [PATCH 01/13] enhance: Remove docker acquis internal timer use docker events --- pkg/acquisition/modules/docker/docker.go | 123 +++++++++++++---------- 1 file changed, 68 insertions(+), 55 deletions(-) diff --git a/pkg/acquisition/modules/docker/docker.go b/pkg/acquisition/modules/docker/docker.go index 582da3d53a1..ec83d0641e5 100644 --- a/pkg/acquisition/modules/docker/docker.go +++ b/pkg/acquisition/modules/docker/docker.go @@ -11,16 +11,17 @@ import ( "strings" "time" + "github.com/crowdsecurity/dlog" dockerTypes "github.com/docker/docker/api/types" dockerContainer "github.com/docker/docker/api/types/container" + dockerTypesEvents "github.com/docker/docker/api/types/events" + dockerFilter "github.com/docker/docker/api/types/filters" "github.com/docker/docker/client" "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" "gopkg.in/tomb.v2" "gopkg.in/yaml.v2" - "github.com/crowdsecurity/dlog" - "github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration" "github.com/crowdsecurity/crowdsec/pkg/types" ) @@ -53,7 +54,6 @@ type DockerSource struct { runningContainerState map[string]*ContainerConfig compiledContainerName []*regexp.Regexp compiledContainerID []*regexp.Regexp - CheckIntervalDuration time.Duration logger *log.Entry Client client.CommonAPIClient t *tomb.Tomb @@ -75,9 +75,8 @@ func (d *DockerSource) GetUuid() string { func (d *DockerSource) UnmarshalConfig(yamlConfig []byte) error { d.Config = DockerConfiguration{ - FollowStdout: true, // default - FollowStdErr: true, // default - CheckInterval: "1s", // default + FollowStdout: true, // default + FollowStdErr: true, // default } err := yaml.UnmarshalStrict(yamlConfig, &d.Config) @@ -97,11 +96,6 @@ func (d *DockerSource) UnmarshalConfig(yamlConfig []byte) error { return errors.New("use_container_labels and container_name, container_id, container_id_regexp, container_name_regexp are mutually exclusive") } - d.CheckIntervalDuration, err = time.ParseDuration(d.Config.CheckInterval) - if err != nil { - return fmt.Errorf("parsing 'check_interval' parameters: %s", d.CheckIntervalDuration) - } - if d.Config.Mode == "" { d.Config.Mode = configuration.TAIL_MODE } @@ -496,62 +490,81 @@ func (d *DockerSource) EvalContainer(ctx context.Context, container dockerTypes. } func (d *DockerSource) WatchContainer(ctx context.Context, monitChan chan *ContainerConfig, deleteChan chan *ContainerConfig) error { - ticker := time.NewTicker(d.CheckIntervalDuration) - d.logger.Infof("Container watcher started, interval: %s", d.CheckIntervalDuration.String()) - - for { - select { - case <-d.t.Dying(): - d.logger.Infof("stopping container watcher") - return nil - case <-ticker.C: - // to track for garbage collection - runningContainersID := make(map[string]bool) - - runningContainers, err := d.Client.ContainerList(ctx, dockerContainer.ListOptions{}) - if err != nil { - if strings.Contains(strings.ToLower(err.Error()), "cannot connect to the docker daemon at") { - for idx, container := range d.runningContainerState { - if d.runningContainerState[idx].t.Alive() { - d.logger.Infof("killing tail for container %s", container.Name) - d.runningContainerState[idx].t.Kill(nil) - - if err := d.runningContainerState[idx].t.Wait(); err != nil { - d.logger.Infof("error while waiting for death of %s : %s", container.Name, err) - } + checkContainers := func() error { + // to track for garbage collection + runningContainersID := make(map[string]bool) + + runningContainers, err := d.Client.ContainerList(ctx, dockerContainer.ListOptions{}) + if err != nil { + if strings.Contains(strings.ToLower(err.Error()), "cannot connect to the docker daemon at") { + for idx, container := range d.runningContainerState { + if d.runningContainerState[idx].t.Alive() { + d.logger.Infof("killing tail for container %s", container.Name) + d.runningContainerState[idx].t.Kill(nil) + + if err := d.runningContainerState[idx].t.Wait(); err != nil { + d.logger.Infof("error while waiting for death of %s : %s", container.Name, err) } - - delete(d.runningContainerState, idx) } - } else { - log.Errorf("container list err: %s", err) - } - continue + delete(d.runningContainerState, idx) + } + } else { + log.Errorf("container list err: %s", err) } - for _, container := range runningContainers { - runningContainersID[container.ID] = true + return err + } - // don't need to re eval an already monitored container - if _, ok := d.runningContainerState[container.ID]; ok { - continue - } + for _, container := range runningContainers { + runningContainersID[container.ID] = true - if containerConfig := d.EvalContainer(ctx, container); containerConfig != nil { - monitChan <- containerConfig - } + // don't need to re eval an already monitored container + if _, ok := d.runningContainerState[container.ID]; ok { + continue } - for containerStateID, containerConfig := range d.runningContainerState { - if _, ok := runningContainersID[containerStateID]; !ok { - deleteChan <- containerConfig - } + if containerConfig := d.EvalContainer(ctx, container); containerConfig != nil { + monitChan <- containerConfig + } + } + + for containerStateID, containerConfig := range d.runningContainerState { + if _, ok := runningContainersID[containerStateID]; !ok { + deleteChan <- containerConfig } + } - d.logger.Tracef("Reading logs from %d containers", len(d.runningContainerState)) + d.logger.Tracef("Reading logs from %d containers", len(d.runningContainerState)) + return nil + } + + err := checkContainers() + if err != nil { + return err + } - ticker.Reset(d.CheckIntervalDuration) + f := dockerFilter.NewArgs() + f.Add("type", "container") + options := dockerTypesEvents.ListOptions{ + Filters: f, + } + eventsChan, errChan := d.Client.Events(ctx, options) + for { + select { + case <-d.t.Dying(): + d.logger.Infof("stopping container watcher") + return nil + case event := <-eventsChan: + if event.Action == "start" || event.Action == "die" { + err := checkContainers() + if err != nil { + d.logger.Errorf("error while checking containers: %s", err) + } + } + case err := <-errChan: + d.logger.Errorf("error while watching containers: %s", err) + return err } } } From e981128f61ff40fe4b7d3823a5b073ad60904d66 Mon Sep 17 00:00:00 2001 From: Laurence Date: Wed, 30 Apr 2025 11:43:33 +0100 Subject: [PATCH 02/13] enhance: split code and add events to mock client --- pkg/acquisition/modules/docker/docker.go | 78 +++++++++---------- pkg/acquisition/modules/docker/docker_test.go | 10 +++ 2 files changed, 49 insertions(+), 39 deletions(-) diff --git a/pkg/acquisition/modules/docker/docker.go b/pkg/acquisition/modules/docker/docker.go index ec83d0641e5..b49c939e2db 100644 --- a/pkg/acquisition/modules/docker/docker.go +++ b/pkg/acquisition/modules/docker/docker.go @@ -489,57 +489,57 @@ func (d *DockerSource) EvalContainer(ctx context.Context, container dockerTypes. return nil } -func (d *DockerSource) WatchContainer(ctx context.Context, monitChan chan *ContainerConfig, deleteChan chan *ContainerConfig) error { - checkContainers := func() error { - // to track for garbage collection - runningContainersID := make(map[string]bool) - - runningContainers, err := d.Client.ContainerList(ctx, dockerContainer.ListOptions{}) - if err != nil { - if strings.Contains(strings.ToLower(err.Error()), "cannot connect to the docker daemon at") { - for idx, container := range d.runningContainerState { - if d.runningContainerState[idx].t.Alive() { - d.logger.Infof("killing tail for container %s", container.Name) - d.runningContainerState[idx].t.Kill(nil) - - if err := d.runningContainerState[idx].t.Wait(); err != nil { - d.logger.Infof("error while waiting for death of %s : %s", container.Name, err) - } - } +func (d *DockerSource) checkContainers(ctx context.Context, monitChan chan *ContainerConfig, deleteChan chan *ContainerConfig) error { + // to track for garbage collection + runningContainersID := make(map[string]bool) + + runningContainers, err := d.Client.ContainerList(ctx, dockerContainer.ListOptions{}) + if err != nil { + if strings.Contains(strings.ToLower(err.Error()), "cannot connect to the docker daemon at") { + for idx, container := range d.runningContainerState { + if d.runningContainerState[idx].t.Alive() { + d.logger.Infof("killing tail for container %s", container.Name) + d.runningContainerState[idx].t.Kill(nil) - delete(d.runningContainerState, idx) + if err := d.runningContainerState[idx].t.Wait(); err != nil { + d.logger.Infof("error while waiting for death of %s : %s", container.Name, err) + } } - } else { - log.Errorf("container list err: %s", err) - } - return err + delete(d.runningContainerState, idx) + } + } else { + log.Errorf("container list err: %s", err) } - for _, container := range runningContainers { - runningContainersID[container.ID] = true + return err + } - // don't need to re eval an already monitored container - if _, ok := d.runningContainerState[container.ID]; ok { - continue - } + for _, container := range runningContainers { + runningContainersID[container.ID] = true - if containerConfig := d.EvalContainer(ctx, container); containerConfig != nil { - monitChan <- containerConfig - } + // don't need to re eval an already monitored container + if _, ok := d.runningContainerState[container.ID]; ok { + continue } - for containerStateID, containerConfig := range d.runningContainerState { - if _, ok := runningContainersID[containerStateID]; !ok { - deleteChan <- containerConfig - } + if containerConfig := d.EvalContainer(ctx, container); containerConfig != nil { + monitChan <- containerConfig } + } - d.logger.Tracef("Reading logs from %d containers", len(d.runningContainerState)) - return nil + for containerStateID, containerConfig := range d.runningContainerState { + if _, ok := runningContainersID[containerStateID]; !ok { + deleteChan <- containerConfig + } } - err := checkContainers() + d.logger.Tracef("Reading logs from %d containers", len(d.runningContainerState)) + return nil +} + +func (d *DockerSource) WatchContainer(ctx context.Context, monitChan chan *ContainerConfig, deleteChan chan *ContainerConfig) error { + err := d.checkContainers(ctx, monitChan, deleteChan) if err != nil { return err } @@ -557,7 +557,7 @@ func (d *DockerSource) WatchContainer(ctx context.Context, monitChan chan *Conta return nil case event := <-eventsChan: if event.Action == "start" || event.Action == "die" { - err := checkContainers() + err := d.checkContainers(ctx, monitChan, deleteChan) if err != nil { d.logger.Errorf("error while checking containers: %s", err) } diff --git a/pkg/acquisition/modules/docker/docker_test.go b/pkg/acquisition/modules/docker/docker_test.go index e44963b58a3..1c8af103ede 100644 --- a/pkg/acquisition/modules/docker/docker_test.go +++ b/pkg/acquisition/modules/docker/docker_test.go @@ -13,6 +13,8 @@ import ( dockerTypes "github.com/docker/docker/api/types" dockerContainer "github.com/docker/docker/api/types/container" + "github.com/docker/docker/api/types/events" + dockerTypesEvents "github.com/docker/docker/api/types/events" "github.com/docker/docker/client" log "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" @@ -273,6 +275,14 @@ func (cli *mockDockerCli) ContainerInspect(ctx context.Context, c string) (docke return r, nil } +// Since we are mocking the docker client, we return channels that will never be used +func (cli *mockDockerCli) Events(ctx context.Context, options dockerTypesEvents.ListOptions) (<-chan events.Message, <-chan error) { + eventsChan := make(chan events.Message) + errChan := make(chan error) + + return eventsChan, errChan +} + func TestOneShot(t *testing.T) { ctx := t.Context() From e1b533744543fffae5971486ff99c53991f53e03 Mon Sep 17 00:00:00 2001 From: Laurence Date: Wed, 30 Apr 2025 12:21:44 +0100 Subject: [PATCH 03/13] enhance: fixes --- pkg/acquisition/modules/docker/docker.go | 11 +++++++---- pkg/acquisition/modules/docker/docker_test.go | 5 ++--- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/pkg/acquisition/modules/docker/docker.go b/pkg/acquisition/modules/docker/docker.go index b49c939e2db..02b83a7bc1b 100644 --- a/pkg/acquisition/modules/docker/docker.go +++ b/pkg/acquisition/modules/docker/docker.go @@ -5,6 +5,7 @@ import ( "context" "errors" "fmt" + "io" "net/url" "regexp" "strconv" @@ -556,15 +557,17 @@ func (d *DockerSource) WatchContainer(ctx context.Context, monitChan chan *Conta d.logger.Infof("stopping container watcher") return nil case event := <-eventsChan: - if event.Action == "start" || event.Action == "die" { + if event.Action == dockerTypesEvents.ActionStart || event.Action == dockerTypesEvents.ActionDie { err := d.checkContainers(ctx, monitChan, deleteChan) if err != nil { - d.logger.Errorf("error while checking containers: %s", err) + return err // We hit an error while watching containers, we stop the watcher } } case err := <-errChan: - d.logger.Errorf("error while watching containers: %s", err) - return err + if errors.Is(err, io.EOF) { + d.logger.Debug("EOF while watching containers, we stop the watcher") + } + return err // We hit an error while watching containers, we stop the watcher } } } diff --git a/pkg/acquisition/modules/docker/docker_test.go b/pkg/acquisition/modules/docker/docker_test.go index 1c8af103ede..ca470a93d4b 100644 --- a/pkg/acquisition/modules/docker/docker_test.go +++ b/pkg/acquisition/modules/docker/docker_test.go @@ -13,7 +13,6 @@ import ( dockerTypes "github.com/docker/docker/api/types" dockerContainer "github.com/docker/docker/api/types/container" - "github.com/docker/docker/api/types/events" dockerTypesEvents "github.com/docker/docker/api/types/events" "github.com/docker/docker/client" log "github.com/sirupsen/logrus" @@ -276,8 +275,8 @@ func (cli *mockDockerCli) ContainerInspect(ctx context.Context, c string) (docke } // Since we are mocking the docker client, we return channels that will never be used -func (cli *mockDockerCli) Events(ctx context.Context, options dockerTypesEvents.ListOptions) (<-chan events.Message, <-chan error) { - eventsChan := make(chan events.Message) +func (cli *mockDockerCli) Events(ctx context.Context, options dockerTypesEvents.ListOptions) (<-chan dockerTypesEvents.Message, <-chan error) { + eventsChan := make(chan dockerTypesEvents.Message) errChan := make(chan error) return eventsChan, errChan From 4b8f2340a6a666e245f561dd1b4e890b5659c61e Mon Sep 17 00:00:00 2001 From: Laurence Date: Wed, 30 Apr 2025 13:02:39 +0100 Subject: [PATCH 04/13] enhance: add check_interval test and add deprecation warning --- pkg/acquisition/modules/docker/docker.go | 4 ++++ pkg/acquisition/modules/docker/docker_test.go | 9 +++++++++ 2 files changed, 13 insertions(+) diff --git a/pkg/acquisition/modules/docker/docker.go b/pkg/acquisition/modules/docker/docker.go index 02b83a7bc1b..dfe0546a626 100644 --- a/pkg/acquisition/modules/docker/docker.go +++ b/pkg/acquisition/modules/docker/docker.go @@ -97,6 +97,10 @@ func (d *DockerSource) UnmarshalConfig(yamlConfig []byte) error { return errors.New("use_container_labels and container_name, container_id, container_id_regexp, container_name_regexp are mutually exclusive") } + if d.Config.CheckInterval != "" { + d.logger.Warnf("check_interval is deprecated, it will be removed in a future version") + } + if d.Config.Mode == "" { d.Config.Mode = configuration.TAIL_MODE } diff --git a/pkg/acquisition/modules/docker/docker_test.go b/pkg/acquisition/modules/docker/docker_test.go index ca470a93d4b..24816346106 100644 --- a/pkg/acquisition/modules/docker/docker_test.go +++ b/pkg/acquisition/modules/docker/docker_test.go @@ -51,6 +51,15 @@ source: docker`, config: ` mode: cat source: docker +container_name: + - toto`, + expectedErr: "", + }, + { + config: ` +mode: cat +source: docker +check_interval: 10s container_name: - toto`, expectedErr: "", From c519833429259e02bf4524bb980bb3c3c982e718 Mon Sep 17 00:00:00 2001 From: Laurence Date: Wed, 30 Apr 2025 13:03:24 +0100 Subject: [PATCH 05/13] enhance: warnf -> warn --- pkg/acquisition/modules/docker/docker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/acquisition/modules/docker/docker.go b/pkg/acquisition/modules/docker/docker.go index dfe0546a626..9d83777e3cb 100644 --- a/pkg/acquisition/modules/docker/docker.go +++ b/pkg/acquisition/modules/docker/docker.go @@ -98,7 +98,7 @@ func (d *DockerSource) UnmarshalConfig(yamlConfig []byte) error { } if d.Config.CheckInterval != "" { - d.logger.Warnf("check_interval is deprecated, it will be removed in a future version") + d.logger.Warn("check_interval is deprecated, it will be removed in a future version") } if d.Config.Mode == "" { From 68882a72ae68ffc458ee694d85ed4aca93736bf6 Mon Sep 17 00:00:00 2001 From: Laurence Date: Fri, 2 May 2025 11:10:31 +0100 Subject: [PATCH 06/13] enhance: Add a retry loop to reconnect to docker events when docker is down --- pkg/acquisition/modules/docker/docker.go | 93 ++++++++++++++++++++++-- 1 file changed, 86 insertions(+), 7 deletions(-) diff --git a/pkg/acquisition/modules/docker/docker.go b/pkg/acquisition/modules/docker/docker.go index 9d83777e3cb..61eccbac705 100644 --- a/pkg/acquisition/modules/docker/docker.go +++ b/pkg/acquisition/modules/docker/docker.go @@ -5,7 +5,7 @@ import ( "context" "errors" "fmt" - "io" + "math" "net/url" "regexp" "strconv" @@ -555,23 +555,102 @@ func (d *DockerSource) WatchContainer(ctx context.Context, monitChan chan *Conta Filters: f, } eventsChan, errChan := d.Client.Events(ctx, options) + + const ( + initialBackoff = 2 * time.Second + maxBackoff = 60 * time.Second // Cap at 1 minute + backoffFactor = 2 + maxRetries = 5 // Max retries per reconnection attempt + ) + errorRetryBackoff := initialBackoff + for { select { case <-d.t.Dying(): d.logger.Infof("stopping container watcher") return nil + case event := <-eventsChan: + // Reset backoff on successful event + if errorRetryBackoff > initialBackoff { + errorRetryBackoff = initialBackoff + d.logger.Debug("Successfully receiving Docker events, reset backoff timer") + } + if event.Action == dockerTypesEvents.ActionStart || event.Action == dockerTypesEvents.ActionDie { - err := d.checkContainers(ctx, monitChan, deleteChan) - if err != nil { - return err // We hit an error while watching containers, we stop the watcher + if err := d.checkContainers(ctx, monitChan, deleteChan); err != nil { + d.logger.Warnf("Failed to check containers: %v", err) } } + case err := <-errChan: - if errors.Is(err, io.EOF) { - d.logger.Debug("EOF while watching containers, we stop the watcher") + if err == nil { + continue } - return err // We hit an error while watching containers, we stop the watcher + + d.logger.Errorf("Docker events error: %v", err) + + // Multiple retry attempts within the error case + retries := 0 + var reconnectErr error + + for retries < maxRetries { + // Check for cancellation before sleeping + select { + case <-ctx.Done(): + return ctx.Err() + case <-d.t.Dying(): + return nil + default: + // Continue with reconnection attempt + } + + // Sleep with backoff + d.logger.Debugf("Retry %d/%d: Waiting %s before reconnecting to Docker events", + retries+1, maxRetries, errorRetryBackoff) + + select { + case <-time.After(errorRetryBackoff): + // Continue after backoff + case <-ctx.Done(): + return ctx.Err() + case <-d.t.Dying(): + return nil + } + + // Try to reconnect + d.logger.Infof("Attempting to reconnect to Docker events stream") + newEventsChan, newErrChan := d.Client.Events(ctx, options) + + // Check if connection is immediately broken + select { + case reconnectErr = <-newErrChan: + // Connection failed immediately + retries++ + errorRetryBackoff = time.Duration(math.Min( + float64(errorRetryBackoff*backoffFactor), + float64(maxBackoff), + )) + d.logger.Errorf("Failed to reconnect to Docker events (attempt %d/%d): %v", + retries, maxRetries, reconnectErr) + + default: + // No immediate error, seems to have reconnected successfully + d.logger.Info("Successfully reconnected to Docker events") + eventsChan = newEventsChan + errChan = newErrChan + errorRetryBackoff = initialBackoff + goto reconnected + } + } + + // If we've exhausted all retries, return with an error + return fmt.Errorf("failed to reconnect to Docker events after %d attempts: %w", + maxRetries, reconnectErr) + + reconnected: + // Continue in the main loop with new channels + continue } } } From 45f46c8ed3cf48a98872155e610e33f94541c84e Mon Sep 17 00:00:00 2001 From: Laurence Date: Fri, 2 May 2025 11:34:26 +0100 Subject: [PATCH 07/13] enhance: remove max retries seconds as we have a max count instead --- pkg/acquisition/modules/docker/docker.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/pkg/acquisition/modules/docker/docker.go b/pkg/acquisition/modules/docker/docker.go index 61eccbac705..74891e53566 100644 --- a/pkg/acquisition/modules/docker/docker.go +++ b/pkg/acquisition/modules/docker/docker.go @@ -5,7 +5,6 @@ import ( "context" "errors" "fmt" - "math" "net/url" "regexp" "strconv" @@ -558,7 +557,6 @@ func (d *DockerSource) WatchContainer(ctx context.Context, monitChan chan *Conta const ( initialBackoff = 2 * time.Second - maxBackoff = 60 * time.Second // Cap at 1 minute backoffFactor = 2 maxRetries = 5 // Max retries per reconnection attempt ) @@ -627,10 +625,7 @@ func (d *DockerSource) WatchContainer(ctx context.Context, monitChan chan *Conta case reconnectErr = <-newErrChan: // Connection failed immediately retries++ - errorRetryBackoff = time.Duration(math.Min( - float64(errorRetryBackoff*backoffFactor), - float64(maxBackoff), - )) + errorRetryBackoff = errorRetryBackoff * backoffFactor d.logger.Errorf("Failed to reconnect to Docker events (attempt %d/%d): %v", retries, maxRetries, reconnectErr) From f9a4ae3fe140759dd1aa95c347c9dd1ab7910a8e Mon Sep 17 00:00:00 2001 From: Laurence Date: Fri, 2 May 2025 11:58:22 +0100 Subject: [PATCH 08/13] enhance: mr linter mad --- pkg/acquisition/modules/docker/docker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/acquisition/modules/docker/docker.go b/pkg/acquisition/modules/docker/docker.go index 74891e53566..2b7fd5951ed 100644 --- a/pkg/acquisition/modules/docker/docker.go +++ b/pkg/acquisition/modules/docker/docker.go @@ -625,7 +625,7 @@ func (d *DockerSource) WatchContainer(ctx context.Context, monitChan chan *Conta case reconnectErr = <-newErrChan: // Connection failed immediately retries++ - errorRetryBackoff = errorRetryBackoff * backoffFactor + errorRetryBackoff *= backoffFactor d.logger.Errorf("Failed to reconnect to Docker events (attempt %d/%d): %v", retries, maxRetries, reconnectErr) From 1081d615eecb2674d0e2e2162c22324f4db68055 Mon Sep 17 00:00:00 2001 From: Laurence Date: Fri, 2 May 2025 13:58:53 +0100 Subject: [PATCH 09/13] enhance: keep trying until we hit max timer --- pkg/acquisition/modules/docker/docker.go | 29 +++++++++++------------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/pkg/acquisition/modules/docker/docker.go b/pkg/acquisition/modules/docker/docker.go index 2b7fd5951ed..292d20cf585 100644 --- a/pkg/acquisition/modules/docker/docker.go +++ b/pkg/acquisition/modules/docker/docker.go @@ -556,9 +556,9 @@ func (d *DockerSource) WatchContainer(ctx context.Context, monitChan chan *Conta eventsChan, errChan := d.Client.Events(ctx, options) const ( - initialBackoff = 2 * time.Second - backoffFactor = 2 - maxRetries = 5 // Max retries per reconnection attempt + initialBackoff = 2 * time.Second + backoffFactor = 2 + errorRetryMaxBackoff = 60 * time.Second ) errorRetryBackoff := initialBackoff @@ -590,9 +590,8 @@ func (d *DockerSource) WatchContainer(ctx context.Context, monitChan chan *Conta // Multiple retry attempts within the error case retries := 0 - var reconnectErr error - for retries < maxRetries { + for { // Check for cancellation before sleeping select { case <-ctx.Done(): @@ -604,8 +603,8 @@ func (d *DockerSource) WatchContainer(ctx context.Context, monitChan chan *Conta } // Sleep with backoff - d.logger.Debugf("Retry %d/%d: Waiting %s before reconnecting to Docker events", - retries+1, maxRetries, errorRetryBackoff) + d.logger.Debugf("Retry %d: Waiting %s before reconnecting to Docker events", + retries+1, errorRetryBackoff) select { case <-time.After(errorRetryBackoff): @@ -622,27 +621,25 @@ func (d *DockerSource) WatchContainer(ctx context.Context, monitChan chan *Conta // Check if connection is immediately broken select { - case reconnectErr = <-newErrChan: + case err := <-newErrChan: // Connection failed immediately retries++ errorRetryBackoff *= backoffFactor - d.logger.Errorf("Failed to reconnect to Docker events (attempt %d/%d): %v", - retries, maxRetries, reconnectErr) + if errorRetryBackoff > errorRetryMaxBackoff { + errorRetryBackoff = errorRetryMaxBackoff + } + d.logger.Errorf("Failed to reconnect to Docker events (attempt %d): %v", + retries, err) default: // No immediate error, seems to have reconnected successfully d.logger.Info("Successfully reconnected to Docker events") + errorRetryBackoff = initialBackoff eventsChan = newEventsChan errChan = newErrChan - errorRetryBackoff = initialBackoff goto reconnected } } - - // If we've exhausted all retries, return with an error - return fmt.Errorf("failed to reconnect to Docker events after %d attempts: %w", - maxRetries, reconnectErr) - reconnected: // Continue in the main loop with new channels continue From 38810df96bfc531e22ab19576c462f4d5a69f40f Mon Sep 17 00:00:00 2001 From: Laurence Date: Fri, 2 May 2025 14:09:33 +0100 Subject: [PATCH 10/13] enhance: After a reconnect we always check the containers to attempt to resurrect or else we wait until a event comes in which it may not --- pkg/acquisition/modules/docker/docker.go | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/pkg/acquisition/modules/docker/docker.go b/pkg/acquisition/modules/docker/docker.go index 292d20cf585..3fa2d5c1798 100644 --- a/pkg/acquisition/modules/docker/docker.go +++ b/pkg/acquisition/modules/docker/docker.go @@ -569,12 +569,6 @@ func (d *DockerSource) WatchContainer(ctx context.Context, monitChan chan *Conta return nil case event := <-eventsChan: - // Reset backoff on successful event - if errorRetryBackoff > initialBackoff { - errorRetryBackoff = initialBackoff - d.logger.Debug("Successfully receiving Docker events, reset backoff timer") - } - if event.Action == dockerTypesEvents.ActionStart || event.Action == dockerTypesEvents.ActionDie { if err := d.checkContainers(ctx, monitChan, deleteChan); err != nil { d.logger.Warnf("Failed to check containers: %v", err) @@ -633,14 +627,17 @@ func (d *DockerSource) WatchContainer(ctx context.Context, monitChan chan *Conta default: // No immediate error, seems to have reconnected successfully - d.logger.Info("Successfully reconnected to Docker events") errorRetryBackoff = initialBackoff + d.logger.Info("Successfully reconnected to Docker events") eventsChan = newEventsChan errChan = newErrChan goto reconnected } } reconnected: + // We check containers after a reconnection because the docker daemon might have restarted + // and the container tombs may have self deleted + d.checkContainers(ctx, monitChan, deleteChan) // Continue in the main loop with new channels continue } From f04b51ba101c70479374b72f8bfc44d89f54b49b Mon Sep 17 00:00:00 2001 From: Laurence Date: Fri, 2 May 2025 14:12:30 +0100 Subject: [PATCH 11/13] enhance: Move info outside for loop --- pkg/acquisition/modules/docker/docker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/acquisition/modules/docker/docker.go b/pkg/acquisition/modules/docker/docker.go index 3fa2d5c1798..8a7ffc701cf 100644 --- a/pkg/acquisition/modules/docker/docker.go +++ b/pkg/acquisition/modules/docker/docker.go @@ -585,6 +585,7 @@ func (d *DockerSource) WatchContainer(ctx context.Context, monitChan chan *Conta // Multiple retry attempts within the error case retries := 0 + d.logger.Infof("Attempting to reconnect to Docker events stream") for { // Check for cancellation before sleeping select { @@ -610,7 +611,6 @@ func (d *DockerSource) WatchContainer(ctx context.Context, monitChan chan *Conta } // Try to reconnect - d.logger.Infof("Attempting to reconnect to Docker events stream") newEventsChan, newErrChan := d.Client.Events(ctx, options) // Check if connection is immediately broken From b55d9cbe883cf9c6d1413ec4544cf68981fd89b3 Mon Sep 17 00:00:00 2001 From: Laurence Date: Fri, 2 May 2025 14:14:55 +0100 Subject: [PATCH 12/13] enhance: Move info to reconnect goto --- pkg/acquisition/modules/docker/docker.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/acquisition/modules/docker/docker.go b/pkg/acquisition/modules/docker/docker.go index 8a7ffc701cf..d69ccb18a4e 100644 --- a/pkg/acquisition/modules/docker/docker.go +++ b/pkg/acquisition/modules/docker/docker.go @@ -585,7 +585,7 @@ func (d *DockerSource) WatchContainer(ctx context.Context, monitChan chan *Conta // Multiple retry attempts within the error case retries := 0 - d.logger.Infof("Attempting to reconnect to Docker events stream") + d.logger.Info("Attempting to reconnect to Docker events stream") for { // Check for cancellation before sleeping select { @@ -628,13 +628,13 @@ func (d *DockerSource) WatchContainer(ctx context.Context, monitChan chan *Conta default: // No immediate error, seems to have reconnected successfully errorRetryBackoff = initialBackoff - d.logger.Info("Successfully reconnected to Docker events") eventsChan = newEventsChan errChan = newErrChan goto reconnected } } reconnected: + d.logger.Info("Successfully reconnected to Docker events") // We check containers after a reconnection because the docker daemon might have restarted // and the container tombs may have self deleted d.checkContainers(ctx, monitChan, deleteChan) From 4cea22c023a7c4545a07c65033c2586831f69ca8 Mon Sep 17 00:00:00 2001 From: Laurence Date: Fri, 2 May 2025 14:23:43 +0100 Subject: [PATCH 13/13] enhance: mr linter pls be happy --- pkg/acquisition/modules/docker/docker.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/acquisition/modules/docker/docker.go b/pkg/acquisition/modules/docker/docker.go index d69ccb18a4e..c39da5f681f 100644 --- a/pkg/acquisition/modules/docker/docker.go +++ b/pkg/acquisition/modules/docker/docker.go @@ -637,7 +637,9 @@ func (d *DockerSource) WatchContainer(ctx context.Context, monitChan chan *Conta d.logger.Info("Successfully reconnected to Docker events") // We check containers after a reconnection because the docker daemon might have restarted // and the container tombs may have self deleted - d.checkContainers(ctx, monitChan, deleteChan) + if err := d.checkContainers(ctx, monitChan, deleteChan); err != nil { + d.logger.Warnf("Failed to check containers: %v", err) + } // Continue in the main loop with new channels continue }