From 4133bf24d08afaddb40156d382fec1449b2d1627 Mon Sep 17 00:00:00 2001 From: Orestis Floros Date: Tue, 2 Dec 2025 18:28:20 +0100 Subject: [PATCH 1/3] javascript processor: replace global paths with per-beat paths Changes: - Add SetPaths(path *paths.Path) method to jsProcessor that accepts a per-beat paths configuration - Defer file-based source initialization until SetPaths is called - For inline sources, initialization still happens immediately in NewFromConfig - loadSources now takes a *paths.Path parameter and uses pathConfig.Resolve() instead of the global paths.Resolve() Fixes https://github.com/elastic/beats/issues/46988 --- .../script/javascript/javascript.go | 92 ++++++++----- .../script/javascript/javascript_test.go | 124 ++++++++++++++++-- 2 files changed, 177 insertions(+), 39 deletions(-) diff --git a/libbeat/processors/script/javascript/javascript.go b/libbeat/processors/script/javascript/javascript.go index 4519e41d6d64..eb077e3763d9 100644 --- a/libbeat/processors/script/javascript/javascript.go +++ b/libbeat/processors/script/javascript/javascript.go @@ -42,9 +42,9 @@ import ( type jsProcessor struct { Config sessionPool *sessionPool - sourceProg *goja.Program sourceFile string stats *processorStats + logger *logp.Logger } // New constructs a new JavaScript processor. @@ -59,50 +59,60 @@ func New(c *config.C, log *logp.Logger) (beat.Processor, error) { // NewFromConfig constructs a new JavaScript processor from the given config // object. It loads the sources, compiles them, and validates the entry point. +// For inline sources, initialization happens immediately. For file-based sources, +// initialization is deferred until SetPaths is called. func NewFromConfig(c Config, reg *monitoring.Registry, logger *logp.Logger) (beat.Processor, error) { err := c.Validate() if err != nil { return nil, err } - var sourceFile string - var sourceCode string - switch { - case c.Source != "": - sourceFile = "inline.js" - sourceCode = c.Source - case c.File != "": - sourceFile, sourceCode, err = loadSources(c.File) - case len(c.Files) > 0: - sourceFile, sourceCode, err = loadSources(c.Files...) + processor := &jsProcessor{ + Config: c, + logger: logger, + stats: getStats(c.Tag, reg, logger), } - if err != nil { - return nil, annotateError(c.Tag, err) + + // For inline sources, we can initialize immediately. + // For file-based sources, we defer initialization until SetPaths is called. + if c.Source != "" { + const inlineSourceFile = "inline.js" + + err = processor.compile(inlineSourceFile, c.Source) + if err != nil { + return nil, err + } } - // Validate processor source code. - prog, err := goja.Compile(sourceFile, sourceCode, true) - if err != nil { - return nil, err + return processor, nil +} + +// SetPaths initializes the processor with the provided paths configuration. +// This method must be called before the processor can be used for file-based sources. +func (p *jsProcessor) SetPaths(path *paths.Path) error { + if p.Source != "" { + return nil // inline source already set } - pool, err := newSessionPool(prog, c, logger) + var sourceFile string + var sourceCode string + var err error + + switch { + case p.File != "": + sourceFile, sourceCode, err = loadSources(path, p.File) + case len(p.Files) > 0: + sourceFile, sourceCode, err = loadSources(path, p.Files...) + } if err != nil { - return nil, annotateError(c.Tag, err) + return annotateError(p.Tag, err) } - return &jsProcessor{ - Config: c, - sessionPool: pool, - sourceProg: prog, - sourceFile: sourceFile, - stats: getStats(c.Tag, reg, logger), - }, nil + return p.compile(sourceFile, sourceCode) } -// loadSources loads javascript source from files. -func loadSources(files ...string) (string, string, error) { - var sources []string +// loadSources loads javascript source from files using the provided paths. +func loadSources(pathConfig *paths.Path, files ...string) (string, string, error) { buf := new(bytes.Buffer) readFile := func(path string) error { @@ -124,8 +134,9 @@ func loadSources(files ...string) (string, string, error) { return nil } + sources := make([]string, 0, len(files)) for _, filePath := range files { - filePath = paths.Resolve(paths.Config, filePath) + filePath = pathConfig.Resolve(paths.Config, filePath) if hasMeta(filePath) { matches, err := filepath.Glob(filePath) @@ -162,9 +173,30 @@ func annotateError(id string, err error) error { return fmt.Errorf("failed in processor.javascript: %w", err) } +func (p *jsProcessor) compile(sourceFile, sourceCode string) error { + // Validate processor source code. + prog, err := goja.Compile(sourceFile, sourceCode, true) + if err != nil { + return err + } + + pool, err := newSessionPool(prog, p.Config, p.logger) + if err != nil { + return annotateError(p.Tag, err) + } + + p.sessionPool = pool + p.sourceFile = sourceFile + return nil +} + // Run executes the processor on the given it event. It invokes the // process function defined in the JavaScript source. func (p *jsProcessor) Run(event *beat.Event) (*beat.Event, error) { + if p.sessionPool == nil { + return nil, fmt.Errorf("javascript processor not initialized: SetPaths must be called for file-based sources") + } + s := p.sessionPool.Get() defer p.sessionPool.Put(s) diff --git a/libbeat/processors/script/javascript/javascript_test.go b/libbeat/processors/script/javascript/javascript_test.go index 908d88816aec..74a0cfab5a78 100644 --- a/libbeat/processors/script/javascript/javascript_test.go +++ b/libbeat/processors/script/javascript/javascript_test.go @@ -31,6 +31,7 @@ import ( "github.com/elastic/elastic-agent-libs/logp/logptest" "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/elastic-agent-libs/monitoring" + "github.com/elastic/elastic-agent-libs/paths" ) func TestNew(t *testing.T) { @@ -69,19 +70,34 @@ func TestNew(t *testing.T) { require.ErrorContains(t, err, "process function not found") }) - t.Run("file not found", func(t *testing.T) { - cfg, err := config.NewConfigFrom(map[string]any{"file": filepath.Join(tmpDir, "nonexistent.js")}) + t.Run("SetPaths file not found", func(t *testing.T) { + cfg, err := config.NewConfigFrom(map[string]any{"file": "nonexistent.js"}) require.NoError(t, err) - _, err = New(cfg, logptest.NewTestingLogger(t, "")) + p, err := New(cfg, logptest.NewTestingLogger(t, "")) + require.NoError(t, err) // Construction succeeds + + jsProc, ok := p.(*jsProcessor) + require.True(t, ok) + + // SetPaths should fail + err = jsProc.SetPaths(tmpPaths(tmpDir)) require.ErrorContains(t, err, "no such file or directory") }) - t.Run("no sources found with glob", func(t *testing.T) { - cfg, err := config.NewConfigFrom(map[string]any{"file": filepath.Join(tmpDir, "nomatch", "*.js")}) + t.Run("SetPaths no sources found with glob", func(t *testing.T) { + emptyDir := t.TempDir() + cfg, err := config.NewConfigFrom(map[string]any{"file": "nomatch/*.js"}) require.NoError(t, err) - _, err = New(cfg, logptest.NewTestingLogger(t, "")) + p, err := New(cfg, logptest.NewTestingLogger(t, "")) + require.NoError(t, err) // Construction succeeds + + jsProc, ok := p.(*jsProcessor) + require.True(t, ok) + + // SetPaths should fail + err = jsProc.SetPaths(tmpPaths(emptyDir)) require.ErrorContains(t, err, "no sources were found") }) } @@ -102,7 +118,7 @@ func TestRun(t *testing.T) { t.Run("with file", func(t *testing.T) { file := writeFile(t, tmpDir, "processor.js", `function process(event) { event.Put("from_file", true); }`) - p := newTestProcessor(t, "file", file, "") + p := newTestProcessorWithSetPaths(t, tmpDir, "file", filepath.Base(file), "") evt := &beat.Event{Fields: mapstr.M{}} result, err := p.Run(evt) @@ -115,7 +131,7 @@ func TestRun(t *testing.T) { t.Run("with multiple files", func(t *testing.T) { utilFile := writeFile(t, tmpDir, "util.js", "var multiplier = 2;") mainFile := writeFile(t, tmpDir, "main.js", `function process(event) { event.Put("multiplier", multiplier); }`) - p := newTestProcessor(t, "files", []string{utilFile, mainFile}, "") + p := newTestProcessorWithSetPaths(t, tmpDir, "files", []string{filepath.Base(utilFile), filepath.Base(mainFile)}, "") evt := &beat.Event{Fields: mapstr.M{}} result, err := p.Run(evt) @@ -129,7 +145,7 @@ func TestRun(t *testing.T) { globDir := t.TempDir() writeFile(t, globDir, "a_utils.js", "var fromGlob = true;") writeFile(t, globDir, "b_main.js", `function process(event) { event.Put("from_glob", fromGlob); }`) - p := newTestProcessor(t, "file", filepath.Join(globDir, "*.js"), "") + p := newTestProcessorWithSetPaths(t, globDir, "file", "*.js", "") evt := &beat.Event{Fields: mapstr.M{}} result, err := p.Run(evt) @@ -139,6 +155,36 @@ func TestRun(t *testing.T) { v, _ := result.GetValue("from_glob") assert.Equal(t, true, v) }) + + t.Run("without SetPaths fails", func(t *testing.T) { + cfg, err := config.NewConfigFrom(map[string]any{"file": "test.js"}) + require.NoError(t, err) + + p, err := New(cfg, logptest.NewTestingLogger(t, "")) + require.NoError(t, err) + + // Try to use without SetPaths - should fail + evt, err := p.Run(newTestEvent()) + assert.Nil(t, evt) + assert.ErrorContains(t, err, "javascript processor not initialized") + assert.ErrorContains(t, err, "SetPaths must be called") + }) + + t.Run("after SetPaths on inline source", func(t *testing.T) { + p := newTestProcessor(t, "source", `function process(event) { event.Put("x", 1); return event; }`, "") + + jsProc, ok := p.(*jsProcessor) + require.True(t, ok) + + err := jsProc.SetPaths(tmpPaths(tmpDir)) + require.NoError(t, err) + + // Should still work + evt, err := jsProc.Run(newTestEvent()) + require.NoError(t, err) + v, _ := evt.GetValue("x") + assert.Equal(t, int64(1), v) + }) } func TestRunWithStats(t *testing.T) { @@ -181,6 +227,35 @@ func TestRunWithStats(t *testing.T) { }) } +func TestSetPathsWithRelativePath(t *testing.T) { + tmpDir := t.TempDir() + scriptsDir := filepath.Join(tmpDir, "scripts") + err := os.MkdirAll(scriptsDir, 0755) + require.NoError(t, err) + + writeFile(t, scriptsDir, "test.js", `function process(event) { event.Put("added", "value"); return event; }`) + + p, err := NewFromConfig(Config{ + File: "scripts/test.js", + }, nil, logptest.NewTestingLogger(t, "")) + require.NoError(t, err) + + jsProc, ok := p.(*jsProcessor) + require.True(t, ok, "processor should be *jsProcessor") + + // Initialize with paths where Config points to configDir + err = jsProc.SetPaths(tmpPaths(tmpDir)) + require.NoError(t, err) + + // Should resolve relative to the config/ directory + evt, err := jsProc.Run(newTestEvent()) + require.NoError(t, err) + + val, err := evt.GetValue("added") + require.NoError(t, err) + assert.Equal(t, "value", val) +} + func newTestProcessor(t *testing.T, key string, value any, tag string) beat.Processor { t.Helper() cfg := map[string]any{key: value} @@ -194,6 +269,20 @@ func newTestProcessor(t *testing.T, key string, value any, tag string) beat.Proc return p } +func newTestProcessorWithSetPaths(t *testing.T, tmpDir, key string, value any, tag string) beat.Processor { + t.Helper() + + p := newTestProcessor(t, key, value, tag) + require.IsType(t, &jsProcessor{}, p) + jsProc, ok := p.(*jsProcessor) + require.True(t, ok, "expected *jsProcessor type") + + err := jsProc.SetPaths(tmpPaths(tmpDir)) + require.NoErrorf(t, err, "failed to set paths for processor") + + return p +} + func writeFile(t *testing.T, dir, name, contents string) string { t.Helper() path := filepath.Join(dir, name) @@ -201,3 +290,20 @@ func writeFile(t *testing.T, dir, name, contents string) string { require.NoErrorf(t, err, "failed to write to file %s", path) return path } + +func newTestEvent() *beat.Event { + return &beat.Event{ + Fields: mapstr.M{ + "message": "test event", + }, + } +} + +func tmpPaths(dir string) *paths.Path { + return &paths.Path{ + Home: dir, + Config: dir, + Data: dir, + Logs: dir, + } +} From f458cf591e2453bb9f7822ca8e02513885577e07 Mon Sep 17 00:00:00 2001 From: Orestis Floros Date: Tue, 2 Dec 2025 19:23:22 +0100 Subject: [PATCH 2/3] fix cisco ios tests --- .../module/cisco/ios/pipeline_test.go | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/x-pack/filebeat/module/cisco/ios/pipeline_test.go b/x-pack/filebeat/module/cisco/ios/pipeline_test.go index 20c30c5cf92a..b120dbbd2ecd 100644 --- a/x-pack/filebeat/module/cisco/ios/pipeline_test.go +++ b/x-pack/filebeat/module/cisco/ios/pipeline_test.go @@ -7,6 +7,7 @@ package ios_test import ( "encoding/json" "fmt" + "os" "testing" "github.com/elastic/beats/v7/libbeat/beat" @@ -15,6 +16,7 @@ import ( "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/logp/logptest" "github.com/elastic/elastic-agent-libs/mapstr" + "github.com/elastic/elastic-agent-libs/paths" "github.com/elastic/go-lookslike" "github.com/elastic/go-lookslike/isdef" "github.com/elastic/go-lookslike/validator" @@ -189,6 +191,8 @@ func TestFilebeatSyslogCisco(t *testing.T) { t.Fatal(err) } + setCWDPath(t, p) + testInput(t, "syslog", p) testInput(t, "log", p) } @@ -244,6 +248,9 @@ func BenchmarkPipeline(b *testing.B) { if err != nil { b.Fatal(err) } + + setCWDPath(b, p) + b.ResetTimer() for i := 0; i < b.N; i++ { @@ -262,3 +269,19 @@ func BenchmarkPipeline(b *testing.B) { } } } + +func setCWDPath(tb testing.TB, p beat.Processor) { + cwd, err := os.Getwd() + if err != nil { + tb.Fatal(err) + } + + if jsp, ok := p.(interface{ SetPaths(*paths.Path) error }); ok { + if err := jsp.SetPaths(&paths.Path{ + Home: cwd, + Config: cwd, + }); err != nil { + tb.Fatal(err) + } + } +} From 1fa267e28507f69ec0bc776563e9616bc4e45f49 Mon Sep 17 00:00:00 2001 From: Orestis Floros Date: Wed, 3 Dec 2025 18:59:59 +0100 Subject: [PATCH 3/3] improve tests --- .../script/javascript/javascript.go | 2 +- .../script/javascript/javascript_test.go | 52 +++++++------------ 2 files changed, 21 insertions(+), 33 deletions(-) diff --git a/libbeat/processors/script/javascript/javascript.go b/libbeat/processors/script/javascript/javascript.go index eb077e3763d9..ad0215bd33d9 100644 --- a/libbeat/processors/script/javascript/javascript.go +++ b/libbeat/processors/script/javascript/javascript.go @@ -194,7 +194,7 @@ func (p *jsProcessor) compile(sourceFile, sourceCode string) error { // process function defined in the JavaScript source. func (p *jsProcessor) Run(event *beat.Event) (*beat.Event, error) { if p.sessionPool == nil { - return nil, fmt.Errorf("javascript processor not initialized: SetPaths must be called for file-based sources") + return event, fmt.Errorf("javascript processor not initialized: SetPaths must be called for file-based sources") } s := p.sessionPool.Get() diff --git a/libbeat/processors/script/javascript/javascript_test.go b/libbeat/processors/script/javascript/javascript_test.go index 74a0cfab5a78..6d3395c7254f 100644 --- a/libbeat/processors/script/javascript/javascript_test.go +++ b/libbeat/processors/script/javascript/javascript_test.go @@ -118,9 +118,17 @@ func TestRun(t *testing.T) { t.Run("with file", func(t *testing.T) { file := writeFile(t, tmpDir, "processor.js", `function process(event) { event.Put("from_file", true); }`) - p := newTestProcessorWithSetPaths(t, tmpDir, "file", filepath.Base(file), "") + p := newTestProcessor(t, "file", filepath.Base(file), "") - evt := &beat.Event{Fields: mapstr.M{}} + // Try to use without SetPaths - should fail + evt, err := p.Run(newTestEvent()) + assert.NotNil(t, evt) + assert.ErrorContains(t, err, "javascript processor not initialized") + assert.ErrorContains(t, err, "SetPaths must be called") + + setPaths(t, p, tmpDir) + + evt = &beat.Event{Fields: mapstr.M{}} result, err := p.Run(evt) require.NoError(t, err) @@ -131,7 +139,9 @@ func TestRun(t *testing.T) { t.Run("with multiple files", func(t *testing.T) { utilFile := writeFile(t, tmpDir, "util.js", "var multiplier = 2;") mainFile := writeFile(t, tmpDir, "main.js", `function process(event) { event.Put("multiplier", multiplier); }`) - p := newTestProcessorWithSetPaths(t, tmpDir, "files", []string{filepath.Base(utilFile), filepath.Base(mainFile)}, "") + + p := newTestProcessor(t, "files", []string{filepath.Base(utilFile), filepath.Base(mainFile)}, "") + setPaths(t, p, tmpDir) evt := &beat.Event{Fields: mapstr.M{}} result, err := p.Run(evt) @@ -145,7 +155,9 @@ func TestRun(t *testing.T) { globDir := t.TempDir() writeFile(t, globDir, "a_utils.js", "var fromGlob = true;") writeFile(t, globDir, "b_main.js", `function process(event) { event.Put("from_glob", fromGlob); }`) - p := newTestProcessorWithSetPaths(t, globDir, "file", "*.js", "") + + p := newTestProcessor(t, "file", "*.js", "") + setPaths(t, p, globDir) evt := &beat.Event{Fields: mapstr.M{}} result, err := p.Run(evt) @@ -156,31 +168,12 @@ func TestRun(t *testing.T) { assert.Equal(t, true, v) }) - t.Run("without SetPaths fails", func(t *testing.T) { - cfg, err := config.NewConfigFrom(map[string]any{"file": "test.js"}) - require.NoError(t, err) - - p, err := New(cfg, logptest.NewTestingLogger(t, "")) - require.NoError(t, err) - - // Try to use without SetPaths - should fail - evt, err := p.Run(newTestEvent()) - assert.Nil(t, evt) - assert.ErrorContains(t, err, "javascript processor not initialized") - assert.ErrorContains(t, err, "SetPaths must be called") - }) - t.Run("after SetPaths on inline source", func(t *testing.T) { p := newTestProcessor(t, "source", `function process(event) { event.Put("x", 1); return event; }`, "") - - jsProc, ok := p.(*jsProcessor) - require.True(t, ok) - - err := jsProc.SetPaths(tmpPaths(tmpDir)) - require.NoError(t, err) + setPaths(t, p, "/does/not/matter") // Should still work - evt, err := jsProc.Run(newTestEvent()) + evt, err := p.Run(newTestEvent()) require.NoError(t, err) v, _ := evt.GetValue("x") assert.Equal(t, int64(1), v) @@ -269,18 +262,13 @@ func newTestProcessor(t *testing.T, key string, value any, tag string) beat.Proc return p } -func newTestProcessorWithSetPaths(t *testing.T, tmpDir, key string, value any, tag string) beat.Processor { +func setPaths(t *testing.T, p beat.Processor, tmpDir string) { t.Helper() - - p := newTestProcessor(t, key, value, tag) require.IsType(t, &jsProcessor{}, p) jsProc, ok := p.(*jsProcessor) require.True(t, ok, "expected *jsProcessor type") - err := jsProc.SetPaths(tmpPaths(tmpDir)) - require.NoErrorf(t, err, "failed to set paths for processor") - - return p + require.NoError(t, err) } func writeFile(t *testing.T, dir, name, contents string) string {