From e58562636a66b54fa9b6d5e26ddfe58e5b58bdaf Mon Sep 17 00:00:00 2001 From: Orestis Floros Date: Fri, 28 Nov 2025 09:38:56 +0100 Subject: [PATCH] [cache-processor] Set beat paths (#47353) # Lazy Initialization of the Cache Processor's File Store ## The Problem The basic problem is that processors often use `paths.Resolve` to find directories like "data" or "logs". This function uses a global variable for the base path, which is fine when a Beat runs as a standalone process. But when a Beat is embedded as a receiver (e.g., `fbreceiver` in the OTel Collector), this global causes problems. Each receiver needs its own isolated state directory, and a single global path prevents this. The `cache` processor currently tries to set up its file-based store in its `New` function, which is too early. It only has access to the global path, not the receiver-specific path that gets configured later. ## The Solution My solution is to initialize the cache's file store lazily. Instead of creating the store in `cache.New`, I've added a `SetPaths(*paths.Path)` method to the processor. This method creates the file store and is wrapped in a `sync.Once` to make sure it only runs once. The processor's internal store object stays `nil` until `SetPaths` is called during pipeline construction. ## How it Works The path info gets passed down when a client connects to the pipeline. Here's the flow: 1. **`x-pack/filebeat/fbreceiver`**: `createReceiver` instantiates the processors (including `cache` with a `nil` store) and calls `instance.NewBeatForReceiver`. 2. **`x-pack/libbeat/cmd/instance`**: `NewBeatForReceiver` creates the `paths.Path` object from the receiver's specific configuration. 3. **`libbeat/publisher/pipeline`**: This `paths.Path` object is passed into the pipeline. When a client connects, the path is added to the `beat.ProcessingConfig`. 4. **`libbeat/publisher/processing`**: The processing builder gets this config and calls `group.SetPaths`, which passes the path down to each processor. 5. **`libbeat/processors/cache`**: `SetPaths` is finally called on the cache processor instance, and the `sync.Once` guard ensures the file store is created with the correct path. ## Diagram ```mermaid graph TD subgraph "libbeat/processors/cache (init)" A["init()"] end subgraph "libbeat/processors" B["processors.RegisterPlugin"] C{"registry"} end A --> B; B -- "Save factory" --> C; subgraph "x-pack/filebeat/fbreceiver" D["createReceiver"] end subgraph "libbeat/processors" E["processors.New(config)"] C -. "Lookup 'cache'" .-> E; end D --> E; D --> I; E --> G; subgraph "libbeat/processors/cache" G["cache.New()"] -- store=nil --> H{"cache"}; end subgraph "x-pack/libbeat/cmd/instance" I["instance.NewBeatForReceiver"]; I --> J{"paths.Path object"}; end subgraph "libbeat/publisher/pipeline" J --> K["pipeline.New"]; K --> L["ConnectWith"]; end subgraph "libbeat/publisher/processing" L -- "Config w/ paths" --> N["builder.Create"]; N --> O["group.SetPaths"]; end subgraph "libbeat/processors/cache" O --> P["cache.SetPaths"]; P --> Q["sync.Once"]; Q -- "initialize store" --> H; end ``` ## Pros and Cons of This Approach * **Pros**: * It's a minimal, targeted change that solves the immediate problem. * It avoids a large-scale, breaking refactoring of all processors. * It maintains backward compatibility for existing processors and downstream consumers of `libbeat`. * **Cons**: * Using a type assertion for the `setPaths` interface feels a bit like magic, since the behavior changes at runtime depending on whether a processor implements it. ## Alternatives Considered ### Option 1: Add a `paths` argument to all processor constructors * **Pros**: * Simple and direct. * **Cons**: * Requires a global refactoring of all processors. * Breaks external downstream libbeat importers like Cloudbeat. * The `paths` argument is not needed in many processors, so adding a rarely used option to the function signature is verbose. ### Option 2: Refactor `processors` to introduce a "V2" interface * **Pros**: * Allows for a new, backwards-compatible signature (e.g., using a config struct). * This can still be done later. * We can support both V1 processors and gradually move processors to V2. * **Cons**: * Needs a significant refactoring effort. ## Checklist - [x] My code follows the style guidelines of this project - [x] I have commented my code, particularly in hard-to-understand areas - [ ] ~~I have made corresponding changes to the documentation~~ - [ ] ~~I have made corresponding change to the default configuration files~~ - [x] I have added tests that prove my fix is effective or that my feature works. Where relevant, I have used the [`stresstest.sh`](https://github.com/elastic/beats/blob/main/script/stresstest.sh) script to run them under stress conditions and race detector to verify their stability. - [ ] ~~I have added an entry in `./changelog/fragments` using the [changelog tool](https://github.com/elastic/elastic-agent-changelog-tool/blob/main/docs/usage.md).~~ ## How to test this PR locally ### Configuration `filebeat-cache-mwe.yml`: ```yaml path.data: /tmp/data filebeat.inputs: - type: filestream id: filestream-input enabled: true paths: - /tmp/logs/*.log parsers: - ndjson: target: "" processors: # PUT: Store metadata when event.type is "source" - if: equals: event.type: "source" then: - cache: backend: file: id: test_cache write_interval: 5s put: key_field: event.id value_field: event.metadata ttl: 1h # GET: Retrieve metadata when event.type is "target" - if: equals: event.type: "target" then: - cache: backend: file: id: test_cache get: key_field: event.id target_field: cached_metadata output.console: enabled: true ``` ### Setup ```bash # Create directory #rm -rf /tmp/data /tmp/logs mkdir -p /tmp/logs # Create test data cat > /tmp/logs/test.log <<'EOF' {"event":{"type":"source","id":"001","metadata":{"user":"user-1","role":"admin","sequence":1,"data":{"ip":"192.168.1.1","session":"session-001"}}},"message":"source event 1"} {"event":{"type":"source","id":"002","metadata":{"user":"user-2","role":"admin","sequence":2,"data":{"ip":"192.168.1.2","session":"session-002"}}},"message":"source event 2"} {"event":{"type":"source","id":"003","metadata":{"user":"user-3","role":"admin","sequence":3,"data":{"ip":"192.168.1.3","session":"session-003"}}},"message":"source event 3"} {"event":{"type":"source","id":"004","metadata":{"user":"user-4","role":"admin","sequence":4,"data":{"ip":"192.168.1.4","session":"session-004"}}},"message":"source event 4"} {"event":{"type":"source","id":"005","metadata":{"user":"user-5","role":"admin","sequence":5,"data":{"ip":"192.168.1.5","session":"session-005"}}},"message":"source event 5"} {"event":{"type":"target","id":"001"},"message":"target event 1"} {"event":{"type":"target","id":"002"},"message":"target event 2"} {"event":{"type":"target","id":"003"},"message":"target event 3"} {"event":{"type":"target","id":"004"},"message":"target event 4"} {"event":{"type":"target","id":"005"},"message":"target event 5"} EOF # Run filebeat ./x-pack/filebeat/filebeat -e -c filebeat-cache-mwe.yml ``` ### Expected Output Target events should have `cached_metadata` field populated: ```json { "event": { "type": "target", "id": "001" }, "message": "target event 1", "cached_metadata": { "user": "user-1", "role": "admin", "sequence": 1, "data": { "ip": "192.168.1.1", "session": "session-001" } } } ``` ### Cache Files After running filebeat, check cache files: ```bash cat /tmp/data/cache_processor/test_cache ``` example: ```json {"key":"001","val":{"data":{"ip":"192.168.1.1","session":"session-001"},"role":"admin","sequence":1,"user":"user-1"},"expires":"2025-11-20T15:02:32.865896537+01:00"} {"key":"002","val":{"data":{"ip":"192.168.1.2","session":"session-002"},"role":"admin","sequence":2,"user":"user-2"},"expires":"2025-11-20T15:02:32.865950973+01:00"} {"key":"003","val":{"data":{"ip":"192.168.1.3","session":"session-003"},"role":"admin","sequence":3,"user":"user-3"},"expires":"2025-11-20T15:02:32.865972408+01:00"} {"key":"004","val":{"data":{"ip":"192.168.1.4","session":"session-004"},"role":"admin","sequence":4,"user":"user-4"},"expires":"2025-11-20T15:02:32.865988843+01:00"} {"key":"005","val":{"data":{"ip":"192.168.1.5","session":"session-005"},"role":"admin","sequence":5,"user":"user-5"},"expires":"2025-11-20T15:02:32.866006958+01:00"} ``` ## Related issues - Closes https://github.com/elastic/beats/issues/46985 (cherry picked from commit 28222c4cd9547cd93fef5bb49ba65ed9b4d66ba2) --- libbeat/cmd/instance/beat.go | 1 + libbeat/processors/cache/cache.go | 46 +++-- libbeat/processors/cache/cache_test.go | 57 +++++- libbeat/processors/cache/file_store.go | 12 +- libbeat/processors/conditionals.go | 32 ++- libbeat/processors/conditionals_test.go | 84 +++++++- libbeat/processors/processor.go | 7 + libbeat/processors/safe_processor.go | 109 ++++++++-- libbeat/processors/safe_processor_test.go | 202 +++++++++++++++++-- libbeat/publisher/pipeline/client_test.go | 4 +- libbeat/publisher/pipeline/client_worker.go | 4 +- libbeat/publisher/pipeline/pipeline.go | 10 +- libbeat/publisher/processing/default.go | 14 +- libbeat/publisher/processing/default_test.go | 25 ++- libbeat/publisher/processing/processing.go | 3 +- libbeat/publisher/processing/processors.go | 12 ++ x-pack/libbeat/cmd/instance/beat.go | 1 + 17 files changed, 548 insertions(+), 75 deletions(-) diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 28311ac082b4..0cf507c402fa 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -387,6 +387,7 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) { WaitClose: time.Second, Processors: b.processors, InputQueueSize: b.InputQueueSize, + Paths: b.Paths, } publisher, err = pipeline.LoadWithSettings(b.Info, monitors, b.Config.Pipeline, outputFactory, settings) if err != nil { diff --git a/libbeat/processors/cache/cache.go b/libbeat/processors/cache/cache.go index 603ea6be1ece..8d6f8388207f 100644 --- a/libbeat/processors/cache/cache.go +++ b/libbeat/processors/cache/cache.go @@ -69,38 +69,31 @@ func New(cfg *conf.C, log *logp.Logger) (beat.Processor, error) { // Logging (each processor instance has a unique ID). id := int(instanceID.Add(1)) log = log.Named(name).With("instance_id", id) + log.Infow("cache processor created", "config", config) - src, cancel, err := getStoreFor(config, log) - if err != nil { - return nil, fmt.Errorf("failed to get the store for %s: %w", name, err) - } - - p := &cache{ + return &cache{ config: config, - store: src, - cancel: cancel, + store: nil, // initialized in SetPaths log: log, - } - p.log.Infow("initialized cache processor", "details", p) - return p, nil + }, nil } // getStoreFor returns a backing store for the provided configuration, // and a context cancellation that releases the cache resource when it // is no longer required. The cancellation should be called when the // processor is closed. -func getStoreFor(cfg config, log *logp.Logger) (Store, context.CancelFunc, error) { +func getStoreFor(cfg config, log *logp.Logger, path *paths.Path) (Store, context.CancelFunc, error) { switch { case cfg.Store.Memory != nil: s, cancel := memStores.get(cfg.Store.Memory.ID, cfg) return s, cancel, nil case cfg.Store.File != nil: - err := os.MkdirAll(paths.Resolve(paths.Data, "cache_processor"), 0o700) + err := os.MkdirAll(path.Resolve(paths.Data, "cache_processor"), 0o700) if err != nil { return nil, noop, fmt.Errorf("cache processor could not create store directory: %w", err) } - s, cancel := fileStores.get(cfg.Store.File.ID, cfg, log) + s, cancel := fileStores.get(cfg.Store.File.ID, cfg, log, path) return s, cancel, nil default: @@ -131,8 +124,12 @@ type CacheEntry struct { index int } -// Run enriches the given event with the host metadata. +// Run enriches the given event with cached metadata. func (p *cache) Run(event *beat.Event) (*beat.Event, error) { + if p.store == nil { + return event, fmt.Errorf("cache processor store not initialized") + } + switch { case p.config.Put != nil: p.log.Debugw("put", "backend_id", p.store, "config", p.config.Put) @@ -182,6 +179,21 @@ func (p *cache) Run(event *beat.Event) (*beat.Event, error) { } } +// SetPaths initializes the cache store with the provided paths configuration. +// This method must be called before the processor can be used. +func (p *cache) SetPaths(path *paths.Path) error { + src, cancel, err := getStoreFor(p.config, p.log, path) + if err != nil { + return fmt.Errorf("cache processor could not create store for %s: %w", name, err) + } + + p.store = src + p.cancel = cancel + + p.log.Infow("initialized cache processor", "details", p) + return nil +} + // putFrom takes the configured value from the event and stores it in the cache // if it exists. func (p *cache) putFrom(event *beat.Event) error { @@ -268,7 +280,9 @@ func (p *cache) deleteFor(event *beat.Event) error { } func (p *cache) Close() error { - p.cancel() + if p.cancel != nil { + p.cancel() + } return nil } diff --git a/libbeat/processors/cache/cache_test.go b/libbeat/processors/cache/cache_test.go index f8297a52108a..615e0afe739b 100644 --- a/libbeat/processors/cache/cache_test.go +++ b/libbeat/processors/cache/cache_test.go @@ -19,6 +19,7 @@ package cache import ( "errors" + "strings" "testing" "github.com/google/go-cmp/cmp" @@ -29,6 +30,7 @@ import ( "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/logp/logptest" "github.com/elastic/elastic-agent-libs/mapstr" + "github.com/elastic/elastic-agent-libs/paths" ) type cacheTestStep struct { @@ -583,7 +585,19 @@ func TestCache(t *testing.T) { t.Log(p) c, ok := p.(*cache) if !ok { - t.Fatalf("processor %d is not an *cache", i) + t.Fatalf("processor %d is not a *cache", i) + } + + // Initialize the store with paths + tmpDir := t.TempDir() + err = c.SetPaths(&paths.Path{ + Home: tmpDir, + Config: tmpDir, + Data: tmpDir, + Logs: tmpDir, + }) + if err != nil { + t.Errorf("unexpected error from SetPaths: %v", err) } defer func() { @@ -624,3 +638,44 @@ func TestCache(t *testing.T) { }) } } + +func TestSetPathsUninitialized(t *testing.T) { + cfg, err := conf.NewConfigFrom(mapstr.M{ + "backend": mapstr.M{ + "memory": mapstr.M{ + "id": "test", + }, + }, + "get": mapstr.M{ + "key_field": "key", + "target_field": "target", + }, + }) + if err != nil { + t.Fatalf("unexpected error from NewConfigFrom: %v", err) + } + + p, err := New(cfg, logptest.NewTestingLogger(t, "")) + if err != nil { + t.Fatalf("unexpected error from New: %v", err) + } + + c, ok := p.(*cache) + if !ok { + t.Fatal("processor is not a *cache") + } + defer func() { + if err := c.Close(); err != nil { + t.Errorf("unexpected error from c.Close(): %v", err) + } + }() + + // Try to use without SetPaths - should fail + event, err := c.Run(&beat.Event{}) + if event == nil { + t.Error("expected non-nil event") + } + if err == nil || !strings.Contains(err.Error(), "cache processor store not initialized") { + t.Fatalf("expected error containing 'cache processor store not initialized', got: %v", err) + } +} diff --git a/libbeat/processors/cache/file_store.go b/libbeat/processors/cache/file_store.go index 884a80a2d1d5..ebb1e4c23343 100644 --- a/libbeat/processors/cache/file_store.go +++ b/libbeat/processors/cache/file_store.go @@ -47,12 +47,12 @@ type fileStoreSet struct { // and its reference count is increased. The returned context.CancelFunc // reduces the reference count and deletes the fileStore from the set if the // count reaches zero. -func (s *fileStoreSet) get(id string, cfg config, log *logp.Logger) (*fileStore, context.CancelFunc) { +func (s *fileStoreSet) get(id string, cfg config, log *logp.Logger, path *paths.Path) (*fileStore, context.CancelFunc) { s.mu.Lock() defer s.mu.Unlock() store, ok := s.stores[id] if !ok { - store = newFileStore(cfg, id, pathFromConfig(cfg, log), log) + store = newFileStore(cfg, id, pathFromConfig(cfg, log, path), log) s.stores[store.id] = store } store.add(cfg) @@ -63,10 +63,10 @@ func (s *fileStoreSet) get(id string, cfg config, log *logp.Logger) (*fileStore, } // pathFromConfig returns the mapping form a config to a file-system path. -func pathFromConfig(cfg config, log *logp.Logger) string { - path := filepath.Join(paths.Resolve(paths.Data, "cache_processor"), cleanFilename(cfg.Store.File.ID)) - log.Infow("mapping file-backed cache processor config to file path", "id", cfg.Store.File.ID, "path", path) - return path +func pathFromConfig(cfg config, log *logp.Logger, path *paths.Path) string { + resultPath := filepath.Join(path.Resolve(paths.Data, "cache_processor"), cleanFilename(cfg.Store.File.ID)) + log.Infow("mapping file-backed cache processor config to file path", "id", cfg.Store.File.ID, "path", resultPath) + return resultPath } // cleanFilename replaces illegal printable characters (and space or dot) in diff --git a/libbeat/processors/conditionals.go b/libbeat/processors/conditionals.go index bb06019e20f8..6efd7db61b20 100644 --- a/libbeat/processors/conditionals.go +++ b/libbeat/processors/conditionals.go @@ -26,6 +26,7 @@ import ( "github.com/elastic/beats/v7/libbeat/conditions" "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/paths" ) // NewConditional returns a constructor suitable for registering when conditionals as a plugin. @@ -77,6 +78,14 @@ func (r *WhenProcessor) Run(event *beat.Event) (*beat.Event, error) { return r.p.Run(event) } +func (r *WhenProcessor) SetPaths(paths *paths.Path) error { + pathSetter, ok := r.p.(PathSetter) + if ok { + return pathSetter.SetPaths(paths) + } + return nil +} + func (r *WhenProcessor) String() string { return fmt.Sprintf("%v, condition=%v", r.p.String(), r.condition.String()) } @@ -86,7 +95,7 @@ func (r *WhenProcessor) String() string { // processors, one with `Close` and one without. The decision of // which to return is determined if the underlying processors require // `Close`. This is useful because some places in the code base -// (eg. javascript processors) require stateless processors (no Close +// (e.g. javascript processors) require stateless processors (no Close // method). type ClosingWhenProcessor struct { WhenProcessor @@ -199,6 +208,27 @@ func (p *IfThenElseProcessor) Run(event *beat.Event) (*beat.Event, error) { return event, nil } +func (p *IfThenElseProcessor) SetPaths(paths *paths.Path) error { + var err error + for _, proc := range p.then.List { + if procWithSet, ok := proc.(PathSetter); ok { + err = errors.Join(err, procWithSet.SetPaths(paths)) + } + } + + if p.els == nil { + return err + } + + for _, proc := range p.els.List { + if procWithSet, ok := proc.(PathSetter); ok { + err = errors.Join(err, procWithSet.SetPaths(paths)) + } + } + + return err +} + func (p *IfThenElseProcessor) String() string { var sb strings.Builder sb.WriteString("if ") diff --git a/libbeat/processors/conditionals_test.go b/libbeat/processors/conditionals_test.go index eda47806920b..ba9e1050f0e2 100644 --- a/libbeat/processors/conditionals_test.go +++ b/libbeat/processors/conditionals_test.go @@ -26,6 +26,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/elastic/elastic-agent-libs/paths" + "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/conditions" conf "github.com/elastic/elastic-agent-libs/config" @@ -248,6 +250,18 @@ func (c *errorProcessor) Close() error { return ErrProcessorClose } +var ErrSetPathsProcessor = fmt.Errorf("error processor set paths error") + +type setPathsProcessor struct{} + +func (c *setPathsProcessor) Run(e *beat.Event) (*beat.Event, error) { + return e, nil +} +func (c *setPathsProcessor) String() string { return "error_processor" } +func (c *setPathsProcessor) SetPaths(p *paths.Path) error { + return fmt.Errorf("error_processor set paths error: %s", p) +} + func TestConditionRuleClose(t *testing.T) { const whenCondition = ` contains.a: b @@ -289,16 +303,73 @@ then: beatProcessor, err := NewIfElseThenProcessor(c, logptest.NewTestingLogger(t, "")) require.NoError(t, err) + requireImplements[Closer](t, beatProcessor) // Verify we got a ClosingIfThenElseProcessor closingProc := requireAs[*ClosingIfThenElseProcessor](t, beatProcessor) assert.Nil(t, closingProc.els, "els should be nil when no else clause is provided") - assert.Implements(t, (*Closer)(nil), beatProcessor) err = closingProc.Close() require.NoError(t, err) } +func TestIfThenElseProcessorSetPaths(t *testing.T) { + logger := logptest.NewTestingLogger(t, "") + thenProcessors := &Processors{ + List: []beat.Processor{&setPathsProcessor{}}, + log: logger, + } + elsProcessors := &Processors{ + List: []beat.Processor{&setPathsProcessor{}}, + log: logger, + } + proc := &IfThenElseProcessor{ + cond: nil, + then: thenProcessors, + els: elsProcessors, + } + + // SetPaths should not panic when then is nil + tmpDir := t.TempDir() + beatPaths := &paths.Path{ + Home: tmpDir, + Config: tmpDir, + Data: tmpDir, + Logs: tmpDir, + } + err := proc.SetPaths(beatPaths) + require.ErrorAs(t, err, &ErrSetPathsProcessor) + require.ErrorContains(t, err, ErrSetPathsProcessor.Error()) + require.ErrorContains(t, err, beatPaths.String()) +} + +func TestIfThenElseProcessorSetPathsNil(t *testing.T) { + const cfg = ` +if: + equals.test: value +then: + - add_fields: {target: "", fields: {test_field: test_value}} +` + c, err := conf.NewConfigWithYAML([]byte(cfg), "if-then config") + require.NoError(t, err) + + beatProcessor, err := NewIfElseThenProcessor(c, logptest.NewTestingLogger(t, "")) + require.NoError(t, err) + + proc := requireImplements[PathSetter](t, beatProcessor) + + // SetPaths should not panic when then is nil + tmpDir := t.TempDir() + beatPaths := &paths.Path{ + Home: tmpDir, + Config: tmpDir, + Data: tmpDir, + Logs: tmpDir, + } + err = proc.SetPaths(beatPaths) + require.NoError(t, err) +} + // requireAs performs a type assertion and requires it to succeed. func requireAs[T any](t *testing.T, v any) T { t.Helper() @@ -310,3 +381,14 @@ func requireAs[T any](t *testing.T, v any) T { return result } + +func requireImplements[T any](t *testing.T, v any) T { + t.Helper() + expected := (*T)(nil) + require.Implements(t, expected, v) + + result, ok := v.(T) + require.True(t, ok, "sanity check: expected %T, got %T", expected, v) + + return result +} diff --git a/libbeat/processors/processor.go b/libbeat/processors/processor.go index b06c60a82da3..d534754f3f12 100644 --- a/libbeat/processors/processor.go +++ b/libbeat/processors/processor.go @@ -26,6 +26,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/paths" ) const logName = "processors" @@ -45,6 +46,12 @@ type Closer interface { Close() error } +// PathSetter is an interface for processors that support lazy initialization +// with beat-specific paths. This method must be called before the processor can be used. +type PathSetter interface { + SetPaths(*paths.Path) error +} + // Close closes a processor if it implements the Closer interface func Close(p beat.Processor) error { if closer, ok := p.(Closer); ok { diff --git a/libbeat/processors/safe_processor.go b/libbeat/processors/safe_processor.go index 2f31ad287ef8..a8107a08189c 100644 --- a/libbeat/processors/safe_processor.go +++ b/libbeat/processors/safe_processor.go @@ -19,62 +19,129 @@ package processors import ( "errors" - "sync/atomic" + "fmt" + "sync" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/paths" ) -var ErrClosed = errors.New("attempt to use a closed processor") +var ( + ErrClosed = errors.New("attempt to use a closed processor") + ErrPathsNotSet = errors.New("attempt to run processor before SetPaths was called") + ErrPathsAlreadySet = errors.New("attempt to set paths twice") + ErrSetPathsOnClosed = errors.New("attempt to set paths on closed processor") +) + +type state = int + +const ( + stateInit state = iota + stateSetPaths + stateClosed +) +// SafeProcessor wraps a beat.Processor to provide thread-safe state management. +// It ensures SetPaths is called only once and prevents Run after Close. +// Use safeProcessorWithClose for processors that also implement Closer. type SafeProcessor struct { beat.Processor - closed uint32 + + mu sync.RWMutex + state state +} + +// safeProcessorWithClose extends SafeProcessor to also handle Close. +// It ensures Close is called only once on the underlying processor. +type safeProcessorWithClose struct { + SafeProcessor } -// Run allows to run processor only when `Close` was not called prior +// Run delegates to the underlying processor. Returns ErrClosed if the processor +// has been closed, or ErrPathsNotSet if the processor implements PathSetter but +// SetPaths has not been called. func (p *SafeProcessor) Run(event *beat.Event) (*beat.Event, error) { - if atomic.LoadUint32(&p.closed) == 1 { + p.mu.RLock() + defer p.mu.RUnlock() + switch p.state { + case stateClosed: return nil, ErrClosed + case stateInit: + if _, ok := p.Processor.(PathSetter); ok { + return nil, ErrPathsNotSet + } + default: // proceed } return p.Processor.Run(event) } // Close makes sure the underlying `Close` function is called only once. -func (p *SafeProcessor) Close() (err error) { - if atomic.CompareAndSwapUint32(&p.closed, 0, 1) { +func (p *safeProcessorWithClose) Close() (err error) { + p.mu.Lock() + defer p.mu.Unlock() + if p.state != stateClosed { + p.state = stateClosed return Close(p.Processor) } - logp.L().Warnf("tried to close already closed %q processor", p.Processor.String()) + logp.L().Warnf("tried to close already closed %q processor", p.String()) return nil } -// SafeWrap makes sure that the processor handles all the required edge-cases. +// SetPaths delegates to the underlying processor if it implements PathSetter. +// Returns ErrPathsAlreadySet if called more than once, or ErrSetPathsOnClosed +// if the processor has been closed. +func (p *SafeProcessor) SetPaths(paths *paths.Path) error { + pathSetter, ok := p.Processor.(PathSetter) + if !ok { + return nil + } + p.mu.Lock() + defer p.mu.Unlock() + + switch p.state { + case stateInit: + p.state = stateSetPaths + return pathSetter.SetPaths(paths) + case stateSetPaths: + return ErrPathsAlreadySet + case stateClosed: + return ErrSetPathsOnClosed + } + return fmt.Errorf("unknown state: %d", p.state) +} + +// SafeWrap wraps a processor constructor to handle common edge cases: +// +// - Multiple Close calls: Each processor might end up in multiple processor +// groups. Every group has its own Close that calls Close on each processor, +// leading to multiple Close calls on the same processor. +// +// - Multiple SetPaths calls: The wrapper ensures SetPaths is called at most once. // -// Each processor might end up in multiple processor groups. -// Every group has its own `Close` that calls `Close` on each -// processor of that group which leads to multiple `Close` calls -// on the same processor. +// - Close before/during SetPaths: Prevents initialization after shutdown and +// protects against race conditions between SetPaths and Close. // -// If `SafeWrap` is not used, the processor must handle multiple -// `Close` calls by using `sync.Once` in its `Close` function. -// We make it easer for processor developers and take care of it -// in the processor registry instead. +// Without SafeWrap, processors must handle these cases manually using sync.Once +// or similar mechanisms. SafeWrap is automatically applied by RegisterPlugin. func SafeWrap(constructor Constructor) Constructor { return func(config *config.C, log *logp.Logger) (beat.Processor, error) { processor, err := constructor(config, log) if err != nil { return nil, err } - // if the processor does not implement `Closer` - // it does not need a wrap + // if the processor does not implement `Closer` it does not need a wrap if _, ok := processor.(Closer); !ok { + // if SetPaths is implemented, ensure single call of SetPaths + if _, ok = processor.(PathSetter); ok { + return &SafeProcessor{Processor: processor}, nil + } return processor, nil } - return &SafeProcessor{ - Processor: processor, + return &safeProcessorWithClose{ + SafeProcessor: SafeProcessor{Processor: processor}, }, nil } } diff --git a/libbeat/processors/safe_processor_test.go b/libbeat/processors/safe_processor_test.go index e04d933dbb67..83bdfed1f2d9 100644 --- a/libbeat/processors/safe_processor_test.go +++ b/libbeat/processors/safe_processor_test.go @@ -20,11 +20,13 @@ package processors import ( "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/paths" ) var mockEvent = &beat.Event{} @@ -60,6 +62,40 @@ func newMockCloserConstructor() (Constructor, *mockCloserProcessor) { return constructor, &p } +type mockPathSetterCloserProcessor struct { + mockCloserProcessor + setPathsCount int +} + +func (p *mockPathSetterCloserProcessor) SetPaths(*paths.Path) error { + p.setPathsCount++ + return nil +} + +func newMockPathSetterCloserProcessor() (Constructor, *mockPathSetterCloserProcessor) { + p := mockPathSetterCloserProcessor{} + constructor := func(config *config.C, _ *logp.Logger) (beat.Processor, error) { return &p, nil } + return constructor, &p +} + +type mockPathSetterProcessor struct { + mockProcessor + setPathsCount int +} + +func (p *mockPathSetterProcessor) SetPaths(*paths.Path) error { + p.setPathsCount++ + return nil +} + +func newMockPathSetterProcessor() (Constructor, *mockPathSetterProcessor) { + p := mockPathSetterProcessor{} + constructor := func(config *config.C, _ *logp.Logger) (beat.Processor, error) { + return &p, nil + } + return constructor, &p +} + func mockConstructor(config *config.C, log *logp.Logger) (beat.Processor, error) { return &mockProcessor{}, nil } @@ -74,7 +110,8 @@ func TestSafeWrap(t *testing.T) { wrappedNonCloser := SafeWrap(nonCloser) wp, err := wrappedNonCloser(nil, nil) require.NoError(t, err) - require.IsType(t, &mockProcessor{}, wp) + assert.IsType(t, &mockProcessor{}, wp) + assert.NotImplements(t, (*Closer)(nil), wp) }) t.Run("wraps a closer processor", func(t *testing.T) { @@ -82,7 +119,8 @@ func TestSafeWrap(t *testing.T) { wrappedCloser := SafeWrap(closer) wcp, err := wrappedCloser(nil, nil) require.NoError(t, err) - require.IsType(t, &SafeProcessor{}, wcp) + assert.IsType(t, &safeProcessorWithClose{}, wcp) + assert.Implements(t, (*Closer)(nil), wcp) }) } @@ -99,34 +137,166 @@ func TestSafeProcessor(t *testing.T) { }) t.Run("propagates Run to a processor", func(t *testing.T) { - require.Equal(t, 0, p.runCount) + assert.Equal(t, 0, p.runCount) e, err := sp.Run(nil) - require.NoError(t, err) - require.Equal(t, e, mockEvent) + assert.NoError(t, err) + assert.Equal(t, e, mockEvent) e, err = sp.Run(nil) - require.NoError(t, err) - require.Equal(t, e, mockEvent) + assert.NoError(t, err) + assert.Equal(t, e, mockEvent) - require.Equal(t, 2, p.runCount) + assert.Equal(t, 2, p.runCount) }) t.Run("propagates Close to a processor only once", func(t *testing.T) { - require.Equal(t, 0, p.closeCount) + assert.Equal(t, 0, p.closeCount) err := Close(sp) - require.NoError(t, err) + assert.NoError(t, err) err = Close(sp) - require.NoError(t, err) + assert.NoError(t, err) - require.Equal(t, 1, p.closeCount) + assert.Equal(t, 1, p.closeCount) }) t.Run("does not propagate Run when closed", func(t *testing.T) { - require.Equal(t, 2, p.runCount) // still 2 from the previous test case + assert.Equal(t, 2, p.runCount) // still 2 from the previous test case e, err := sp.Run(nil) - require.Nil(t, e) - require.ErrorIs(t, err, ErrClosed) - require.Equal(t, 2, p.runCount) + assert.Nil(t, e) + assert.ErrorIs(t, err, ErrClosed) + assert.Equal(t, 2, p.runCount) + }) +} + +func TestSafeProcessorSetPathsClose(t *testing.T) { + cons, p := newMockPathSetterCloserProcessor() + var ( + bp beat.Processor + sp PathSetter + err error + ) + t.Run("creates a wrapped processor", func(t *testing.T) { + sw := SafeWrap(cons) + bp, err = sw(nil, nil) + require.NoError(t, err) + assert.Equal(t, 0, p.setPathsCount) + }) + + t.Run("does not run before SetPaths is called", func(t *testing.T) { + assert.Equal(t, 0, p.runCount) + e, err := bp.Run(nil) + assert.Nil(t, e) + assert.ErrorIs(t, err, ErrPathsNotSet) + assert.Equal(t, 0, p.runCount) + }) + + t.Run("sets paths", func(t *testing.T) { + assert.Equal(t, 0, p.setPathsCount) + require.Implements(t, (*PathSetter)(nil), bp) + var ok bool + sp, ok = bp.(PathSetter) + require.True(t, ok) + require.NotNil(t, sp) + err = sp.SetPaths(&paths.Path{}) + assert.NoError(t, err) + assert.Equal(t, 1, p.setPathsCount) + + // set paths again + err = sp.SetPaths(&paths.Path{}) + assert.ErrorIs(t, err, ErrPathsAlreadySet) + assert.Equal(t, 1, p.setPathsCount) + }) + + t.Run("propagates Run to a processor", func(t *testing.T) { + assert.Equal(t, 0, p.runCount) + + e, err := bp.Run(nil) + assert.NoError(t, err) + assert.Equal(t, e, mockEvent) + e, err = bp.Run(nil) + assert.NoError(t, err) + assert.Equal(t, e, mockEvent) + + assert.Equal(t, 2, p.runCount) + }) + + t.Run("propagates Close to a processor only once", func(t *testing.T) { + assert.Equal(t, 0, p.closeCount) + + err := Close(bp) + assert.NoError(t, err) + err = Close(bp) + assert.NoError(t, err) + + assert.Equal(t, 1, p.closeCount) + }) + + t.Run("does not propagate Run when closed", func(t *testing.T) { + assert.Equal(t, 2, p.runCount) // still 2 from the previous test case + e, err := bp.Run(nil) + assert.Nil(t, e) + assert.ErrorIs(t, err, ErrClosed) + assert.Equal(t, 2, p.runCount) + }) + + t.Run("does not set paths when closed", func(t *testing.T) { + err = sp.SetPaths(&paths.Path{}) + assert.ErrorIs(t, err, ErrSetPathsOnClosed) + assert.Equal(t, 1, p.setPathsCount) + }) +} + +func TestSafeProcessorSetPaths(t *testing.T) { + cons, p := newMockPathSetterProcessor() + var ( + bp beat.Processor + sp PathSetter + err error + ) + t.Run("creates a wrapped processor", func(t *testing.T) { + sw := SafeWrap(cons) + bp, err = sw(nil, nil) + require.NoError(t, err) + assert.Equal(t, 0, p.setPathsCount) + }) + + t.Run("not a closer", func(t *testing.T) { + assert.NotImplements(t, (*Closer)(nil), p) + assert.NoError(t, Close(p)) + assert.NoError(t, Close(p)) + }) + + t.Run("does not run before SetPaths is called", func(t *testing.T) { + assert.Equal(t, 0, p.runCount) + e, err := bp.Run(nil) + assert.Nil(t, e) + assert.ErrorIs(t, err, ErrPathsNotSet) + assert.Equal(t, 0, p.runCount) + }) + + t.Run("sets paths", func(t *testing.T) { + assert.Equal(t, 0, p.setPathsCount) + require.Implements(t, (*PathSetter)(nil), bp) + var ok bool + sp, ok = bp.(PathSetter) + require.True(t, ok) + require.NotNil(t, sp) + err = sp.SetPaths(&paths.Path{}) + assert.NoError(t, err) + assert.Equal(t, 1, p.setPathsCount) + + // set paths again + err = sp.SetPaths(&paths.Path{}) + assert.ErrorIs(t, err, ErrPathsAlreadySet) + assert.Equal(t, 1, p.setPathsCount) + }) + + t.Run("runs after SetPaths is called", func(t *testing.T) { + assert.Equal(t, 0, p.runCount) + e, err := bp.Run(nil) + assert.NoError(t, err) + assert.Equal(t, e, mockEvent) + assert.Equal(t, 1, p.runCount) }) } diff --git a/libbeat/publisher/pipeline/client_test.go b/libbeat/publisher/pipeline/client_test.go index 41dfdb17d31f..8df18b1d7efb 100644 --- a/libbeat/publisher/pipeline/client_test.go +++ b/libbeat/publisher/pipeline/client_test.go @@ -28,6 +28,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/elastic/elastic-agent-libs/paths" + "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/outputs" "github.com/elastic/beats/v7/libbeat/processors" @@ -475,7 +477,7 @@ type testProcessorSupporter struct { } // Create a running processor interface based on the given config -func (p testProcessorSupporter) Create(cfg beat.ProcessingConfig, drop bool) (beat.Processor, error) { +func (p testProcessorSupporter) Create(cfg beat.ProcessingConfig, drop bool, paths *paths.Path) (beat.Processor, error) { return p.Processor, nil } diff --git a/libbeat/publisher/pipeline/client_worker.go b/libbeat/publisher/pipeline/client_worker.go index 3e6b8202dd21..41336b125be1 100644 --- a/libbeat/publisher/pipeline/client_worker.go +++ b/libbeat/publisher/pipeline/client_worker.go @@ -81,7 +81,7 @@ func (w *worker) close() { } func (w *clientWorker) Close() error { - w.worker.close() + w.close() return w.client.Close() } @@ -106,7 +106,7 @@ func (w *clientWorker) run(ctx context.Context) { } func (w *netClientWorker) Close() error { - w.worker.close() + w.close() return w.client.Close() } diff --git a/libbeat/publisher/pipeline/pipeline.go b/libbeat/publisher/pipeline/pipeline.go index eee57251fecd..1a1ac3788ca7 100644 --- a/libbeat/publisher/pipeline/pipeline.go +++ b/libbeat/publisher/pipeline/pipeline.go @@ -35,6 +35,7 @@ import ( "github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/paths" ) // Pipeline implementation providint all beats publisher functionality. @@ -71,6 +72,9 @@ type Pipeline struct { forceCloseQueue bool processors processing.Supporter + + // paths contains the paths configuration for processor initialization. + paths *paths.Path } // Settings is used to pass additional settings to a newly created pipeline instance. @@ -84,6 +88,9 @@ type Settings struct { Processors processing.Supporter InputQueueSize int + + // Paths contains the paths configuration used for processor initialization. + Paths *paths.Path } // WaitCloseMode enumerates the possible behaviors of WaitClose in a pipeline. @@ -135,6 +142,7 @@ func New( observer: nilObserver, waitCloseTimeout: settings.WaitClose, processors: settings.Processors, + paths: settings.Paths, } switch settings.WaitCloseMode { case WaitOnPipelineClose, WaitOnPipelineCloseThenForce: @@ -282,7 +290,7 @@ func (p *Pipeline) createEventProcessing(cfg beat.ProcessingConfig, noPublish bo if p.processors == nil { return nil, nil } - return p.processors.Create(cfg, noPublish) + return p.processors.Create(cfg, noPublish, p.paths) } // OutputReloader returns a reloadable object for the output section of this pipeline diff --git a/libbeat/publisher/processing/default.go b/libbeat/publisher/processing/default.go index 70f406e575ab..f58688682a71 100644 --- a/libbeat/publisher/processing/default.go +++ b/libbeat/publisher/processing/default.go @@ -32,6 +32,7 @@ import ( "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" + "github.com/elastic/elastic-agent-libs/paths" ) // builder is used to create the event processing pipeline in Beats. The @@ -286,7 +287,7 @@ func (b *builder) Processors() []string { // 9. (P) timeseries mangling // 10. (P) (if publish/debug enabled) log event // 11. (P) (if output disabled) dropEvent -func (b *builder) Create(cfg beat.ProcessingConfig, drop bool) (beat.Processor, error) { +func (b *builder) Create(cfg beat.ProcessingConfig, drop bool, paths *paths.Path) (beat.Processor, error) { var ( // pipeline processors processors = newGroup("processPipeline", b.log) @@ -381,6 +382,12 @@ func (b *builder) Create(cfg beat.ProcessingConfig, drop bool) (beat.Processor, // setup 8: pipeline processors list if b.processors != nil { + // function processor hides implementation of processors.PathSetter + err := b.processors.SetPaths(paths) + if err != nil { + return nil, fmt.Errorf("failed setting paths for global processors: %w", err) + } + // Add the global pipeline as a function processor, so clients cannot close it processors.add(newProcessor(b.processors.title, b.processors.Run)) } @@ -400,6 +407,11 @@ func (b *builder) Create(cfg beat.ProcessingConfig, drop bool) (beat.Processor, processors.add(dropDisabledProcessor) } + err := processors.SetPaths(paths) + if err != nil { + return nil, fmt.Errorf("failed to set paths for processing pipeline: %w", err) + } + return processors, nil } diff --git a/libbeat/publisher/processing/default_test.go b/libbeat/publisher/processing/default_test.go index 90bd190712f0..cf2d93b878aa 100644 --- a/libbeat/publisher/processing/default_test.go +++ b/libbeat/publisher/processing/default_test.go @@ -34,6 +34,7 @@ import ( "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/logp/logptest" "github.com/elastic/elastic-agent-libs/mapstr" + "github.com/elastic/elastic-agent-libs/paths" _ "github.com/elastic/beats/v7/libbeat/processors/add_cloud_metadata" _ "github.com/elastic/beats/v7/libbeat/processors/add_docker_metadata" @@ -287,7 +288,7 @@ func TestProcessorsConfigs(t *testing.T) { support, err := factory(info, logp.L(), cfg) require.NoError(t, err) - prog, err := support.Create(test.local, test.drop) + prog, err := support.Create(test.local, test.drop, tmpPaths(t)) require.NoError(t, err) actual, err := prog.Run(&beat.Event{ @@ -326,7 +327,7 @@ func TestEventNormalizationOverride(t *testing.T) { builder, err := newBuilder(beat.Info{}, logp.NewNopLogger(), nil, mapstr.EventMetadata{}, nil, tc.skipNormalize, false) require.NoError(t, err) - processor, err := builder.Create(beat.ProcessingConfig{EventNormalization: tc.normalizeOverride}, false) + processor, err := builder.Create(beat.ProcessingConfig{EventNormalization: tc.normalizeOverride}, false, tmpPaths(t)) require.NoError(t, err) group, ok := processor.(*group) require.True(t, ok) @@ -370,7 +371,7 @@ func TestNormalization(t *testing.T) { s, err := MakeDefaultSupport(test.normalize, nil)(beat.Info{}, logp.L(), config.NewConfig()) require.NoError(t, err) - prog, err := s.Create(beat.ProcessingConfig{}, false) + prog, err := s.Create(beat.ProcessingConfig{}, false, tmpPaths(t)) require.NoError(t, err) fields := test.in.Clone() @@ -391,7 +392,7 @@ func BenchmarkNormalization(b *testing.B) { s, err := MakeDefaultSupport(true, nil)(beat.Info{}, logp.L(), config.NewConfig()) require.NoError(b, err) - prog, err := s.Create(beat.ProcessingConfig{}, false) + prog, err := s.Create(beat.ProcessingConfig{}, false, tmpPaths(b)) require.NoError(b, err) fields := mapstr.M{"a": "b"} @@ -405,7 +406,7 @@ func TestAlwaysDrop(t *testing.T) { s, err := MakeDefaultSupport(true, nil)(beat.Info{}, logp.L(), config.NewConfig()) require.NoError(t, err) - prog, err := s.Create(beat.ProcessingConfig{}, true) + prog, err := s.Create(beat.ProcessingConfig{}, true, tmpPaths(t)) require.NoError(t, err) actual, err := prog.Run(&beat.Event{}) @@ -423,7 +424,7 @@ func TestDynamicFields(t *testing.T) { dynFields := mapstr.NewPointer(mapstr.M{}) prog, err := factory.Create(beat.ProcessingConfig{ DynamicFields: &dynFields, - }, false) + }, false, tmpPaths(t)) require.NoError(t, err) actual, err := prog.Run(&beat.Event{Fields: mapstr.M{"hello": "world"}}) @@ -458,7 +459,7 @@ func TestProcessingClose(t *testing.T) { prog, err := factory.Create(beat.ProcessingConfig{ Processor: g, - }, false) + }, false, tmpPaths(t)) require.NoError(t, err) // Check that both processors are called @@ -518,3 +519,13 @@ func (p *processorWithClose) Close() error { func (p *processorWithClose) String() string { return "processorWithClose" } + +func tmpPaths(t testing.TB) *paths.Path { + dir := t.TempDir() + return &paths.Path{ + Home: dir, + Config: dir, + Data: dir, + Logs: dir, + } +} diff --git a/libbeat/publisher/processing/processing.go b/libbeat/publisher/processing/processing.go index e5d6be9581ba..f96e7d636f79 100644 --- a/libbeat/publisher/processing/processing.go +++ b/libbeat/publisher/processing/processing.go @@ -21,6 +21,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/paths" ) // SupportFactory creates a new processing Supporter that can be used with @@ -36,7 +37,7 @@ type SupportFactory func(info beat.Info, log *logp.Logger, cfg *config.C) (Suppo // A Supporter needs to be closed with `Close()` to release its global resources. type Supporter interface { // Create a running processor interface based on the given config - Create(cfg beat.ProcessingConfig, drop bool) (beat.Processor, error) + Create(cfg beat.ProcessingConfig, drop bool, paths *paths.Path) (beat.Processor, error) // Processors returns a list of config strings for the given processor, for debug purposes Processors() []string // Close the processor supporter diff --git a/libbeat/publisher/processing/processors.go b/libbeat/publisher/processing/processors.go index 766cd3cb4a63..64cbc63be73e 100644 --- a/libbeat/publisher/processing/processors.go +++ b/libbeat/publisher/processing/processors.go @@ -31,6 +31,7 @@ import ( "github.com/elastic/beats/v7/libbeat/processors" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" + "github.com/elastic/elastic-agent-libs/paths" ) type group struct { @@ -117,6 +118,17 @@ func (p *group) All() []beat.Processor { return p.list } +func (p *group) SetPaths(paths *paths.Path) error { + var err error + for _, processor := range p.list { + pathSetter, ok := processor.(processors.PathSetter) + if ok { + err = errors.Join(err, pathSetter.SetPaths(paths)) + } + } + return err +} + func (p *group) Run(event *beat.Event) (*beat.Event, error) { if p == nil || len(p.list) == 0 { return event, nil diff --git a/x-pack/libbeat/cmd/instance/beat.go b/x-pack/libbeat/cmd/instance/beat.go index 6f459e9ef5a2..30e970af0ce0 100644 --- a/x-pack/libbeat/cmd/instance/beat.go +++ b/x-pack/libbeat/cmd/instance/beat.go @@ -255,6 +255,7 @@ func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]an InputQueueSize: b.InputQueueSize, WaitCloseMode: pipeline.WaitOnPipelineCloseThenForce, WaitClose: receiverPublisherCloseTimeout, + Paths: b.Paths, } publisher, err := pipeline.LoadWithSettings(b.Info, monitors, b.Config.Pipeline, outputFactory, pipelineSettings) if err != nil {