Skip to content

enhance: Remove docker acquis internal timer use docker events #3598

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 136 additions & 46 deletions pkg/acquisition/modules/docker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,17 @@
"strings"
"time"

"github.com/crowdsecurity/dlog"
dockerTypes "github.com/docker/docker/api/types"
dockerContainer "github.com/docker/docker/api/types/container"
dockerTypesEvents "github.com/docker/docker/api/types/events"
dockerFilter "github.com/docker/docker/api/types/filters"
"github.com/docker/docker/client"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"gopkg.in/tomb.v2"
"gopkg.in/yaml.v2"

"github.com/crowdsecurity/dlog"

"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
"github.com/crowdsecurity/crowdsec/pkg/types"
)
Expand Down Expand Up @@ -53,7 +54,6 @@
runningContainerState map[string]*ContainerConfig
compiledContainerName []*regexp.Regexp
compiledContainerID []*regexp.Regexp
CheckIntervalDuration time.Duration
logger *log.Entry
Client client.CommonAPIClient
t *tomb.Tomb
Expand All @@ -75,9 +75,8 @@

func (d *DockerSource) UnmarshalConfig(yamlConfig []byte) error {
d.Config = DockerConfiguration{
FollowStdout: true, // default
FollowStdErr: true, // default
CheckInterval: "1s", // default
FollowStdout: true, // default
FollowStdErr: true, // default
}

err := yaml.UnmarshalStrict(yamlConfig, &d.Config)
Expand All @@ -97,9 +96,8 @@
return errors.New("use_container_labels and container_name, container_id, container_id_regexp, container_name_regexp are mutually exclusive")
}

d.CheckIntervalDuration, err = time.ParseDuration(d.Config.CheckInterval)
if err != nil {
return fmt.Errorf("parsing 'check_interval' parameters: %s", d.CheckIntervalDuration)
if d.Config.CheckInterval != "" {
d.logger.Warn("check_interval is deprecated, it will be removed in a future version")
}

if d.Config.Mode == "" {
Expand Down Expand Up @@ -495,63 +493,155 @@
return nil
}

func (d *DockerSource) checkContainers(ctx context.Context, monitChan chan *ContainerConfig, deleteChan chan *ContainerConfig) error {
// to track for garbage collection
runningContainersID := make(map[string]bool)

runningContainers, err := d.Client.ContainerList(ctx, dockerContainer.ListOptions{})
if err != nil {
if strings.Contains(strings.ToLower(err.Error()), "cannot connect to the docker daemon at") {
for idx, container := range d.runningContainerState {
if d.runningContainerState[idx].t.Alive() {
d.logger.Infof("killing tail for container %s", container.Name)
d.runningContainerState[idx].t.Kill(nil)

if err := d.runningContainerState[idx].t.Wait(); err != nil {
d.logger.Infof("error while waiting for death of %s : %s", container.Name, err)
}

Check warning on line 510 in pkg/acquisition/modules/docker/docker.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/docker/docker.go#L502-L510

Added lines #L502 - L510 were not covered by tests
}

delete(d.runningContainerState, idx)

Check warning on line 513 in pkg/acquisition/modules/docker/docker.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/docker/docker.go#L513

Added line #L513 was not covered by tests
}
} else {
log.Errorf("container list err: %s", err)
}

Check warning on line 517 in pkg/acquisition/modules/docker/docker.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/docker/docker.go#L515-L517

Added lines #L515 - L517 were not covered by tests

return err

Check warning on line 519 in pkg/acquisition/modules/docker/docker.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/docker/docker.go#L519

Added line #L519 was not covered by tests
}

for _, container := range runningContainers {
runningContainersID[container.ID] = true

// don't need to re eval an already monitored container
if _, ok := d.runningContainerState[container.ID]; ok {
continue

Check warning on line 527 in pkg/acquisition/modules/docker/docker.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/docker/docker.go#L527

Added line #L527 was not covered by tests
}

if containerConfig := d.EvalContainer(ctx, container); containerConfig != nil {
monitChan <- containerConfig
}
}

for containerStateID, containerConfig := range d.runningContainerState {
if _, ok := runningContainersID[containerStateID]; !ok {
deleteChan <- containerConfig
}

Check warning on line 538 in pkg/acquisition/modules/docker/docker.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/docker/docker.go#L537-L538

Added lines #L537 - L538 were not covered by tests
}

d.logger.Tracef("Reading logs from %d containers", len(d.runningContainerState))
return nil
}

func (d *DockerSource) WatchContainer(ctx context.Context, monitChan chan *ContainerConfig, deleteChan chan *ContainerConfig) error {
ticker := time.NewTicker(d.CheckIntervalDuration)
d.logger.Infof("Container watcher started, interval: %s", d.CheckIntervalDuration.String())
err := d.checkContainers(ctx, monitChan, deleteChan)
if err != nil {
return err
}

Check warning on line 549 in pkg/acquisition/modules/docker/docker.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/docker/docker.go#L548-L549

Added lines #L548 - L549 were not covered by tests

f := dockerFilter.NewArgs()
f.Add("type", "container")
options := dockerTypesEvents.ListOptions{
Filters: f,
}
eventsChan, errChan := d.Client.Events(ctx, options)

const (
initialBackoff = 2 * time.Second
backoffFactor = 2
errorRetryMaxBackoff = 60 * time.Second
)
errorRetryBackoff := initialBackoff

for {
select {
case <-d.t.Dying():
d.logger.Infof("stopping container watcher")
return nil
case <-ticker.C:
// to track for garbage collection
runningContainersID := make(map[string]bool)

runningContainers, err := d.Client.ContainerList(ctx, dockerContainer.ListOptions{})
if err != nil {
if strings.Contains(strings.ToLower(err.Error()), "cannot connect to the docker daemon at") {
for idx, container := range d.runningContainerState {
if d.runningContainerState[idx].t.Alive() {
d.logger.Infof("killing tail for container %s", container.Name)
d.runningContainerState[idx].t.Kill(nil)

if err := d.runningContainerState[idx].t.Wait(); err != nil {
d.logger.Infof("error while waiting for death of %s : %s", container.Name, err)
}
}

delete(d.runningContainerState, idx)
}
} else {
log.Errorf("container list err: %s", err)
case event := <-eventsChan:
if event.Action == dockerTypesEvents.ActionStart || event.Action == dockerTypesEvents.ActionDie {
if err := d.checkContainers(ctx, monitChan, deleteChan); err != nil {
d.logger.Warnf("Failed to check containers: %v", err)

Check warning on line 574 in pkg/acquisition/modules/docker/docker.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/docker/docker.go#L571-L574

Added lines #L571 - L574 were not covered by tests
}
}

case err := <-errChan:
if err == nil {

Check warning on line 579 in pkg/acquisition/modules/docker/docker.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/docker/docker.go#L578-L579

Added lines #L578 - L579 were not covered by tests
continue
}

for _, container := range runningContainers {
runningContainersID[container.ID] = true
d.logger.Errorf("Docker events error: %v", err)

Check warning on line 583 in pkg/acquisition/modules/docker/docker.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/docker/docker.go#L583

Added line #L583 was not covered by tests

// don't need to re eval an already monitored container
if _, ok := d.runningContainerState[container.ID]; ok {
continue
}
// Multiple retry attempts within the error case
retries := 0

Check warning on line 586 in pkg/acquisition/modules/docker/docker.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/docker/docker.go#L585-L586

Added lines #L585 - L586 were not covered by tests

if containerConfig := d.EvalContainer(ctx, container); containerConfig != nil {
monitChan <- containerConfig
d.logger.Info("Attempting to reconnect to Docker events stream")
for {
// Check for cancellation before sleeping
select {
case <-ctx.Done():
return ctx.Err()
case <-d.t.Dying():
return nil
default:

Check warning on line 596 in pkg/acquisition/modules/docker/docker.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/docker/docker.go#L588-L596

Added lines #L588 - L596 were not covered by tests
// Continue with reconnection attempt
}
}

for containerStateID, containerConfig := range d.runningContainerState {
if _, ok := runningContainersID[containerStateID]; !ok {
deleteChan <- containerConfig
// Sleep with backoff
d.logger.Debugf("Retry %d: Waiting %s before reconnecting to Docker events",
retries+1, errorRetryBackoff)

select {
case <-time.After(errorRetryBackoff):

Check warning on line 605 in pkg/acquisition/modules/docker/docker.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/docker/docker.go#L601-L605

Added lines #L601 - L605 were not covered by tests
// Continue after backoff
case <-ctx.Done():
return ctx.Err()
case <-d.t.Dying():
return nil

Check warning on line 610 in pkg/acquisition/modules/docker/docker.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/docker/docker.go#L607-L610

Added lines #L607 - L610 were not covered by tests
}
}

d.logger.Tracef("Reading logs from %d containers", len(d.runningContainerState))
// Try to reconnect
newEventsChan, newErrChan := d.Client.Events(ctx, options)

Check warning on line 614 in pkg/acquisition/modules/docker/docker.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/docker/docker.go#L614

Added line #L614 was not covered by tests

ticker.Reset(d.CheckIntervalDuration)
// Check if connection is immediately broken
select {
case err := <-newErrChan:
// Connection failed immediately
retries++
errorRetryBackoff *= backoffFactor
if errorRetryBackoff > errorRetryMaxBackoff {
errorRetryBackoff = errorRetryMaxBackoff
}
d.logger.Errorf("Failed to reconnect to Docker events (attempt %d): %v",
retries, err)

Check warning on line 626 in pkg/acquisition/modules/docker/docker.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/docker/docker.go#L616-L626

Added lines #L616 - L626 were not covered by tests

default:
// No immediate error, seems to have reconnected successfully
errorRetryBackoff = initialBackoff
eventsChan = newEventsChan
errChan = newErrChan
goto reconnected

Check warning on line 633 in pkg/acquisition/modules/docker/docker.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/docker/docker.go#L628-L633

Added lines #L628 - L633 were not covered by tests
}
}
reconnected:
d.logger.Info("Successfully reconnected to Docker events")
// We check containers after a reconnection because the docker daemon might have restarted
// and the container tombs may have self deleted
if err := d.checkContainers(ctx, monitChan, deleteChan); err != nil {
d.logger.Warnf("Failed to check containers: %v", err)
}

Check warning on line 642 in pkg/acquisition/modules/docker/docker.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/docker/docker.go#L637-L642

Added lines #L637 - L642 were not covered by tests
// Continue in the main loop with new channels
continue

Check warning on line 644 in pkg/acquisition/modules/docker/docker.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/modules/docker/docker.go#L644

Added line #L644 was not covered by tests
}
}
}
Expand Down
18 changes: 18 additions & 0 deletions pkg/acquisition/modules/docker/docker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

dockerTypes "github.com/docker/docker/api/types"
dockerContainer "github.com/docker/docker/api/types/container"
dockerTypesEvents "github.com/docker/docker/api/types/events"
"github.com/docker/docker/client"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -50,6 +51,15 @@ source: docker`,
config: `
mode: cat
source: docker
container_name:
- toto`,
expectedErr: "",
},
{
config: `
mode: cat
source: docker
check_interval: 10s
container_name:
- toto`,
expectedErr: "",
Expand Down Expand Up @@ -273,6 +283,14 @@ func (cli *mockDockerCli) ContainerInspect(ctx context.Context, c string) (docke
return r, nil
}

// Since we are mocking the docker client, we return channels that will never be used
func (cli *mockDockerCli) Events(ctx context.Context, options dockerTypesEvents.ListOptions) (<-chan dockerTypesEvents.Message, <-chan error) {
eventsChan := make(chan dockerTypesEvents.Message)
errChan := make(chan error)

return eventsChan, errChan
}

func TestOneShot(t *testing.T) {
ctx := t.Context()

Expand Down