diff --git a/libbeat/processors/script/javascript/javascript.go b/libbeat/processors/script/javascript/javascript.go index 4519e41d6d64..ad0215bd33d9 100644 --- a/libbeat/processors/script/javascript/javascript.go +++ b/libbeat/processors/script/javascript/javascript.go @@ -42,9 +42,9 @@ import ( type jsProcessor struct { Config sessionPool *sessionPool - sourceProg *goja.Program sourceFile string stats *processorStats + logger *logp.Logger } // New constructs a new JavaScript processor. @@ -59,50 +59,60 @@ func New(c *config.C, log *logp.Logger) (beat.Processor, error) { // NewFromConfig constructs a new JavaScript processor from the given config // object. It loads the sources, compiles them, and validates the entry point. +// For inline sources, initialization happens immediately. For file-based sources, +// initialization is deferred until SetPaths is called. func NewFromConfig(c Config, reg *monitoring.Registry, logger *logp.Logger) (beat.Processor, error) { err := c.Validate() if err != nil { return nil, err } - var sourceFile string - var sourceCode string - switch { - case c.Source != "": - sourceFile = "inline.js" - sourceCode = c.Source - case c.File != "": - sourceFile, sourceCode, err = loadSources(c.File) - case len(c.Files) > 0: - sourceFile, sourceCode, err = loadSources(c.Files...) + processor := &jsProcessor{ + Config: c, + logger: logger, + stats: getStats(c.Tag, reg, logger), } - if err != nil { - return nil, annotateError(c.Tag, err) + + // For inline sources, we can initialize immediately. + // For file-based sources, we defer initialization until SetPaths is called. + if c.Source != "" { + const inlineSourceFile = "inline.js" + + err = processor.compile(inlineSourceFile, c.Source) + if err != nil { + return nil, err + } } - // Validate processor source code. - prog, err := goja.Compile(sourceFile, sourceCode, true) - if err != nil { - return nil, err + return processor, nil +} + +// SetPaths initializes the processor with the provided paths configuration. +// This method must be called before the processor can be used for file-based sources. +func (p *jsProcessor) SetPaths(path *paths.Path) error { + if p.Source != "" { + return nil // inline source already set } - pool, err := newSessionPool(prog, c, logger) + var sourceFile string + var sourceCode string + var err error + + switch { + case p.File != "": + sourceFile, sourceCode, err = loadSources(path, p.File) + case len(p.Files) > 0: + sourceFile, sourceCode, err = loadSources(path, p.Files...) + } if err != nil { - return nil, annotateError(c.Tag, err) + return annotateError(p.Tag, err) } - return &jsProcessor{ - Config: c, - sessionPool: pool, - sourceProg: prog, - sourceFile: sourceFile, - stats: getStats(c.Tag, reg, logger), - }, nil + return p.compile(sourceFile, sourceCode) } -// loadSources loads javascript source from files. -func loadSources(files ...string) (string, string, error) { - var sources []string +// loadSources loads javascript source from files using the provided paths. +func loadSources(pathConfig *paths.Path, files ...string) (string, string, error) { buf := new(bytes.Buffer) readFile := func(path string) error { @@ -124,8 +134,9 @@ func loadSources(files ...string) (string, string, error) { return nil } + sources := make([]string, 0, len(files)) for _, filePath := range files { - filePath = paths.Resolve(paths.Config, filePath) + filePath = pathConfig.Resolve(paths.Config, filePath) if hasMeta(filePath) { matches, err := filepath.Glob(filePath) @@ -162,9 +173,30 @@ func annotateError(id string, err error) error { return fmt.Errorf("failed in processor.javascript: %w", err) } +func (p *jsProcessor) compile(sourceFile, sourceCode string) error { + // Validate processor source code. + prog, err := goja.Compile(sourceFile, sourceCode, true) + if err != nil { + return err + } + + pool, err := newSessionPool(prog, p.Config, p.logger) + if err != nil { + return annotateError(p.Tag, err) + } + + p.sessionPool = pool + p.sourceFile = sourceFile + return nil +} + // Run executes the processor on the given it event. It invokes the // process function defined in the JavaScript source. func (p *jsProcessor) Run(event *beat.Event) (*beat.Event, error) { + if p.sessionPool == nil { + return event, fmt.Errorf("javascript processor not initialized: SetPaths must be called for file-based sources") + } + s := p.sessionPool.Get() defer p.sessionPool.Put(s) diff --git a/libbeat/processors/script/javascript/javascript_test.go b/libbeat/processors/script/javascript/javascript_test.go index 908d88816aec..6d3395c7254f 100644 --- a/libbeat/processors/script/javascript/javascript_test.go +++ b/libbeat/processors/script/javascript/javascript_test.go @@ -31,6 +31,7 @@ import ( "github.com/elastic/elastic-agent-libs/logp/logptest" "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/elastic-agent-libs/monitoring" + "github.com/elastic/elastic-agent-libs/paths" ) func TestNew(t *testing.T) { @@ -69,19 +70,34 @@ func TestNew(t *testing.T) { require.ErrorContains(t, err, "process function not found") }) - t.Run("file not found", func(t *testing.T) { - cfg, err := config.NewConfigFrom(map[string]any{"file": filepath.Join(tmpDir, "nonexistent.js")}) + t.Run("SetPaths file not found", func(t *testing.T) { + cfg, err := config.NewConfigFrom(map[string]any{"file": "nonexistent.js"}) require.NoError(t, err) - _, err = New(cfg, logptest.NewTestingLogger(t, "")) + p, err := New(cfg, logptest.NewTestingLogger(t, "")) + require.NoError(t, err) // Construction succeeds + + jsProc, ok := p.(*jsProcessor) + require.True(t, ok) + + // SetPaths should fail + err = jsProc.SetPaths(tmpPaths(tmpDir)) require.ErrorContains(t, err, "no such file or directory") }) - t.Run("no sources found with glob", func(t *testing.T) { - cfg, err := config.NewConfigFrom(map[string]any{"file": filepath.Join(tmpDir, "nomatch", "*.js")}) + t.Run("SetPaths no sources found with glob", func(t *testing.T) { + emptyDir := t.TempDir() + cfg, err := config.NewConfigFrom(map[string]any{"file": "nomatch/*.js"}) require.NoError(t, err) - _, err = New(cfg, logptest.NewTestingLogger(t, "")) + p, err := New(cfg, logptest.NewTestingLogger(t, "")) + require.NoError(t, err) // Construction succeeds + + jsProc, ok := p.(*jsProcessor) + require.True(t, ok) + + // SetPaths should fail + err = jsProc.SetPaths(tmpPaths(emptyDir)) require.ErrorContains(t, err, "no sources were found") }) } @@ -102,9 +118,17 @@ func TestRun(t *testing.T) { t.Run("with file", func(t *testing.T) { file := writeFile(t, tmpDir, "processor.js", `function process(event) { event.Put("from_file", true); }`) - p := newTestProcessor(t, "file", file, "") + p := newTestProcessor(t, "file", filepath.Base(file), "") - evt := &beat.Event{Fields: mapstr.M{}} + // Try to use without SetPaths - should fail + evt, err := p.Run(newTestEvent()) + assert.NotNil(t, evt) + assert.ErrorContains(t, err, "javascript processor not initialized") + assert.ErrorContains(t, err, "SetPaths must be called") + + setPaths(t, p, tmpDir) + + evt = &beat.Event{Fields: mapstr.M{}} result, err := p.Run(evt) require.NoError(t, err) @@ -115,7 +139,9 @@ func TestRun(t *testing.T) { t.Run("with multiple files", func(t *testing.T) { utilFile := writeFile(t, tmpDir, "util.js", "var multiplier = 2;") mainFile := writeFile(t, tmpDir, "main.js", `function process(event) { event.Put("multiplier", multiplier); }`) - p := newTestProcessor(t, "files", []string{utilFile, mainFile}, "") + + p := newTestProcessor(t, "files", []string{filepath.Base(utilFile), filepath.Base(mainFile)}, "") + setPaths(t, p, tmpDir) evt := &beat.Event{Fields: mapstr.M{}} result, err := p.Run(evt) @@ -129,7 +155,9 @@ func TestRun(t *testing.T) { globDir := t.TempDir() writeFile(t, globDir, "a_utils.js", "var fromGlob = true;") writeFile(t, globDir, "b_main.js", `function process(event) { event.Put("from_glob", fromGlob); }`) - p := newTestProcessor(t, "file", filepath.Join(globDir, "*.js"), "") + + p := newTestProcessor(t, "file", "*.js", "") + setPaths(t, p, globDir) evt := &beat.Event{Fields: mapstr.M{}} result, err := p.Run(evt) @@ -139,6 +167,17 @@ func TestRun(t *testing.T) { v, _ := result.GetValue("from_glob") assert.Equal(t, true, v) }) + + t.Run("after SetPaths on inline source", func(t *testing.T) { + p := newTestProcessor(t, "source", `function process(event) { event.Put("x", 1); return event; }`, "") + setPaths(t, p, "/does/not/matter") + + // Should still work + evt, err := p.Run(newTestEvent()) + require.NoError(t, err) + v, _ := evt.GetValue("x") + assert.Equal(t, int64(1), v) + }) } func TestRunWithStats(t *testing.T) { @@ -181,6 +220,35 @@ func TestRunWithStats(t *testing.T) { }) } +func TestSetPathsWithRelativePath(t *testing.T) { + tmpDir := t.TempDir() + scriptsDir := filepath.Join(tmpDir, "scripts") + err := os.MkdirAll(scriptsDir, 0755) + require.NoError(t, err) + + writeFile(t, scriptsDir, "test.js", `function process(event) { event.Put("added", "value"); return event; }`) + + p, err := NewFromConfig(Config{ + File: "scripts/test.js", + }, nil, logptest.NewTestingLogger(t, "")) + require.NoError(t, err) + + jsProc, ok := p.(*jsProcessor) + require.True(t, ok, "processor should be *jsProcessor") + + // Initialize with paths where Config points to configDir + err = jsProc.SetPaths(tmpPaths(tmpDir)) + require.NoError(t, err) + + // Should resolve relative to the config/ directory + evt, err := jsProc.Run(newTestEvent()) + require.NoError(t, err) + + val, err := evt.GetValue("added") + require.NoError(t, err) + assert.Equal(t, "value", val) +} + func newTestProcessor(t *testing.T, key string, value any, tag string) beat.Processor { t.Helper() cfg := map[string]any{key: value} @@ -194,6 +262,15 @@ func newTestProcessor(t *testing.T, key string, value any, tag string) beat.Proc return p } +func setPaths(t *testing.T, p beat.Processor, tmpDir string) { + t.Helper() + require.IsType(t, &jsProcessor{}, p) + jsProc, ok := p.(*jsProcessor) + require.True(t, ok, "expected *jsProcessor type") + err := jsProc.SetPaths(tmpPaths(tmpDir)) + require.NoError(t, err) +} + func writeFile(t *testing.T, dir, name, contents string) string { t.Helper() path := filepath.Join(dir, name) @@ -201,3 +278,20 @@ func writeFile(t *testing.T, dir, name, contents string) string { require.NoErrorf(t, err, "failed to write to file %s", path) return path } + +func newTestEvent() *beat.Event { + return &beat.Event{ + Fields: mapstr.M{ + "message": "test event", + }, + } +} + +func tmpPaths(dir string) *paths.Path { + return &paths.Path{ + Home: dir, + Config: dir, + Data: dir, + Logs: dir, + } +} diff --git a/x-pack/filebeat/module/cisco/ios/pipeline_test.go b/x-pack/filebeat/module/cisco/ios/pipeline_test.go index 20c30c5cf92a..b120dbbd2ecd 100644 --- a/x-pack/filebeat/module/cisco/ios/pipeline_test.go +++ b/x-pack/filebeat/module/cisco/ios/pipeline_test.go @@ -7,6 +7,7 @@ package ios_test import ( "encoding/json" "fmt" + "os" "testing" "github.com/elastic/beats/v7/libbeat/beat" @@ -15,6 +16,7 @@ import ( "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/logp/logptest" "github.com/elastic/elastic-agent-libs/mapstr" + "github.com/elastic/elastic-agent-libs/paths" "github.com/elastic/go-lookslike" "github.com/elastic/go-lookslike/isdef" "github.com/elastic/go-lookslike/validator" @@ -189,6 +191,8 @@ func TestFilebeatSyslogCisco(t *testing.T) { t.Fatal(err) } + setCWDPath(t, p) + testInput(t, "syslog", p) testInput(t, "log", p) } @@ -244,6 +248,9 @@ func BenchmarkPipeline(b *testing.B) { if err != nil { b.Fatal(err) } + + setCWDPath(b, p) + b.ResetTimer() for i := 0; i < b.N; i++ { @@ -262,3 +269,19 @@ func BenchmarkPipeline(b *testing.B) { } } } + +func setCWDPath(tb testing.TB, p beat.Processor) { + cwd, err := os.Getwd() + if err != nil { + tb.Fatal(err) + } + + if jsp, ok := p.(interface{ SetPaths(*paths.Path) error }); ok { + if err := jsp.SetPaths(&paths.Path{ + Home: cwd, + Config: cwd, + }); err != nil { + tb.Fatal(err) + } + } +}