Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
b663e4e
[cache-processor] WIP: SetPaths draft
orestisfl Oct 27, 2025
f0e656d
Merge branch 'main' into cache-processor-set-paths
orestisfl Oct 28, 2025
31ac009
SetPaths return error
orestisfl Nov 4, 2025
9c5f297
Merge remote-tracking branch 'upstream/main' into cache-processor-set…
orestisfl Nov 4, 2025
4fca1d1
tests
orestisfl Nov 4, 2025
3cf0287
cleanup
orestisfl Nov 4, 2025
745239b
Merge branch 'main' into cache-processor-set-paths
orestisfl Nov 5, 2025
fe554b6
Merge branch 'main' into cache-processor-set-paths
orestisfl Nov 5, 2025
b8d5464
Fix panic
orestisfl Nov 7, 2025
d6cc345
Merge branch 'main' into cache-processor-set-paths
orestisfl Nov 7, 2025
c43bbf5
lint
orestisfl Nov 7, 2025
8c22c4a
Merge branch 'main' into cache-processor-set-paths
orestisfl Nov 10, 2025
9a1baac
Use requireAs - requireImplements
orestisfl Nov 10, 2025
522f617
Merge remote-tracking branch 'upstream/main' into cache-processor-set…
orestisfl Nov 17, 2025
fbe2dae
Add in createBeater
orestisfl Nov 17, 2025
54f7304
Make paths argument in Supporter.Create
orestisfl Nov 17, 2025
ee0a4e4
Merge branch 'main' into cache-processor-set-paths
orestisfl Nov 17, 2025
f530e1f
format
orestisfl Nov 17, 2025
cb937e7
Fix global processors
orestisfl Nov 17, 2025
2491a46
Merge branch 'main' into cache-processor-set-paths
orestisfl Nov 18, 2025
27299ee
Remove use of testify
orestisfl Nov 21, 2025
81508ae
Simplify cache processor error message
orestisfl Nov 21, 2025
3dfd793
Merge branch 'main' into cache-processor-set-paths
orestisfl Nov 21, 2025
6917153
typo
orestisfl Nov 21, 2025
36d95c3
downgrade one error
orestisfl Nov 21, 2025
c97ef0e
safe-wrap SetPaths
orestisfl Nov 25, 2025
b336550
Merge branch 'main' into cache-processor-set-paths
orestisfl Nov 25, 2025
91d1850
comments
orestisfl Nov 26, 2025
59ddb28
Merge branch 'main' into cache-processor-set-paths
orestisfl Nov 26, 2025
3393934
detect paths not set before run
orestisfl Nov 26, 2025
378d6e6
default case
orestisfl Nov 26, 2025
8584724
rename to PathSetter
orestisfl Nov 27, 2025
7243b74
add log
orestisfl Nov 27, 2025
5009018
Merge branch 'main' into cache-processor-set-paths
orestisfl Nov 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,7 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) {
WaitClose: time.Second,
Processors: b.processors,
InputQueueSize: b.InputQueueSize,
Paths: b.Paths,
}
publisher, err = pipeline.LoadWithSettings(b.Info, monitors, b.Config.Pipeline, outputFactory, settings)
if err != nil {
Expand Down
46 changes: 30 additions & 16 deletions libbeat/processors/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,38 +69,31 @@ func New(cfg *conf.C, log *logp.Logger) (beat.Processor, error) {
// Logging (each processor instance has a unique ID).
id := int(instanceID.Add(1))
log = log.Named(name).With("instance_id", id)
log.Infow("cache processor created", "config", config)

src, cancel, err := getStoreFor(config, log)
if err != nil {
return nil, fmt.Errorf("failed to get the store for %s: %w", name, err)
}

p := &cache{
return &cache{
config: config,
store: src,
cancel: cancel,
store: nil, // initialized in SetPaths
log: log,
}
p.log.Infow("initialized cache processor", "details", p)
return p, nil
}, nil
}

// getStoreFor returns a backing store for the provided configuration,
// and a context cancellation that releases the cache resource when it
// is no longer required. The cancellation should be called when the
// processor is closed.
func getStoreFor(cfg config, log *logp.Logger) (Store, context.CancelFunc, error) {
func getStoreFor(cfg config, log *logp.Logger, path *paths.Path) (Store, context.CancelFunc, error) {
switch {
case cfg.Store.Memory != nil:
s, cancel := memStores.get(cfg.Store.Memory.ID, cfg)
return s, cancel, nil

case cfg.Store.File != nil:
err := os.MkdirAll(paths.Resolve(paths.Data, "cache_processor"), 0o700)
err := os.MkdirAll(path.Resolve(paths.Data, "cache_processor"), 0o700)
if err != nil {
return nil, noop, fmt.Errorf("cache processor could not create store directory: %w", err)
}
s, cancel := fileStores.get(cfg.Store.File.ID, cfg, log)
s, cancel := fileStores.get(cfg.Store.File.ID, cfg, log, path)
return s, cancel, nil

default:
Expand Down Expand Up @@ -131,8 +124,12 @@ type CacheEntry struct {
index int
}

// Run enriches the given event with the host metadata.
// Run enriches the given event with cached metadata.
func (p *cache) Run(event *beat.Event) (*beat.Event, error) {
if p.store == nil {
return event, fmt.Errorf("cache processor store not initialized")
}

switch {
case p.config.Put != nil:
p.log.Debugw("put", "backend_id", p.store, "config", p.config.Put)
Expand Down Expand Up @@ -186,6 +183,21 @@ func (p *cache) Run(event *beat.Event) (*beat.Event, error) {
}
}

// SetPaths initializes the cache store with the provided paths configuration.
// This method must be called before the processor can be used.
func (p *cache) SetPaths(path *paths.Path) error {
src, cancel, err := getStoreFor(p.config, p.log, path)
if err != nil {
return fmt.Errorf("cache processor could not create store for %s: %w", name, err)
}

p.store = src
p.cancel = cancel

p.log.Infow("initialized cache processor", "details", p)
return nil
}

// putFrom takes the configured value from the event and stores it in the cache
// if it exists.
func (p *cache) putFrom(event *beat.Event) (err error) {
Expand Down Expand Up @@ -302,7 +314,9 @@ func (p *cache) deleteFor(event *beat.Event) (err error) {
}

func (p *cache) Close() error {
p.cancel()
if p.cancel != nil {
p.cancel()
}
return nil
}

Expand Down
57 changes: 56 additions & 1 deletion libbeat/processors/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package cache

import (
"errors"
"strings"
"testing"

"github.com/google/go-cmp/cmp"
Expand All @@ -28,6 +29,7 @@ import (
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp/logptest"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/paths"
)

type cacheTestStep struct {
Expand Down Expand Up @@ -662,7 +664,19 @@ func TestCache(t *testing.T) {
t.Log(p)
c, ok := p.(*cache)
if !ok {
t.Fatalf("processor %d is not an *cache", i)
t.Fatalf("processor %d is not a *cache", i)
}

// Initialize the store with paths
tmpDir := t.TempDir()
err = c.SetPaths(&paths.Path{
Home: tmpDir,
Config: tmpDir,
Data: tmpDir,
Logs: tmpDir,
})
if err != nil {
t.Errorf("unexpected error from SetPaths: %v", err)
}

defer func() {
Expand Down Expand Up @@ -702,3 +716,44 @@ func TestCache(t *testing.T) {
})
}
}

func TestSetPathsUninitialized(t *testing.T) {
cfg, err := conf.NewConfigFrom(mapstr.M{
"backend": mapstr.M{
"memory": mapstr.M{
"id": "test",
},
},
"get": mapstr.M{
"key_field": "key",
"target_field": "target",
},
})
if err != nil {
t.Fatalf("unexpected error from NewConfigFrom: %v", err)
}

p, err := New(cfg, logptest.NewTestingLogger(t, ""))
if err != nil {
t.Fatalf("unexpected error from New: %v", err)
}

c, ok := p.(*cache)
if !ok {
t.Fatal("processor is not a *cache")
}
defer func() {
if err := c.Close(); err != nil {
t.Errorf("unexpected error from c.Close(): %v", err)
}
}()

// Try to use without SetPaths - should fail
event, err := c.Run(&beat.Event{})
if event == nil {
t.Error("expected non-nil event")
}
if err == nil || !strings.Contains(err.Error(), "cache processor store not initialized") {
t.Fatalf("expected error containing 'cache processor store not initialized', got: %v", err)
}
}
12 changes: 6 additions & 6 deletions libbeat/processors/cache/file_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@ type fileStoreSet struct {
// and its reference count is increased. The returned context.CancelFunc
// reduces the reference count and deletes the fileStore from the set if the
// count reaches zero.
func (s *fileStoreSet) get(id string, cfg config, log *logp.Logger) (*fileStore, context.CancelFunc) {
func (s *fileStoreSet) get(id string, cfg config, log *logp.Logger, path *paths.Path) (*fileStore, context.CancelFunc) {
s.mu.Lock()
defer s.mu.Unlock()
store, ok := s.stores[id]
if !ok {
store = newFileStore(cfg, id, pathFromConfig(cfg, log), log)
store = newFileStore(cfg, id, pathFromConfig(cfg, log, path), log)
s.stores[store.id] = store
}
store.add(cfg)
Expand All @@ -63,10 +63,10 @@ func (s *fileStoreSet) get(id string, cfg config, log *logp.Logger) (*fileStore,
}

// pathFromConfig returns the mapping form a config to a file-system path.
func pathFromConfig(cfg config, log *logp.Logger) string {
path := filepath.Join(paths.Resolve(paths.Data, "cache_processor"), cleanFilename(cfg.Store.File.ID))
log.Infow("mapping file-backed cache processor config to file path", "id", cfg.Store.File.ID, "path", path)
return path
func pathFromConfig(cfg config, log *logp.Logger, path *paths.Path) string {
resultPath := filepath.Join(path.Resolve(paths.Data, "cache_processor"), cleanFilename(cfg.Store.File.ID))
log.Infow("mapping file-backed cache processor config to file path", "id", cfg.Store.File.ID, "path", resultPath)
return resultPath
}

// cleanFilename replaces illegal printable characters (and space or dot) in
Expand Down
32 changes: 31 additions & 1 deletion libbeat/processors/conditionals.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/elastic/beats/v7/libbeat/conditions"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/paths"
)

// NewConditional returns a constructor suitable for registering when conditionals as a plugin.
Expand Down Expand Up @@ -77,6 +78,14 @@ func (r *WhenProcessor) Run(event *beat.Event) (*beat.Event, error) {
return r.p.Run(event)
}

func (r *WhenProcessor) SetPaths(paths *paths.Path) error {
pathSetter, ok := r.p.(PathSetter)
if ok {
return pathSetter.SetPaths(paths)
}
return nil
}

func (r *WhenProcessor) String() string {
return fmt.Sprintf("%v, condition=%v", r.p.String(), r.condition.String())
}
Expand All @@ -86,7 +95,7 @@ func (r *WhenProcessor) String() string {
// processors, one with `Close` and one without. The decision of
// which to return is determined if the underlying processors require
// `Close`. This is useful because some places in the code base
// (eg. javascript processors) require stateless processors (no Close
// (e.g. javascript processors) require stateless processors (no Close
// method).
type ClosingWhenProcessor struct {
WhenProcessor
Expand Down Expand Up @@ -199,6 +208,27 @@ func (p *IfThenElseProcessor) Run(event *beat.Event) (*beat.Event, error) {
return event, nil
}

func (p *IfThenElseProcessor) SetPaths(paths *paths.Path) error {
var err error
for _, proc := range p.then.List {
if procWithSet, ok := proc.(PathSetter); ok {
err = errors.Join(err, procWithSet.SetPaths(paths))
}
}

if p.els == nil {
return err
}

for _, proc := range p.els.List {
if procWithSet, ok := proc.(PathSetter); ok {
err = errors.Join(err, procWithSet.SetPaths(paths))
}
}

return err
}

func (p *IfThenElseProcessor) String() string {
var sb strings.Builder
sb.WriteString("if ")
Expand Down
84 changes: 83 additions & 1 deletion libbeat/processors/conditionals_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/elastic-agent-libs/paths"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/conditions"
conf "github.com/elastic/elastic-agent-libs/config"
Expand Down Expand Up @@ -248,6 +250,18 @@ func (c *errorProcessor) Close() error {
return ErrProcessorClose
}

var ErrSetPathsProcessor = fmt.Errorf("error processor set paths error")

type setPathsProcessor struct{}

func (c *setPathsProcessor) Run(e *beat.Event) (*beat.Event, error) {
return e, nil
}
func (c *setPathsProcessor) String() string { return "error_processor" }
func (c *setPathsProcessor) SetPaths(p *paths.Path) error {
return fmt.Errorf("error_processor set paths error: %s", p)
}

func TestConditionRuleClose(t *testing.T) {
const whenCondition = `
contains.a: b
Expand Down Expand Up @@ -289,16 +303,73 @@ then:

beatProcessor, err := NewIfElseThenProcessor(c, logptest.NewTestingLogger(t, ""))
require.NoError(t, err)
requireImplements[Closer](t, beatProcessor)

// Verify we got a ClosingIfThenElseProcessor
closingProc := requireAs[*ClosingIfThenElseProcessor](t, beatProcessor)
assert.Nil(t, closingProc.els, "els should be nil when no else clause is provided")
assert.Implements(t, (*Closer)(nil), beatProcessor)

err = closingProc.Close()
require.NoError(t, err)
}

func TestIfThenElseProcessorSetPaths(t *testing.T) {
logger := logptest.NewTestingLogger(t, "")
thenProcessors := &Processors{
List: []beat.Processor{&setPathsProcessor{}},
log: logger,
}
elsProcessors := &Processors{
List: []beat.Processor{&setPathsProcessor{}},
log: logger,
}
proc := &IfThenElseProcessor{
cond: nil,
then: thenProcessors,
els: elsProcessors,
}

// SetPaths should not panic when then is nil
tmpDir := t.TempDir()
beatPaths := &paths.Path{
Home: tmpDir,
Config: tmpDir,
Data: tmpDir,
Logs: tmpDir,
}
err := proc.SetPaths(beatPaths)
require.ErrorAs(t, err, &ErrSetPathsProcessor)
require.ErrorContains(t, err, ErrSetPathsProcessor.Error())
require.ErrorContains(t, err, beatPaths.String())
}

func TestIfThenElseProcessorSetPathsNil(t *testing.T) {
const cfg = `
if:
equals.test: value
then:
- add_fields: {target: "", fields: {test_field: test_value}}
`
c, err := conf.NewConfigWithYAML([]byte(cfg), "if-then config")
require.NoError(t, err)

beatProcessor, err := NewIfElseThenProcessor(c, logptest.NewTestingLogger(t, ""))
require.NoError(t, err)

proc := requireImplements[PathSetter](t, beatProcessor)

// SetPaths should not panic when then is nil
tmpDir := t.TempDir()
beatPaths := &paths.Path{
Home: tmpDir,
Config: tmpDir,
Data: tmpDir,
Logs: tmpDir,
}
err = proc.SetPaths(beatPaths)
require.NoError(t, err)
}

// requireAs performs a type assertion and requires it to succeed.
func requireAs[T any](t *testing.T, v any) T {
t.Helper()
Expand All @@ -310,3 +381,14 @@ func requireAs[T any](t *testing.T, v any) T {

return result
}

func requireImplements[T any](t *testing.T, v any) T {
t.Helper()
expected := (*T)(nil)
require.Implements(t, expected, v)

result, ok := v.(T)
require.True(t, ok, "sanity check: expected %T, got %T", expected, v)

return result
}
Loading