@@ -0,0 +1,33 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts the security of a product or a user's deployment
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: bug-fix

# Change summary; a description of the change, roughly 80 characters long.
summary: >-
[Filestream] ensure harvester always restarts if the file has not been fully ingested.

# Long description; in case the summary is not enough to describe the change,
# this field accommodates a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
#description:

# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component: filebeat

# PR URL; optional; the PR that added the changeset.
# If not present, it is automatically filled in by the tooling, which finds the PR where this changelog fragment was added.
# NOTE: the tooling supports backports, so it is able to fill in the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: https://github.com/elastic/beats/pull/47107

# Issue URL; optional; the GitHub issue related to this changeset (one it either closes or is part of).
# If not present, it is automatically filled in by the tooling with the issue linked to the PR.
issue: https://github.com/elastic/beats/issues/46923
86 changes: 31 additions & 55 deletions filebeat/input/filestream/environment_test.go
@@ -20,13 +20,13 @@
package filestream

import (
"bytes"
"context"
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

@@ -43,18 +43,17 @@ import (
"github.com/elastic/beats/v7/libbeat/statestore"
"github.com/elastic/beats/v7/libbeat/statestore/storetest"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/logp/logptest"
"github.com/elastic/go-concert/unison"
)

type inputTestingEnvironment struct {
logger *logp.Logger
loggerBuffer *bytes.Buffer
t *testing.T
workingDir string
stateStore statestore.States
pipeline *mockPipelineConnector
monitoring beat.Monitoring
testLogger *logptest.Logger
t *testing.T
workingDir string
stateStore statestore.States
pipeline *mockPipelineConnector
monitoring beat.Monitoring

pluginInitOnce sync.Once
plugin v2.Plugin
@@ -71,40 +70,18 @@ type registryEntry struct {
}

func newInputTestingEnvironment(t *testing.T) *inputTestingEnvironment {
// logp.NewInMemoryLocal will always use a console encoder, passing a
// JSONEncoderConfig will only change the keys, not the final encoding.
logger, buff := logp.NewInMemoryLocal("", logp.ConsoleEncoderConfig())

t.Cleanup(func() {
if t.Failed() {
pattern := strings.ReplaceAll(t.Name()+"-*", "/", "_")
f, err := os.CreateTemp("", pattern)
if err != nil {
t.Errorf("cannot create temp file for logs: %s", err)
return
}

defer f.Close()

data := buff.Bytes()
t.Logf("Debug Logs:%s\n", string(data))
t.Logf("Logs written to %s", f.Name())
if _, err := f.Write(data); err != nil {
t.Logf("could not write log file for debugging: %s", err)
}

return
}
})
logger := logptest.NewFileLogger(
t,
filepath.Join("..", "..", "build", "integration-tests"),
)

return &inputTestingEnvironment{
logger: logger,
loggerBuffer: buff,
t: t,
workingDir: t.TempDir(),
stateStore: openTestStatestore(),
pipeline: &mockPipelineConnector{},
monitoring: beat.NewMonitoring(),
testLogger: logger,
t: t,
workingDir: t.TempDir(),
stateStore: openTestStatestore(),
pipeline: &mockPipelineConnector{},
monitoring: beat.NewMonitoring(),
}
}

@@ -136,7 +113,7 @@ func (e *inputTestingEnvironment) createInput(config map[string]any) (v2.Input,

func (e *inputTestingEnvironment) getManager() v2.InputManager {
e.pluginInitOnce.Do(func() {
e.plugin = Plugin(e.logger, e.stateStore)
e.plugin = Plugin(e.testLogger.Logger, e.stateStore)
})
return e.plugin.Manager
}
@@ -147,7 +124,7 @@ func (e *inputTestingEnvironment) startInput(ctx context.Context, id string, inp
defer wg.Done()
defer func() { _ = grp.Stop() }()

logger, _ := logp.NewDevelopmentLogger("")
logger := e.testLogger.Named("metrics-registry")
reg := inputmon.NewMetricsRegistry(
id, inp.Name(), e.monitoring.InputsRegistry(), logger)
defer inputmon.CancelMetricsRegistry(
@@ -160,7 +137,7 @@
Cancelation: ctx,
StatusReporter: nil,
MetricsRegistry: reg,
Logger: e.logger,
Logger: e.testLogger.Named("input.filestream"),
}
_ = inp.Run(inputCtx, e.pipeline)
}(&e.wg, &e.grp)
@@ -487,7 +464,7 @@ func (e *inputTestingEnvironment) waitUntilHarvesterIsDone() {
require.Eventually(
e.t,
func() bool {
return e.pipeline.clients[len(e.pipeline.clients)-1].closed
return e.pipeline.clients[len(e.pipeline.clients)-1].closed.Load()
},
time.Second*10,
time.Millisecond*10,
@@ -572,14 +549,13 @@ func (e *inputTestingEnvironment) requireEventTimestamp(nr int, ts string) {
// logContains ensures s is a substring of at least one log line.
// If s is not found, the test fails.
func (e *inputTestingEnvironment) logContains(s string) {
logs := e.loggerBuffer.String()
for _, line := range strings.Split(logs, "\n") {
if strings.Contains(line, s) {
return
}
}
e.t.Helper()
e.testLogger.LogContains(e.t, s)
}

e.t.Fatalf("%q not found in logs", s)
func (e *inputTestingEnvironment) WaitLogsContains(s string, timeout time.Duration, msgAndArgs ...any) {
e.t.Helper()
e.testLogger.WaitLogsContains(e.t, s, timeout, msgAndArgs...)
}

var _ statestore.States = (*testInputStore)(nil)
@@ -610,7 +586,7 @@ type mockClient struct {
publishing []beat.Event
published []beat.Event
ackHandler beat.EventListener
closed bool
closed atomic.Bool
mtx sync.Mutex
canceler context.CancelFunc
}
@@ -653,11 +629,11 @@ func (c *mockClient) Close() error {
c.mtx.Lock()
defer c.mtx.Unlock()

if c.closed {
if c.closed.Load() {
return fmt.Errorf("mock client already closed")
}

c.closed = true
c.closed.Store(true)
return nil
}

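A note on the `mockClient` change above: `closed` moves from a plain `bool` to `sync/atomic`'s `atomic.Bool` because `waitUntilHarvesterIsDone` polls the flag from a `require.Eventually` callback without holding `c.mtx`, while `Close()` writes it under the mutex, which the race detector flags as a data race. A minimal standalone sketch of that pattern (the `client` type here is hypothetical, not the actual test code):

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// client mirrors the mockClient fix: the closed flag is an atomic.Bool so a
// polling goroutine can read it while Close() writes it, without needing the
// mutex that guards the rest of the struct.
type client struct {
	closed atomic.Bool
}

func (c *client) Close() error {
	if c.closed.Load() {
		return fmt.Errorf("already closed")
	}
	c.closed.Store(true)
	return nil
}

func main() {
	c := &client{}
	go func() {
		time.Sleep(10 * time.Millisecond)
		_ = c.Close()
	}()
	// Poll the flag the way require.Eventually does in the test.
	deadline := time.Now().Add(time.Second)
	for time.Now().Before(deadline) {
		if c.closed.Load() {
			fmt.Println("client closed")
			return
		}
		time.Sleep(time.Millisecond)
	}
	fmt.Println("timed out waiting for close")
}
```

Keeping the mutex for the slice fields while making only the externally observed flag atomic is a common compromise when one field must be readable outside the critical section.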
24 changes: 20 additions & 4 deletions filebeat/input/filestream/filestream.go
@@ -23,6 +23,7 @@ import (
"errors"
"io"
"os"
"sync"
"sync/atomic"
"time"

@@ -58,10 +59,15 @@ type logFile struct {

isInactive atomic.Bool

// offsetMutx ensures 'offset' and 'lastTimeRead' are updated
// together. Updating them as a pair prevents [shouldBeClosed]
// from misdetecting whether the file is inactive.
offsetMutx sync.Mutex
offset int64
lastTimeRead time.Time
backoff backoff.Backoff
tg *unison.TaskGroup

backoff backoff.Backoff
tg *unison.TaskGroup
}

// newFileReader creates a new log instance to read log sources
@@ -112,8 +118,7 @@ func (f *logFile) Read(buf []byte) (int, error) {
for f.readerCtx.Err() == nil {
n, err := f.file.Read(buf)
if n > 0 {
f.offset += int64(n)
f.lastTimeRead = time.Now()
f.updateOffset(n)
}
totalN += n

@@ -196,11 +201,14 @@ func (f *logFile) periodicStateCheck(ctx unison.Canceler) {

func (f *logFile) shouldBeClosed() bool {
if f.closeInactive > 0 {
f.offsetMutx.Lock()
if time.Since(f.lastTimeRead) > f.closeInactive {
f.isInactive.Store(true)
f.log.Debugf("'%s' is inactive", f.file.Name())
f.offsetMutx.Unlock()
return true
}
f.offsetMutx.Unlock()
}

if !f.closeRemoved && !f.closeRenamed {
@@ -297,3 +305,11 @@ func (f *logFile) Close() error {
f.log.Debugf("Closed reader. Path='%s'", f.file.Name())
return err
}

// updateOffset updates offset and lastTimeRead together, under offsetMutx
func (f *logFile) updateOffset(delta int) {
f.offsetMutx.Lock()
f.offset += int64(delta)
f.lastTimeRead = time.Now()
f.offsetMutx.Unlock()
}
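The new `offsetMutx` exists because `Read` advances `offset` and refreshes `lastTimeRead` on one goroutine while `periodicStateCheck` evaluates `shouldBeClosed` on another; taking a single lock around both fields keeps the pair consistent, so a file that just made progress cannot be judged inactive off a stale timestamp. A reduced standalone sketch of the idea (the `tracker` type is hypothetical, with `closeInactive` standing in for the configured inactivity window):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// tracker is a reduced version of logFile's inactivity bookkeeping:
// offset and lastTimeRead are always read and written together under mu.
type tracker struct {
	mu            sync.Mutex
	offset        int64
	lastTimeRead  time.Time
	closeInactive time.Duration
}

// recordRead mirrors updateOffset: advance the offset and refresh the
// activity timestamp in one critical section.
func (t *tracker) recordRead(n int) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.offset += int64(n)
	t.lastTimeRead = time.Now()
}

// inactive mirrors the shouldBeClosed check: because it takes the same
// lock, it never observes an offset update without its matching timestamp.
func (t *tracker) inactive() bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	return time.Since(t.lastTimeRead) > t.closeInactive
}

func main() {
	t := &tracker{lastTimeRead: time.Now(), closeInactive: 50 * time.Millisecond}
	t.recordRead(128)
	fmt.Println("inactive right after a read:", t.inactive()) // false
	time.Sleep(60 * time.Millisecond)
	fmt.Println("inactive after the window:", t.inactive()) // true
}
```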