Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
b663e4e
[cache-processor] WIP: SetPaths draft
orestisfl Oct 27, 2025
f0e656d
Merge branch 'main' into cache-processor-set-paths
orestisfl Oct 28, 2025
31ac009
SetPaths return error
orestisfl Nov 4, 2025
9c5f297
Merge remote-tracking branch 'upstream/main' into cache-processor-set…
orestisfl Nov 4, 2025
4fca1d1
tests
orestisfl Nov 4, 2025
3cf0287
cleanup
orestisfl Nov 4, 2025
745239b
Merge branch 'main' into cache-processor-set-paths
orestisfl Nov 5, 2025
fe554b6
Merge branch 'main' into cache-processor-set-paths
orestisfl Nov 5, 2025
b8d5464
Fix panic
orestisfl Nov 7, 2025
d6cc345
Merge branch 'main' into cache-processor-set-paths
orestisfl Nov 7, 2025
c43bbf5
lint
orestisfl Nov 7, 2025
8c22c4a
Merge branch 'main' into cache-processor-set-paths
orestisfl Nov 10, 2025
9a1baac
Use requireAs - requireImplements
orestisfl Nov 10, 2025
522f617
Merge remote-tracking branch 'upstream/main' into cache-processor-set…
orestisfl Nov 17, 2025
fbe2dae
Add in createBeater
orestisfl Nov 17, 2025
54f7304
Make paths argument in Supporter.Create
orestisfl Nov 17, 2025
ee0a4e4
Merge branch 'main' into cache-processor-set-paths
orestisfl Nov 17, 2025
f530e1f
format
orestisfl Nov 17, 2025
cb937e7
Fix global processors
orestisfl Nov 17, 2025
2491a46
Merge branch 'main' into cache-processor-set-paths
orestisfl Nov 18, 2025
27299ee
Remove use of testify
orestisfl Nov 21, 2025
81508ae
Simplify cache processor error message
orestisfl Nov 21, 2025
3dfd793
Merge branch 'main' into cache-processor-set-paths
orestisfl Nov 21, 2025
6917153
typo
orestisfl Nov 21, 2025
36d95c3
downgrade one error
orestisfl Nov 21, 2025
c97ef0e
safe-wrap SetPaths
orestisfl Nov 25, 2025
b336550
Merge branch 'main' into cache-processor-set-paths
orestisfl Nov 25, 2025
91d1850
comments
orestisfl Nov 26, 2025
59ddb28
Merge branch 'main' into cache-processor-set-paths
orestisfl Nov 26, 2025
3393934
detect paths not set before run
orestisfl Nov 26, 2025
378d6e6
default case
orestisfl Nov 26, 2025
8584724
rename to PathSetter
orestisfl Nov 27, 2025
7243b74
add log
orestisfl Nov 27, 2025
5009018
Merge branch 'main' into cache-processor-set-paths
orestisfl Nov 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions libbeat/beat/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/paths"
)

// Pipeline provides access to libbeat event publishing by creating a Client
Expand Down Expand Up @@ -125,6 +126,9 @@ type ProcessingConfig struct {
// Private contains additional information to be passed to the processing
// pipeline builder.
Private interface{}

// Paths TODO
Paths *paths.Path
}

// ClientListener provides access to internal client events.
Expand Down
38 changes: 24 additions & 14 deletions libbeat/processors/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"errors"
"fmt"
"os"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -57,6 +58,7 @@ type cache struct {
store Store
cancel context.CancelFunc
log *logp.Logger
once sync.Once
}

// Resulting processor implements `Close()` to release the cache resources.
Expand All @@ -70,37 +72,28 @@ func New(cfg *conf.C, log *logp.Logger) (beat.Processor, error) {
id := int(instanceID.Add(1))
log = log.Named(name).With("instance_id", id)

src, cancel, err := getStoreFor(config, log)
if err != nil {
return nil, fmt.Errorf("failed to get the store for %s: %w", name, err)
}

p := &cache{
return &cache{
config: config,
store: src,
cancel: cancel,
log: log,
}
p.log.Infow("initialized cache processor", "details", p)
return p, nil
}, nil
}

// getStoreFor returns a backing store for the provided configuration,
// and a context cancellation that releases the cache resource when it
// is no longer required. The cancellation should be called when the
// processor is closed.
func getStoreFor(cfg config, log *logp.Logger) (Store, context.CancelFunc, error) {
func getStoreFor(cfg config, log *logp.Logger, path *paths.Path) (Store, context.CancelFunc, error) {
switch {
case cfg.Store.Memory != nil:
s, cancel := memStores.get(cfg.Store.Memory.ID, cfg)
return s, cancel, nil

case cfg.Store.File != nil:
err := os.MkdirAll(paths.Resolve(paths.Data, "cache_processor"), 0o700)
err := os.MkdirAll(path.Resolve(paths.Data, "cache_processor"), 0o700)
if err != nil {
return nil, noop, fmt.Errorf("cache processor could not create store directory: %w", err)
}
s, cancel := fileStores.get(cfg.Store.File.ID, cfg, log)
s, cancel := fileStores.get(cfg.Store.File.ID, cfg, log, path)
return s, cancel, nil

default:
Expand Down Expand Up @@ -133,6 +126,8 @@ type CacheEntry struct {

// Run enriches the given event with the host metadata.
func (p *cache) Run(event *beat.Event) (*beat.Event, error) {
p.SetPaths(paths.Paths) // set default if paths is not initialized

switch {
case p.config.Put != nil:
p.log.Debugw("put", "backend_id", p.store, "config", p.config.Put)
Expand Down Expand Up @@ -182,6 +177,21 @@ func (p *cache) Run(event *beat.Event) (*beat.Event, error) {
}
}

func (p *cache) SetPaths(path *paths.Path) {
p.once.Do(func() {
src, cancel, err := getStoreFor(p.config, p.log, path)
if err != nil {
p.log.Errorf("error getting store for %s: %v", name, err)
return
}

p.store = src
p.cancel = cancel

p.log.Infow("initialized cache processor", "details", p)
})
}

// putFrom takes the configured value from the event and stores it in the cache
// if it exists.
func (p *cache) putFrom(event *beat.Event) error {
Expand Down
12 changes: 6 additions & 6 deletions libbeat/processors/cache/file_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@ type fileStoreSet struct {
// and its reference count is increased. The returned context.CancelFunc
// reduces the reference count and deletes the fileStore from the set if the
// count reaches zero.
func (s *fileStoreSet) get(id string, cfg config, log *logp.Logger) (*fileStore, context.CancelFunc) {
func (s *fileStoreSet) get(id string, cfg config, log *logp.Logger, path *paths.Path) (*fileStore, context.CancelFunc) {
s.mu.Lock()
defer s.mu.Unlock()
store, ok := s.stores[id]
if !ok {
store = newFileStore(cfg, id, pathFromConfig(cfg, log), log)
store = newFileStore(cfg, id, pathFromConfig(cfg, log, path), log)
s.stores[store.id] = store
}
store.add(cfg)
Expand All @@ -63,10 +63,10 @@ func (s *fileStoreSet) get(id string, cfg config, log *logp.Logger) (*fileStore,
}

// pathFromConfig returns the mapping form a config to a file-system path.
func pathFromConfig(cfg config, log *logp.Logger) string {
path := filepath.Join(paths.Resolve(paths.Data, "cache_processor"), cleanFilename(cfg.Store.File.ID))
log.Infow("mapping file-backed cache processor config to file path", "id", cfg.Store.File.ID, "path", path)
return path
func pathFromConfig(cfg config, log *logp.Logger, path *paths.Path) string {
resultPath := filepath.Join(path.Resolve(paths.Data, "cache_processor"), cleanFilename(cfg.Store.File.ID))
log.Infow("mapping file-backed cache processor config to file path", "id", cfg.Store.File.ID, "path", resultPath)
return resultPath
}

// cleanFilename replaces illegal printable characters (and space or dot) in
Expand Down
8 changes: 8 additions & 0 deletions libbeat/processors/conditionals.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"github.com/elastic/beats/v7/libbeat/conditions"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/paths"
)

// NewConditional returns a constructor suitable for registering when conditionals as a plugin.
Expand All @@ -48,6 +49,13 @@
p beat.Processor
}

func (p *WhenProcessor) SetPaths(paths *paths.Path) {
setPather, ok := p.p.(setPaths)
if ok {
setPather.SetPaths(paths)
}
}

// NewConditionRule returns a processor that will execute the provided processor if the condition is true.
func NewConditionRule(
c conditions.Config,
Expand All @@ -70,7 +78,7 @@
}

// Run executes this WhenProcessor.
func (r *WhenProcessor) Run(event *beat.Event) (*beat.Event, error) {

Check failure on line 81 in libbeat/processors/conditionals.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

ST1016: methods on the same type should have the same receiver name (seen 1x "p", 2x "r") (staticcheck)
if !(r.condition).Check(event) {
return event, nil
}
Expand Down
13 changes: 13 additions & 0 deletions libbeat/processors/safe_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/paths"
)

var ErrClosed = errors.New("attempt to use a closed processor")
Expand All @@ -46,10 +47,22 @@
if atomic.CompareAndSwapUint32(&p.closed, 0, 1) {
return Close(p.Processor)
}
logp.L().Warnf("tried to close already closed %q processor", p.Processor.String())

Check failure on line 50 in libbeat/processors/safe_processor.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

QF1008: could remove embedded field "Processor" from selector (staticcheck)
return nil
}

// TODO: common
type setPaths interface {
SetPaths(*paths.Path)
}

func (p *SafeProcessor) SetPaths(paths *paths.Path) {
setPather, ok := p.Processor.(setPaths)
if ok {
setPather.SetPaths(paths)
}
}

// SafeWrap makes sure that the processor handles all the required edge-cases.
//
// Each processor might end up in multiple processor groups.
Expand Down
9 changes: 9 additions & 0 deletions libbeat/publisher/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/paths"
)

// Pipeline implementation providint all beats publisher functionality.
Expand Down Expand Up @@ -71,6 +72,9 @@ type Pipeline struct {
forceCloseQueue bool

processors processing.Supporter

// paths TODO
paths *paths.Path
}

// Settings is used to pass additional settings to a newly created pipeline instance.
Expand All @@ -84,6 +88,9 @@ type Settings struct {
Processors processing.Supporter

InputQueueSize int

// Paths TODO
Paths *paths.Path
}

// WaitCloseMode enumerates the possible behaviors of WaitClose in a pipeline.
Expand Down Expand Up @@ -135,6 +142,7 @@ func New(
observer: nilObserver,
waitCloseTimeout: settings.WaitClose,
processors: settings.Processors,
paths: settings.Paths,
}
switch settings.WaitCloseMode {
case WaitOnPipelineClose, WaitOnPipelineCloseThenForce:
Expand Down Expand Up @@ -282,6 +290,7 @@ func (p *Pipeline) createEventProcessing(cfg beat.ProcessingConfig, noPublish bo
if p.processors == nil {
return nil, nil
}
cfg.Paths = p.paths // TODO: ehm bad?
return p.processors.Create(cfg, noPublish)
}

Expand Down
1 change: 1 addition & 0 deletions libbeat/publisher/processing/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,7 @@ func (b *builder) Create(cfg beat.ProcessingConfig, drop bool) (beat.Processor,
// setup 8: pipeline processors list
if b.processors != nil {
// Add the global pipeline as a function processor, so clients cannot close it
b.processors.SetPaths(cfg.Paths)
processors.add(newProcessor(b.processors.title, b.processors.Run))
}

Expand Down
15 changes: 15 additions & 0 deletions libbeat/publisher/processing/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/paths"
)

type group struct {
Expand Down Expand Up @@ -116,6 +117,20 @@ func (p *group) All() []beat.Processor {
return p.list
}

// TODO: move to processors
type setPaths interface {
SetPaths(*paths.Path)
}

func (p *group) SetPaths(paths *paths.Path) {
for _, processor := range p.list {
setPather, ok := processor.(setPaths)
if ok {
setPather.SetPaths(paths)
}
}
}

func (p *group) Run(event *beat.Event) (*beat.Event, error) {
if p == nil || len(p.list) == 0 {
return event, nil
Expand Down
1 change: 1 addition & 0 deletions x-pack/libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]an
InputQueueSize: b.InputQueueSize,
WaitCloseMode: pipeline.WaitOnPipelineCloseThenForce,
WaitClose: receiverPublisherCloseTimeout,
Paths: b.Paths,
}
publisher, err := pipeline.LoadWithSettings(b.Info, monitors, b.Config.Pipeline, outputFactory, pipelineSettings)
if err != nil {
Expand Down
Loading