diff --git a/filebeat/autodiscover/builder/hints/logs.go b/filebeat/autodiscover/builder/hints/logs.go index cadef8bd7211..ffa70efa196b 100644 --- a/filebeat/autodiscover/builder/hints/logs.go +++ b/filebeat/autodiscover/builder/hints/logs.go @@ -34,6 +34,7 @@ import ( conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" + "github.com/elastic/elastic-agent-libs/paths" ) const ( @@ -68,7 +69,7 @@ func NewLogHints(cfg *conf.C, logger *logp.Logger) (autodiscover.Builder, error) return nil, fmt.Errorf("unable to unpack hints config due to error: %w", err) } - moduleRegistry, err := fileset.NewModuleRegistry(nil, beat.Info{Logger: logger}, false, fileset.FilesetOverrides{}) + moduleRegistry, err := fileset.NewModuleRegistry(nil, beat.Info{Logger: logger}, false, fileset.FilesetOverrides{}, paths.Paths) if err != nil { return nil, err } @@ -155,7 +156,6 @@ func (l *logHints) CreateConfig(event bus.Event, options ...ucfg.Option) []*conf } else { shouldPut(tempCfg, json, jsonOpts, l.log) } - } // Merge config template with the configs from the annotations // AppendValues option is used to append arrays from annotations to existing arrays while merging diff --git a/filebeat/beater/crawler.go b/filebeat/beater/crawler.go index 3e1b834ad124..807c290ccc62 100644 --- a/filebeat/beater/crawler.go +++ b/filebeat/beater/crawler.go @@ -28,6 +28,7 @@ import ( "github.com/elastic/beats/v7/libbeat/cfgfile" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/paths" ) type crawler struct { @@ -41,6 +42,7 @@ type crawler struct { inputReloader *cfgfile.Reloader once bool beatDone chan struct{} + beatPaths *paths.Path } func newCrawler( @@ -49,6 +51,7 @@ func newCrawler( beatDone chan struct{}, once bool, logger *logp.Logger, + beatPaths *paths.Path, ) (*crawler, error) { return &crawler{ log: logger.Named("crawler"), @@ -58,6 +61,7 @@ func newCrawler( inputConfigs: inputConfigs, once: once, beatDone: beatDone, + beatPaths: beatPaths, }, nil } @@ -80,14 +84,14 @@ func (c *crawler) Start( } if configInputs.Enabled() { - c.inputReloader = cfgfile.NewReloader(log, pipeline, configInputs) + c.inputReloader = cfgfile.NewReloader(log.Named("input.reloader"), pipeline, configInputs, c.beatPaths) if err := c.inputReloader.Check(c.inputsFactory); err != nil { return fmt.Errorf("creating input reloader failed: %w", err) } } if configModules.Enabled() { - c.modulesReloader = cfgfile.NewReloader(log, pipeline, configModules) + c.modulesReloader = cfgfile.NewReloader(log.Named("module.reloader"), pipeline, configModules, c.beatPaths) if err := c.modulesReloader.Check(c.modulesFactory); err != nil { return fmt.Errorf("creating module reloader failed: %w", err) } @@ -113,7 +117,6 @@ func (c *crawler) startInput( pipeline beat.PipelineConnector, config *conf.C, ) error { - if !config.Enabled() { c.log.Infof("input disabled, skipping it") return nil diff --git a/filebeat/beater/diagnostics.go b/filebeat/beater/diagnostics.go index 9b8aa59aeb78..b9f2c145059f 100644 --- a/filebeat/beater/diagnostics.go +++ b/filebeat/beater/diagnostics.go @@ -39,12 +39,12 @@ func getRegexpsForRegistryFiles() ([]*regexp.Regexp, error) { registryFileRegExps := []*regexp.Regexp{} preFilesList := [][]string{ - []string{"^registry$"}, - []string{"^registry", "filebeat$"}, - []string{"^registry", "filebeat", "meta\\.json$"}, - []string{"^registry", "filebeat", "log\\.json$"}, - []string{"^registry", "filebeat", "active\\.dat$"}, - []string{"^registry", "filebeat", "[[:digit:]]*\\.json$"}, + {"^registry$"}, + {"^registry", "filebeat$"}, + {"^registry", "filebeat", "meta\\.json$"}, + {"^registry", "filebeat", "log\\.json$"}, + {"^registry", "filebeat", "active\\.dat$"}, + {"^registry", "filebeat", "[[:digit:]]*\\.json$"}, } for _, lst := range preFilesList { @@ -70,12 +70,12 @@ func getRegexpsForRegistryFiles() ([]*regexp.Regexp, error) { return registryFileRegExps, nil } -func gzipRegistry(logger *logp.Logger) func() []byte { +func gzipRegistry(logger *logp.Logger, beatPaths *paths.Path) func() []byte { logger = logger.Named("diagnostics") return func() []byte { buf := bytes.Buffer{} - dataPath := paths.Resolve(paths.Data, "") + dataPath := beatPaths.Resolve(paths.Data, "") registryPath := filepath.Join(dataPath, "registry") f, err := os.CreateTemp("", "filebeat-registry-*.tar") if err != nil { diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 78b20c28a356..7681cf5fd532 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -109,7 +109,7 @@ func newBeater(b *beat.Beat, plugins PluginFactory, rawConfig *conf.C) (beat.Bea EnableAllFilesets: enableAllFilesets, ForceEnableModuleFilesets: forceEnableModuleFilesets, } - moduleRegistry, err := fileset.NewModuleRegistry(config.Modules, b.Info, true, filesetOverrides) + moduleRegistry, err := fileset.NewModuleRegistry(config.Modules, b.Info, true, filesetOverrides, b.Paths) if err != nil { return nil, err } @@ -197,7 +197,7 @@ func (fb *Filebeat) setupPipelineLoaderCallback(b *beat.Beat) error { ForceEnableModuleFilesets: forceEnableModuleFilesets, } - modulesFactory := fileset.NewSetupFactory(b.Info, pipelineLoaderFactory, filesetOverrides) + modulesFactory := fileset.NewSetupFactory(b.Info, pipelineLoaderFactory, filesetOverrides, b.Paths) if fb.config.ConfigModules.Enabled() { if enableAllFilesets { // All module configs need to be loaded to enable all the filesets @@ -207,7 +207,7 @@ func (fb *Filebeat) setupPipelineLoaderCallback(b *beat.Beat) error { newPath := strings.TrimSuffix(origPath, ".yml") _ = fb.config.ConfigModules.SetString("path", -1, newPath) } - modulesLoader := cfgfile.NewReloader(fb.logger.Named("module.reloader"), fb.pipeline, fb.config.ConfigModules) + modulesLoader := cfgfile.NewReloader(fb.logger.Named("module.reloader"), fb.pipeline, fb.config.ConfigModules, b.Paths) modulesLoader.Load(modulesFactory) } @@ -266,7 +266,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error { "Filebeat's registry", "registry.tar.gz", "application/octet-stream", - gzipRegistry(b.Info.Logger)) + gzipRegistry(b.Info.Logger, b.Paths)) } if !fb.moduleRegistry.Empty() { @@ -288,7 +288,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error { } finishedLogger := newFinishedLogger(wgEvents) - registryMigrator := registrar.NewMigrator(config.Registry, fb.logger) + registryMigrator := registrar.NewMigrator(config.Registry, fb.logger, b.Paths) if err := registryMigrator.Run(); err != nil { fb.logger.Errorf("Failed to migrate registry file: %+v", err) return err @@ -301,7 +301,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error { cn() }() - stateStore, err := openStateStore(ctx, b.Info, fb.logger.Named("filebeat"), config.Registry) + stateStore, err := openStateStore(ctx, b.Info, fb.logger.Named("filebeat"), config.Registry, b.Paths) if err != nil { fb.logger.Errorf("Failed to open state store: %+v", err) return err @@ -361,7 +361,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error { pipelineConnector := channel.NewOutletFactory(outDone).Create inputsLogger := fb.logger.Named("input") - v2Inputs := fb.pluginFactory(b.Info, inputsLogger, stateStore, paths.Paths) + v2Inputs := fb.pluginFactory(b.Info, inputsLogger, stateStore, b.Paths) v2InputLoader, err := v2.NewLoader(inputsLogger, v2Inputs, "type", cfg.DefaultType) if err != nil { panic(err) // loader detected invalid state. @@ -403,8 +403,8 @@ func (fb *Filebeat) Run(b *beat.Beat) error { fb.logger.Warn(pipelinesWarning) } } - moduleLoader := fileset.NewFactory(inputLoader, b.Info, pipelineLoaderFactory, config.OverwritePipelines) - crawler, err := newCrawler(inputLoader, moduleLoader, config.Inputs, fb.done, *once, fb.logger) + moduleLoader := fileset.NewFactory(inputLoader, b.Info, pipelineLoaderFactory, config.OverwritePipelines, b.Paths) + crawler, err := newCrawler(inputLoader, moduleLoader, config.Inputs, fb.done, *once, fb.logger, b.Paths) if err != nil { fb.logger.Errorf("Could not init crawler: %v", err) return err diff --git a/filebeat/beater/store.go b/filebeat/beater/store.go index 08c1354789ef..6f0b370b2de4 100644 --- a/filebeat/beater/store.go +++ b/filebeat/beater/store.go @@ -46,7 +46,7 @@ type filebeatStore struct { notifier *es.Notifier } -func openStateStore(ctx context.Context, info beat.Info, logger *logp.Logger, cfg config.Registry) (*filebeatStore, error) { +func openStateStore(ctx context.Context, info beat.Info, logger *logp.Logger, cfg config.Registry, beatPaths *paths.Path) (*filebeatStore, error) { var ( reg backend.Registry err error @@ -61,7 +61,7 @@ func openStateStore(ctx context.Context, info beat.Info, logger *logp.Logger, cf } reg, err = memlog.New(logger, memlog.Settings{ - Root: paths.Resolve(paths.Data, cfg.Path), + Root: beatPaths.Resolve(paths.Data, cfg.Path), FileMode: cfg.Permissions, }) if err != nil { diff --git a/filebeat/fileset/config.go b/filebeat/fileset/config.go index b6ef5322c503..780b1a37195e 100644 --- a/filebeat/fileset/config.go +++ b/filebeat/fileset/config.go @@ -60,10 +60,10 @@ func NewFilesetConfig(cfg *conf.C) (*FilesetConfig, error) { // mergePathDefaults returns a copy of c containing the path variables that must // be available for variable expansion in module configuration (e.g. it enables // the use of ${path.config} in module config). -func mergePathDefaults(c *conf.C) (*conf.C, error) { +func mergePathDefaults(c *conf.C, beatPaths *paths.Path) (*conf.C, error) { defaults := conf.MustNewConfigFrom(map[string]interface{}{ "path": map[string]interface{}{ - "home": paths.Paths.Home, + "home": beatPaths.Home, "config": "${path.home}", "data": filepath.Join("${path.home}", "data"), "logs": filepath.Join("${path.home}", "logs"), diff --git a/filebeat/fileset/factory.go b/filebeat/fileset/factory.go index 5588cdcc8e49..902cb8c49a46 100644 --- a/filebeat/fileset/factory.go +++ b/filebeat/fileset/factory.go @@ -31,10 +31,13 @@ import ( conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/monitoring" + "github.com/elastic/elastic-agent-libs/paths" ) -var moduleList = monitoring.NewUniqueList() -var moduleListMetricsOnce sync.Once +var ( + moduleList = monitoring.NewUniqueList() + moduleListMetricsOnce sync.Once +) // RegisterMonitoringModules registers the modules list with the monitoring system. func RegisterMonitoringModules(namespace string) { @@ -50,6 +53,7 @@ type Factory struct { overwritePipelines bool pipelineCallbackID uuid.UUID inputFactory cfgfile.RunnerFactory + beatPaths *paths.Path } // Wrap an array of inputs and implements cfgfile.Runner interface @@ -69,6 +73,7 @@ func NewFactory( beatInfo beat.Info, pipelineLoaderFactory PipelineLoaderFactory, overwritePipelines bool, + beatPaths *paths.Path, ) *Factory { return &Factory{ inputFactory: inputFactory, @@ -76,6 +81,7 @@ func NewFactory( pipelineLoaderFactory: pipelineLoaderFactory, pipelineCallbackID: uuid.Nil, overwritePipelines: overwritePipelines, + beatPaths: beatPaths, } } @@ -134,7 +140,7 @@ func (f *Factory) CheckConfig(c *conf.C) error { // createRegistry starts a registry for a set of filesets, it returns the registry and // its input configurations func (f *Factory) createRegistry(c *conf.C) (*ModuleRegistry, []*conf.C, error) { - m, err := NewModuleRegistry([]*conf.C{c}, f.beatInfo, false, FilesetOverrides{}) + m, err := NewModuleRegistry([]*conf.C{c}, f.beatInfo, false, FilesetOverrides{}, f.beatPaths) if err != nil { return nil, nil, err } diff --git a/filebeat/fileset/fileset.go b/filebeat/fileset/fileset.go index c2ab19bbd8e5..a0457c15aa4d 100644 --- a/filebeat/fileset/fileset.go +++ b/filebeat/fileset/fileset.go @@ -40,6 +40,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common/cfgwarn" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/paths" "github.com/elastic/elastic-agent-libs/version" ) @@ -53,6 +54,7 @@ type Fileset struct { vars map[string]interface{} pipelineIDs []string logger *logp.Logger + beatPaths *paths.Path } type pipeline struct { @@ -67,6 +69,7 @@ func New( mname string, fcfg *FilesetConfig, logger *logp.Logger, + beatPaths *paths.Path, ) (*Fileset, error, ) { modulePath := filepath.Join(modulesPath, mname) @@ -80,6 +83,7 @@ func New( fcfg: fcfg, modulePath: modulePath, logger: logger, + beatPaths: beatPaths, }, nil } @@ -168,7 +172,7 @@ func (fs *Fileset) evaluateVars(info beat.Info) (map[string]interface{}, error) var exists bool name, exists := vals["name"].(string) if !exists { - return nil, fmt.Errorf("Variable doesn't have a string 'name' key") + return nil, fmt.Errorf("variable doesn't have a string 'name' key") } // Variables are not required to have a default. Templates should @@ -207,28 +211,31 @@ func (fs *Fileset) turnOffElasticsearchVars(vars map[string]interface{}, esVersi } if !esVersion.IsValid() { - return vars, errors.New("Unknown Elasticsearch version") + return vars, errors.New("unknown Elasticsearch version") } for _, vals := range fs.manifest.Vars { var ok bool name, ok := vals["name"].(string) if !ok { - return nil, fmt.Errorf("Variable doesn't have a string 'name' key") + return nil, fmt.Errorf("variable doesn't have a string 'name' key") } minESVersion, ok := vals["min_elasticsearch_version"].(map[string]interface{}) if ok { - minVersion, err := version.New(minESVersion["version"].(string)) - if err != nil { - return vars, fmt.Errorf("Error parsing version %s: %w", minESVersion["version"].(string), err) - } + versionString, ok := minESVersion["version"].(string) + if ok { + minVersion, err := version.New(versionString) + if err != nil { + return vars, fmt.Errorf("Error parsing version %s: %w", versionString, err) + } - fs.logger.Named("fileset").Debugf("Comparing ES version %s with requirement of %s", esVersion.String(), minVersion) + fs.logger.Named("fileset").Debugf("Comparing ES version %s with requirement of %s", esVersion.String(), minVersion) - if esVersion.LessThan(minVersion) { - retVars[name] = minESVersion["value"] - fs.logger.Infof("Setting var %s (%s) to %v because Elasticsearch version is %s", name, fs, minESVersion["value"], esVersion.String()) + if esVersion.LessThan(minVersion) { + retVars[name] = minESVersion["value"] + fs.logger.Infof("Setting var %s (%s) to %v because Elasticsearch version is %s", name, fs, minESVersion["value"], esVersion.String()) + } } } } @@ -312,11 +319,11 @@ func getTemplateFunctions(vars map[string]interface{}) (template.FuncMap, error) }, "IngestPipeline": func(shortID string) string { return FormatPipelineID( - builtinVars["prefix"].(string), - builtinVars["module"].(string), - builtinVars["fileset"].(string), + builtinVars["prefix"].(string), //nolint:errcheck //keep behavior for now + builtinVars["module"].(string), //nolint:errcheck //keep behavior for now + builtinVars["fileset"].(string), //nolint:errcheck //keep behavior for now shortID, - builtinVars["beatVersion"].(string), + builtinVars["beatVersion"].(string), //nolint:errcheck //keep behavior for now ) }, }, nil @@ -368,7 +375,7 @@ func (fs *Fileset) getInputConfig() (*conf.C, error) { return nil, fmt.Errorf("Error reading input config: %w", err) } - cfg, err = mergePathDefaults(cfg) + cfg, err = mergePathDefaults(cfg, fs.beatPaths) if err != nil { return nil, err } @@ -461,7 +468,7 @@ func (fs *Fileset) GetPipelines(esVersion version.V) (pipelines []pipeline, err } newContent, err := FixYAMLMaps(content) if err != nil { - return nil, fmt.Errorf("Failed to sanitize the YAML pipeline file: %s: %w", path, err) + return nil, fmt.Errorf("failed to sanitize the YAML pipeline file: %s: %w", path, err) } var ok bool content, ok = newContent.(map[string]interface{}) @@ -469,7 +476,7 @@ func (fs *Fileset) GetPipelines(esVersion version.V) (pipelines []pipeline, err return nil, errors.New("cannot convert newContent to map[string]interface{}") } default: - return nil, fmt.Errorf("Unsupported extension '%s' for pipeline file: %s", extension, path) + return nil, fmt.Errorf("unsupported extension '%s' for pipeline file: %s", extension, path) } pipelineID := fs.pipelineIDs[idx] diff --git a/filebeat/fileset/fileset_test.go b/filebeat/fileset/fileset_test.go index 289a134ecdf5..ebe9ee52667d 100644 --- a/filebeat/fileset/fileset_test.go +++ b/filebeat/fileset/fileset_test.go @@ -32,6 +32,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp/logptest" + "github.com/elastic/elastic-agent-libs/paths" "github.com/elastic/elastic-agent-libs/version" ) @@ -43,9 +44,14 @@ func makeTestInfo(version string) beat.Info { } func getModuleForTesting(t *testing.T, module, fileset string) *Fileset { + beatPaths := paths.New() + beatPaths.Home = t.TempDir() + err := beatPaths.InitPaths(beatPaths) + require.NoError(t, err) + modulesPath, err := filepath.Abs("../module") require.NoError(t, err) - fs, err := New(modulesPath, fileset, module, &FilesetConfig{}, logptest.NewTestingLogger(t, "")) + fs, err := New(modulesPath, fileset, module, &FilesetConfig{}, logptest.NewTestingLogger(t, ""), beatPaths) require.NoError(t, err) return fs @@ -97,13 +103,18 @@ func TestEvaluateVarsNginx(t *testing.T) { } func TestEvaluateVarsNginxOverride(t *testing.T) { + beatPaths := paths.New() + beatPaths.Home = t.TempDir() + err := beatPaths.InitPaths(beatPaths) + require.NoError(t, err) + modulesPath, err := filepath.Abs("../module") require.NoError(t, err) fs, err := New(modulesPath, "access", "nginx", &FilesetConfig{ Var: map[string]interface{}{ "pipeline": "no_plugins", }, - }, logptest.NewTestingLogger(t, "")) + }, logptest.NewTestingLogger(t, ""), beatPaths) require.NoError(t, err) fs.manifest, err = fs.readManifest() @@ -238,11 +249,16 @@ func TestGetInputConfigNginxOverrides(t *testing.T) { }, } + beatPaths := paths.New() + beatPaths.Home = t.TempDir() + err = beatPaths.InitPaths(beatPaths) + require.NoError(t, err) + for name, test := range tests { t.Run(name, func(t *testing.T) { fs, err := New(modulesPath, "access", "nginx", &FilesetConfig{ Input: test.input, - }, logptest.NewTestingLogger(t, "")) + }, logptest.NewTestingLogger(t, ""), beatPaths) require.NoError(t, err) require.NoError(t, fs.Read(makeTestInfo("5.2.0"))) diff --git a/filebeat/fileset/modules.go b/filebeat/fileset/modules.go index ebb3c1d86f34..f5b2800e00c6 100644 --- a/filebeat/fileset/modules.go +++ b/filebeat/fileset/modules.go @@ -37,8 +37,9 @@ import ( const logName = "modules" type ModuleRegistry struct { - registry []Module // []Module -> []Fileset - log *logp.Logger + registry []Module // []Module -> []Fileset + log *logp.Logger + beatPaths *paths.Path } type Module struct { @@ -57,10 +58,12 @@ func newModuleRegistry(modulesPath string, overrides *ModuleOverrides, beatInfo beat.Info, filesetOverrides FilesetOverrides, + beatPaths *paths.Path, ) (*ModuleRegistry, error) { reg := ModuleRegistry{ - registry: []Module{}, - log: beatInfo.Logger.Named(logName), + registry: []Module{}, + log: beatInfo.Logger.Named(logName), + beatPaths: beatPaths, } for _, mcfg := range moduleConfigs { @@ -120,7 +123,7 @@ func newModuleRegistry(modulesPath string, return nil, fmt.Errorf("fileset %s/%s is configured but doesn't exist", mcfg.Module, filesetName) } - fileset, err := New(modulesPath, filesetName, mcfg.Module, fcfg, beatInfo.Logger) + fileset, err := New(modulesPath, filesetName, mcfg.Module, fcfg, beatInfo.Logger, beatPaths) if err != nil { return nil, err } @@ -143,8 +146,8 @@ func newModuleRegistry(modulesPath string, } // NewModuleRegistry reads and loads the configured module into the registry. -func NewModuleRegistry(moduleConfigs []*conf.C, beatInfo beat.Info, init bool, filesetOverrides FilesetOverrides) (*ModuleRegistry, error) { - modulesPath := paths.Resolve(paths.Home, "module") +func NewModuleRegistry(moduleConfigs []*conf.C, beatInfo beat.Info, init bool, filesetOverrides FilesetOverrides, beatPaths *paths.Path) (*ModuleRegistry, error) { + modulesPath := beatPaths.Resolve(paths.Home, "module") stat, err := os.Stat(modulesPath) if err != nil || !stat.IsDir() { @@ -153,7 +156,7 @@ func NewModuleRegistry(moduleConfigs []*conf.C, beatInfo beat.Info, init bool, f // When run under agent via agentbeat there is no modules directory and this is expected. log.Errorf("Not loading modules. Module directory not found: %s", modulesPath) } - return &ModuleRegistry{log: log}, nil + return &ModuleRegistry{log: log, beatPaths: beatPaths}, nil } var modulesCLIList []string @@ -166,7 +169,7 @@ func NewModuleRegistry(moduleConfigs []*conf.C, beatInfo beat.Info, init bool, f } var mcfgs []*ModuleConfig //nolint:prealloc //breaks tests for _, cfg := range moduleConfigs { - cfg, err = mergePathDefaults(cfg) + cfg, err = mergePathDefaults(cfg, beatPaths) if err != nil { return nil, err } @@ -184,7 +187,7 @@ func NewModuleRegistry(moduleConfigs []*conf.C, beatInfo beat.Info, init bool, f } enableFilesetsFromOverrides(mcfgs, modulesOverrides) - return newModuleRegistry(modulesPath, mcfgs, modulesOverrides, beatInfo, filesetOverrides) + return newModuleRegistry(modulesPath, mcfgs, modulesOverrides, beatInfo, filesetOverrides, beatPaths) } // enableFilesetsFromOverrides enables in mcfgs the filesets mentioned in overrides, @@ -461,7 +464,7 @@ func (reg *ModuleRegistry) ModuleNames() []string { // ModuleAvailableFilesets return the list of available filesets for the given module // it returns an empty list if the module doesn't exist func (reg *ModuleRegistry) ModuleAvailableFilesets(module string) ([]string, error) { - modulesPath := paths.Resolve(paths.Home, "module") + modulesPath := reg.beatPaths.Resolve(paths.Home, "module") return getModuleFilesets(modulesPath, module) } diff --git a/filebeat/fileset/modules_integration_test.go b/filebeat/fileset/modules_integration_test.go index 0470bef9e21a..1c73847f331f 100644 --- a/filebeat/fileset/modules_integration_test.go +++ b/filebeat/fileset/modules_integration_test.go @@ -33,6 +33,7 @@ import ( "github.com/elastic/beats/v7/libbeat/esleg/eslegtest" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/logp/logptest" + "github.com/elastic/elastic-agent-libs/paths" ) func makeTestInfo(version string) beat.Info { @@ -118,7 +119,12 @@ func TestSetupNginx(t *testing.T) { }, } - reg, err := newModuleRegistry(modulesPath, configs, nil, makeTestInfo("5.2.0"), FilesetOverrides{}) + beatPaths := paths.New() + beatPaths.Home = t.TempDir() + if err := beatPaths.InitPaths(beatPaths); err != nil { + t.Fatal(err) + } + reg, err := newModuleRegistry(modulesPath, configs, nil, makeTestInfo("5.2.0"), FilesetOverrides{}, beatPaths) if err != nil { t.Fatal(err) } @@ -197,7 +203,12 @@ func TestLoadMultiplePipelines(t *testing.T) { {"foo", &enabled, filesetConfigs}, } - reg, err := newModuleRegistry(modulesPath, configs, nil, makeTestInfo("6.6.0"), FilesetOverrides{}) + beatPaths := paths.New() + beatPaths.Home = t.TempDir() + if err := beatPaths.InitPaths(beatPaths); err != nil { + t.Fatal(err) + } + reg, err := newModuleRegistry(modulesPath, configs, nil, makeTestInfo("6.6.0"), FilesetOverrides{}, beatPaths) if err != nil { t.Fatal(err) } @@ -242,7 +253,13 @@ func TestLoadMultiplePipelinesWithRollback(t *testing.T) { {"foo", &enabled, filesetConfigs}, } - reg, err := newModuleRegistry(modulesPath, configs, nil, makeTestInfo("6.6.0"), FilesetOverrides{}) + beatPaths := paths.New() + beatPaths.Home = t.TempDir() + if err := beatPaths.InitPaths(beatPaths); err != nil { + t.Fatal(err) + } + + reg, err := newModuleRegistry(modulesPath, configs, nil, makeTestInfo("6.6.0"), FilesetOverrides{}, beatPaths) if err != nil { t.Fatal(err) } diff --git a/filebeat/fileset/modules_test.go b/filebeat/fileset/modules_test.go index d90da2aa814c..9ad987da6c84 100644 --- a/filebeat/fileset/modules_test.go +++ b/filebeat/fileset/modules_test.go @@ -83,7 +83,11 @@ func TestNewModuleRegistry(t *testing.T) { } logger := logptest.NewTestingLogger(t, "") - reg, err := newModuleRegistry(modulesPath, configs, nil, beat.Info{Version: "5.2.0", Logger: logger}, FilesetOverrides{}) + beatPaths := paths.New() + beatPaths.Home = t.TempDir() + err = beatPaths.InitPaths(beatPaths) + require.NoError(t, err) + reg, err := newModuleRegistry(modulesPath, configs, nil, beat.Info{Version: "5.2.0", Logger: logger}, FilesetOverrides{}, beatPaths) require.NoError(t, err) assert.NotNil(t, reg) @@ -151,7 +155,11 @@ func TestNewModuleRegistryConfig(t *testing.T) { } logger := logptest.NewTestingLogger(t, "") - reg, err := newModuleRegistry(modulesPath, configs, nil, beat.Info{Version: "5.2.0", Logger: logger}, FilesetOverrides{}) + beatPaths := paths.New() + beatPaths.Home = t.TempDir() + err = beatPaths.InitPaths(beatPaths) + require.NoError(t, err) + reg, err := newModuleRegistry(modulesPath, configs, nil, beat.Info{Version: "5.2.0", Logger: logger}, FilesetOverrides{}, beatPaths) require.NoError(t, err) assert.NotNil(t, reg) @@ -178,7 +186,11 @@ func TestMovedModule(t *testing.T) { } logger := logptest.NewTestingLogger(t, "") - reg, err := newModuleRegistry(modulesPath, configs, nil, beat.Info{Version: "5.2.0", Logger: logger}, FilesetOverrides{}) + beatPaths := paths.New() + beatPaths.Home = t.TempDir() + err = beatPaths.InitPaths(beatPaths) + require.NoError(t, err) + reg, err := newModuleRegistry(modulesPath, configs, nil, beat.Info{Version: "5.2.0", Logger: logger}, FilesetOverrides{}, beatPaths) require.NoError(t, err) assert.NotNil(t, reg) } @@ -441,16 +453,15 @@ func TestMcfgFromConfig(t *testing.T) { } func TestMissingModuleFolder(t *testing.T) { - home := paths.Paths.Home - paths.Paths.Home = "/no/such/path" - defer func() { paths.Paths.Home = home }() + p := paths.New() + p.Home = "/no/such/path" configs := []*conf.C{ load(t, map[string]interface{}{"module": "nginx"}), } logger := logptest.NewTestingLogger(t, "") - reg, err := NewModuleRegistry(configs, beat.Info{Version: "5.2.0", Logger: logger}, true, FilesetOverrides{}) + reg, err := NewModuleRegistry(configs, beat.Info{Version: "5.2.0", Logger: logger}, true, FilesetOverrides{}, p) require.NoError(t, err) assert.NotNil(t, reg) diff --git a/filebeat/fileset/setup.go b/filebeat/fileset/setup.go index a17a504ab7f3..962620fb6a05 100644 --- a/filebeat/fileset/setup.go +++ b/filebeat/fileset/setup.go @@ -22,6 +22,7 @@ import ( "github.com/elastic/beats/v7/libbeat/cfgfile" pubpipeline "github.com/elastic/beats/v7/libbeat/publisher/pipeline" conf "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/paths" ) // SetupFactory is for loading module assets when running setup subcommand. @@ -30,21 +31,23 @@ type SetupFactory struct { pipelineLoaderFactory PipelineLoaderFactory overwritePipelines bool filesetOverrides FilesetOverrides + beatPaths *paths.Path } // NewSetupFactory creates a SetupFactory -func NewSetupFactory(beatInfo beat.Info, pipelineLoaderFactory PipelineLoaderFactory, filesetOverrides FilesetOverrides) *SetupFactory { +func NewSetupFactory(beatInfo beat.Info, pipelineLoaderFactory PipelineLoaderFactory, filesetOverrides FilesetOverrides, beatPaths *paths.Path) *SetupFactory { return &SetupFactory{ beatInfo: beatInfo, pipelineLoaderFactory: pipelineLoaderFactory, overwritePipelines: true, filesetOverrides: filesetOverrides, + beatPaths: beatPaths, } } // Create creates a new SetupCfgRunner to setup module configuration. func (sf *SetupFactory) Create(_ beat.PipelineConnector, c *conf.C) (cfgfile.Runner, error) { - m, err := NewModuleRegistry([]*conf.C{c}, sf.beatInfo, false, sf.filesetOverrides) + m, err := NewModuleRegistry([]*conf.C{c}, sf.beatInfo, false, sf.filesetOverrides, sf.beatPaths) if err != nil { return nil, err } diff --git a/filebeat/registrar/migrate.go b/filebeat/registrar/migrate.go index 1be807f32995..4b3756b70224 100644 --- a/filebeat/registrar/migrate.go +++ b/filebeat/registrar/migrate.go @@ -49,11 +49,11 @@ type Migrator struct { logger *logp.Logger } -func NewMigrator(cfg config.Registry, logger *logp.Logger) *Migrator { - path := paths.Resolve(paths.Data, cfg.Path) +func NewMigrator(cfg config.Registry, logger *logp.Logger, beatPaths *paths.Path) *Migrator { + path := beatPaths.Resolve(paths.Data, cfg.Path) migrateFile := cfg.MigrateFile if migrateFile != "" { - migrateFile = paths.Resolve(paths.Data, migrateFile) + migrateFile = beatPaths.Resolve(paths.Data, migrateFile) } return &Migrator{ diff --git a/heartbeat/beater/heartbeat.go b/heartbeat/beater/heartbeat.go index f1df1b3c4b83..d9560846abfc 100644 --- a/heartbeat/beater/heartbeat.go +++ b/heartbeat/beater/heartbeat.go @@ -22,7 +22,6 @@ import ( "errors" "fmt" "sync" - "syscall" "time" @@ -184,7 +183,7 @@ func (bt *Heartbeat) Run(b *beat.Beat) error { } if bt.config.ConfigMonitors.Enabled() { - bt.monitorReloader = cfgfile.NewReloader(b.Info.Logger.Named("module.reload"), b.Publisher, bt.config.ConfigMonitors) + bt.monitorReloader = cfgfile.NewReloader(b.Info.Logger.Named("module.reload"), b.Publisher, bt.config.ConfigMonitors, b.Paths) defer bt.monitorReloader.Stop() err := bt.RunReloadableMonitors() diff --git a/libbeat/beat/beat.go b/libbeat/beat/beat.go index c2dd9eddd4ce..620a855134d1 100644 --- a/libbeat/beat/beat.go +++ b/libbeat/beat/beat.go @@ -26,6 +26,7 @@ import ( "github.com/elastic/elastic-agent-client/v7/pkg/proto" "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/keystore" + "github.com/elastic/elastic-agent-libs/paths" "github.com/elastic/elastic-agent-libs/useragent" ) @@ -88,6 +89,7 @@ type Beat struct { API *api.Server // API server. This is nil unless the http endpoint is enabled. Registry *reload.Registry // input, & output registry for configuration manager, should be instantiated in NewBeat + Paths *paths.Path // per beat paths definition } // GenerateUserAgent populates the UserAgent field on the beat.Info struct diff --git a/libbeat/cfgfile/reload.go b/libbeat/cfgfile/reload.go index 77e338110f6e..dbef4abc5ba7 100644 --- a/libbeat/cfgfile/reload.go +++ b/libbeat/cfgfile/reload.go @@ -103,13 +103,13 @@ type Reloader struct { } // NewReloader creates new Reloader instance for the given config -func NewReloader(logger *logp.Logger, pipeline beat.PipelineConnector, cfg *config.C) *Reloader { +func NewReloader(logger *logp.Logger, pipeline beat.PipelineConnector, cfg *config.C, beatPaths *paths.Path) *Reloader { conf := DefaultDynamicConfig _ = cfg.Unpack(&conf) path := conf.Path if !filepath.IsAbs(path) { - path = paths.Resolve(paths.Config, path) + path = beatPaths.Resolve(paths.Config, path) } return &Reloader{ diff --git a/libbeat/cfgfile/reload_test.go b/libbeat/cfgfile/reload_test.go index 177cee8c610b..4b45d28433ed 100644 --- a/libbeat/cfgfile/reload_test.go +++ b/libbeat/cfgfile/reload_test.go @@ -23,6 +23,7 @@ import ( "fmt" "io/ioutil" "os" + "path/filepath" "testing" "time" @@ -31,6 +32,7 @@ import ( conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp/logptest" "github.com/elastic/elastic-agent-libs/mapstr" + "github.com/elastic/elastic-agent-libs/paths" ) func TestReloader(t *testing.T) { @@ -50,7 +52,8 @@ func TestReloader(t *testing.T) { }, }) // config.C{} - reloader := NewReloader(logptest.NewTestingLogger(t, "cfgfile-test.reload"), nil, config) + + reloader := NewReloader(logptest.NewTestingLogger(t, "cfgfile-test.reload"), nil, config, paths.Paths) retryCount := 10 go reloader.Run(nil) @@ -75,7 +78,7 @@ func TestReloader(t *testing.T) { // Write a file to the reloader path to trigger a real reload content := []byte("test\n") - err = ioutil.WriteFile(dir+"/config1.yml", content, 0644) + err = os.WriteFile(filepath.Join(dir, "config1.yml"), content, 0o644) assert.NoError(t, err) // Wait for the number of scans to increase at least twice. This is somewhat diff --git a/libbeat/cmd/export/ilm_policy.go b/libbeat/cmd/export/ilm_policy.go index 557c62c8aaef..e0b7e6f35077 100644 --- a/libbeat/cmd/export/ilm_policy.go +++ b/libbeat/cmd/export/ilm_policy.go @@ -46,7 +46,7 @@ func GenGetILMPolicyCmd(settings instance.Settings) *cobra.Command { // the way this works, we decide to export ILM or DSL based on the user's config. // This means that if a user has no index management config, we'll default to ILM, regardless of what the user // is connected to. Might not be a problem since a user who doesn't have any custom lifecycle config has nothing to export? - clientHandler, err := idxmgmt.NewFileClientHandler(newIdxmgmtClient(dir, version), b.Info, b.Config.LifecycleConfig) + clientHandler, err := idxmgmt.NewFileClientHandler(newIdxmgmtClient(dir, version), b.Info, b.Paths, b.Config.LifecycleConfig) if err != nil { fatalf("error creating file handler: %s", err) } diff --git a/libbeat/cmd/export/template.go b/libbeat/cmd/export/template.go index 45a83b986817..de66ddb1ebe4 100644 --- a/libbeat/cmd/export/template.go +++ b/libbeat/cmd/export/template.go @@ -48,7 +48,7 @@ func GenTemplateConfigCmd(settings instance.Settings) *cobra.Command { fatalfInitCmd(err) } - clientHandler, err := idxmgmt.NewFileClientHandler(newIdxmgmtClient(dir, version), b.Info, b.Config.LifecycleConfig) + clientHandler, err := idxmgmt.NewFileClientHandler(newIdxmgmtClient(dir, version), b.Info, b.Paths, b.Config.LifecycleConfig) if err != nil { fatalf("error creating file handler: %s", err) } diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 28311ac082b4..22dc57ffebc8 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -248,11 +248,12 @@ func NewBeat(name, indexPrefix, v string, elasticLicensed bool, initFuncs []func ID: id, FirstStart: time.Now(), StartTime: time.Now(), - EphemeralID: metricreport.EphemeralID(), + EphemeralID: metricreport.EphemeralID(), //nolint:staticcheck //keep behavior for now FIPSDistribution: version.FIPSDistribution, }, Fields: fields, Registry: reload.NewRegistry(), + Paths: paths.New(), } return &Beat{Beat: b}, nil @@ -426,7 +427,7 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error { // Try to acquire exclusive lock on data path to prevent another beat instance // sharing same data path. This is disabled under elastic-agent. if !management.UnderAgent() { - bl := locks.New(b.Info) + bl := locks.New(b.Info, b.Paths) err := bl.Lock() if err != nil { return err @@ -690,7 +691,7 @@ func (b *Beat) Setup(settings Settings, bt beat.Creator, setup SetupSettings) er loadILM = idxmgmt.LoadModeEnabled } - mgmtHandler, err := idxmgmt.NewESClientHandler(esClient, b.Info, b.Config.LifecycleConfig) + mgmtHandler, err := idxmgmt.NewESClientHandler(esClient, b.Info, b.Paths, b.Config.LifecycleConfig) if err != nil { return fmt.Errorf("error creating index management handler: %w", err) } @@ -766,10 +767,11 @@ func (b *Beat) configure(settings Settings) error { if err := InitPaths(cfg); err != nil { return err } + b.Paths = paths.Paths // We have to initialize the keystore before any unpack or merging the cloud // options. - store, err := LoadKeystore(cfg, b.Info.Beat) + store, err := LoadKeystore(cfg, b.Info.Beat, b.Paths) if err != nil { return fmt.Errorf("could not initialize the keystore: %w", err) } @@ -829,9 +831,9 @@ func (b *Beat) configure(settings Settings) error { b.Instrumentation = instrumentation // log paths values to help with troubleshooting - logger.Infof("%s", paths.Paths.String()) + logger.Infof("%s", b.Paths.String()) - metaPath := paths.Resolve(paths.Data, "meta.json") + metaPath := b.Paths.Resolve(paths.Data, "meta.json") err = b.LoadMeta(metaPath) if err != nil { return err @@ -1068,7 +1070,7 @@ func (b *Beat) loadDashboards(ctx context.Context, force bool) error { return fmt.Errorf("error generating index pattern: %w", err) } - err = dashboards.ImportDashboards(ctx, b.Info, paths.Resolve(paths.Home, ""), + err = dashboards.ImportDashboards(ctx, b.Info, b.Paths.Resolve(paths.Home, ""), kibanaConfig, b.Config.Dashboards, nil, pattern) if err != nil { return fmt.Errorf("error importing Kibana dashboards: %w", err) @@ -1134,7 +1136,7 @@ func (b *Beat) registerESIndexManagement() error { func (b *Beat) indexSetupCallback() elasticsearch.ConnectCallback { return func(esClient *eslegclient.Connection, _ *logp.Logger) error { - mgmtHandler, err := idxmgmt.NewESClientHandler(esClient, b.Info, b.Config.LifecycleConfig) + mgmtHandler, err := idxmgmt.NewESClientHandler(esClient, b.Info, b.Paths, b.Config.LifecycleConfig) if err != nil { return fmt.Errorf("error creating index management handler: %w", err) } @@ -1381,10 +1383,10 @@ func (b *Beat) logSystemInfo(log *logp.Logger) { "type": b.Info.Beat, "uuid": b.Info.ID, "path": mapstr.M{ - "config": paths.Resolve(paths.Config, ""), - "data": paths.Resolve(paths.Data, ""), - "home": paths.Resolve(paths.Home, ""), - "logs": paths.Resolve(paths.Logs, ""), + "config": b.Paths.Resolve(paths.Config, ""), + "data": b.Paths.Resolve(paths.Data, ""), + "home": b.Paths.Resolve(paths.Home, ""), + "logs": b.Paths.Resolve(paths.Logs, ""), }, } log.Infow("Beat info", "beat", beat) diff --git a/libbeat/cmd/instance/keystore_fips.go b/libbeat/cmd/instance/keystore_fips.go index 3f5fe0fde633..ffcfe191a91e 100644 --- a/libbeat/cmd/instance/keystore_fips.go +++ b/libbeat/cmd/instance/keystore_fips.go @@ -22,9 +22,10 @@ package instance import ( "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/keystore" + "github.com/elastic/elastic-agent-libs/paths" ) // LoadKeystore returns nil in FIPS mode -func LoadKeystore(cfg *config.C, name string) (keystore.Keystore, error) { +func LoadKeystore(cfg *config.C, name string, beatPaths *paths.Path) (keystore.Keystore, error) { return nil, nil } diff --git a/libbeat/cmd/instance/keystore_fips_test.go b/libbeat/cmd/instance/keystore_fips_test.go index 5231714783d8..30c590e713fb 100644 --- a/libbeat/cmd/instance/keystore_fips_test.go +++ b/libbeat/cmd/instance/keystore_fips_test.go @@ -23,10 +23,11 @@ import ( "testing" "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/paths" ) func TestLoadKeystore(t *testing.T) { - ks, err := LoadKeystore(config.NewConfig(), "test") + ks, err := LoadKeystore(config.NewConfig(), "test", paths.New()) if err != nil { t.Errorf("Unexpected error: %v", err) } diff --git a/libbeat/cmd/instance/keystore_nofips.go b/libbeat/cmd/instance/keystore_nofips.go index 44b7e6813c35..425f6fd93571 100644 --- a/libbeat/cmd/instance/keystore_nofips.go +++ b/libbeat/cmd/instance/keystore_nofips.go @@ -29,8 +29,8 @@ import ( ) // LoadKeystore returns the appropriate keystore based on the configuration. -func LoadKeystore(cfg *config.C, name string) (keystore.Keystore, error) { +func LoadKeystore(cfg *config.C, name string, beatPaths *paths.Path) (keystore.Keystore, error) { keystoreCfg, _ := cfg.Child("keystore", -1) - defaultPathConfig := paths.Resolve(paths.Data, fmt.Sprintf("%s.keystore", name)) + defaultPathConfig := beatPaths.Resolve(paths.Data, fmt.Sprintf("%s.keystore", name)) return keystore.Factory(keystoreCfg, defaultPathConfig, common.IsStrictPerms()) } diff --git a/libbeat/cmd/instance/locks/lock.go b/libbeat/cmd/instance/locks/lock.go index 529d0cece0f6..9bbb60c50684 100644 --- a/libbeat/cmd/instance/locks/lock.go +++ b/libbeat/cmd/instance/locks/lock.go @@ -36,21 +36,19 @@ type Locker struct { logger *logp.Logger } -var ( - // ErrAlreadyLocked is returned when a lock on the data path is attempted but - // unsuccessful because another Beat instance already has the lock on the same - // data path. - ErrAlreadyLocked = fmt.Errorf("data path already locked by another beat. Please make sure that multiple beats are not sharing the same data path (path.data)") -) +// ErrAlreadyLocked is returned when a lock on the data path is attempted but +// unsuccessful because another Beat instance already has the lock on the same +// data path. +var ErrAlreadyLocked = fmt.Errorf("data path already locked by another beat. Please make sure that multiple beats are not sharing the same data path (path.data)") // New returns a new file locker -func New(beatInfo beat.Info) *Locker { - return NewWithRetry(beatInfo, 4, time.Millisecond*400) +func New(beatInfo beat.Info, beatPaths *paths.Path) *Locker { + return NewWithRetry(beatInfo, beatPaths, 4, time.Millisecond*400) } // NewWithRetry returns a new file locker with the given settings -func NewWithRetry(beatInfo beat.Info, retryCount int, retrySleep time.Duration) *Locker { - lockfilePath := paths.Resolve(paths.Data, beatInfo.Beat+".lock") +func NewWithRetry(beatInfo beat.Info, beatPaths *paths.Path, retryCount int, retrySleep time.Duration) *Locker { + lockfilePath := beatPaths.Resolve(paths.Data, beatInfo.Beat+".lock") return &Locker{ fileLock: flock.New(lockfilePath), retryCount: retryCount, diff --git a/libbeat/cmd/instance/locks/lock_test.go b/libbeat/cmd/instance/locks/lock_test.go index f2d5d77d2a8d..f06507688253 100644 --- a/libbeat/cmd/instance/locks/lock_test.go +++ b/libbeat/cmd/instance/locks/lock_test.go @@ -30,7 +30,6 @@ import ( ) func TestMain(m *testing.M) { - tmp, err := os.MkdirTemp("", "pidfile_test") defer os.RemoveAll(tmp) if err != nil { @@ -57,6 +56,12 @@ func TestMain(m *testing.M) { func TestLocker(t *testing.T) { // Setup two beats with same name and data path const beatName = "testbeat-testlocker" + tmpDir := t.TempDir() + tmpPaths := paths.New() + tmpPaths.Home = tmpDir + err := tmpPaths.InitPaths(tmpPaths) + require.NoError(t, err, "error initializaing paths") + fmt.Fprintf(os.Stderr, "paths was %s", tmpPaths) logger := logptest.NewTestingLogger(t, "") @@ -67,32 +72,36 @@ func TestLocker(t *testing.T) { b2.Beat = beatName // Try to get a lock for the first beat. Expect it to succeed. - bl1 := New(b1) - err := bl1.Lock() + bl1 := New(b1, tmpPaths) + err = bl1.Lock() require.NoError(t, err) // Try to get a lock for the second beat. Expect it to fail because the // first beat already has the lock. - bl2 := New(b2) + bl2 := New(b2, tmpPaths) err = bl2.Lock() require.Error(t, err) - } func TestUnlock(t *testing.T) { const beatName = "testbeat-testunlock" logger := logptest.NewTestingLogger(t, "") + tmpDir := t.TempDir() + tmpPaths := paths.New() + tmpPaths.Home = tmpDir + err := tmpPaths.InitPaths(tmpPaths) + require.NoError(t, err, "error initializaing paths") b1 := beat.Info{Logger: logger} b1.Beat = beatName b2 := beat.Info{} b2.Beat = beatName - bl2 := New(b2) + bl2 := New(b2, tmpPaths) // Try to get a lock for the first beat. Expect it to succeed. - bl1 := New(b1) - err := bl1.Lock() + bl1 := New(b1, tmpPaths) + err = bl1.Lock() require.NoError(t, err) // now unlock @@ -102,23 +111,27 @@ func TestUnlock(t *testing.T) { // try with other lockfile err = bl2.Lock() require.NoError(t, err) - } func TestUnlockWithRemainingFile(t *testing.T) { const beatName = "testbeat-testunlockwithfile" logger := logptest.NewTestingLogger(t, "") + tmpDir := t.TempDir() + tmpPaths := paths.New() + tmpPaths.Home = tmpDir + err := tmpPaths.InitPaths(tmpPaths) + require.NoError(t, err, "error initializaing paths") b1 := beat.Info{Logger: logger} b1.Beat = beatName b2 := beat.Info{} b2.Beat = beatName - bl2 := New(b2) + bl2 := New(b2, tmpPaths) // Try to get a lock for the first beat. Expect it to succeed. - bl1 := New(b1) - err := bl1.Lock() + bl1 := New(b1, tmpPaths) + err = bl1.Lock() require.NoError(t, err) // unlock the underlying FD, so we don't remove the file diff --git a/libbeat/idxmgmt/client_handler.go b/libbeat/idxmgmt/client_handler.go index b97bd1cc860e..bf1024d6d1d2 100644 --- a/libbeat/idxmgmt/client_handler.go +++ b/libbeat/idxmgmt/client_handler.go @@ -23,6 +23,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/idxmgmt/lifecycle" "github.com/elastic/beats/v7/libbeat/template" + "github.com/elastic/elastic-agent-libs/paths" "github.com/elastic/elastic-agent-libs/version" ) @@ -59,12 +60,12 @@ func NewClientHandler(ilm lifecycle.ClientHandler, template template.Loader) Cli // NewESClientHandler returns a new ESLoader instance, // initialized with an ilm and template client handler based on the passed in client. -func NewESClientHandler(client ESClient, info beat.Info, cfg lifecycle.RawConfig) (ClientHandler, error) { +func NewESClientHandler(client ESClient, info beat.Info, beatPaths *paths.Path, cfg lifecycle.RawConfig) (ClientHandler, error) { esHandler, err := lifecycle.NewESClientHandler(client, info, cfg) if err != nil { return nil, fmt.Errorf("error creating ES handler: %w", err) } - loader, err := template.NewESLoader(client, esHandler, info.Logger) + loader, err := template.NewESLoader(client, esHandler, info.Logger, beatPaths) if err != nil { return nil, fmt.Errorf("error creating ES loader: %w", err) } @@ -73,10 +74,10 @@ func NewESClientHandler(client ESClient, info beat.Info, cfg lifecycle.RawConfig // NewFileClientHandler returns a new ESLoader instance, // initialized with an ilm and template client handler based on the passed in client. -func NewFileClientHandler(client FileClient, info beat.Info, cfg lifecycle.RawConfig) (ClientHandler, error) { +func NewFileClientHandler(client FileClient, info beat.Info, beatPaths *paths.Path, cfg lifecycle.RawConfig) (ClientHandler, error) { mgmt, err := lifecycle.NewFileClientHandler(client, info, cfg) if err != nil { return nil, fmt.Errorf("error creating client handler: %w", err) } - return NewClientHandler(mgmt, template.NewFileLoader(client, mgmt.Mode() == lifecycle.DSL, info.Logger)), nil + return NewClientHandler(mgmt, template.NewFileLoader(client, mgmt.Mode() == lifecycle.DSL, info.Logger, beatPaths)), nil } diff --git a/libbeat/otelbeat/oteltest/oteltest.go b/libbeat/otelbeat/oteltest/oteltest.go index d7e866b2782c..b5afdfcb8d69 100644 --- a/libbeat/otelbeat/oteltest/oteltest.go +++ b/libbeat/otelbeat/oteltest/oteltest.go @@ -87,7 +87,8 @@ type CheckReceiversParams struct { // The function is called periodically until the assertions are met or the timeout is reached. AssertFunc func(t *assert.CollectT, logs map[string][]mapstr.M, zapLogs *observer.ObservedLogs) - Status ExpectedStatus + Status ExpectedStatus + NumRestarts int } // CheckReceivers creates receivers using the provided configuration. @@ -99,70 +100,6 @@ func CheckReceivers(params CheckReceiversParams) { var logsMu sync.Mutex logs := make(map[string][]mapstr.M) - - host := &MockHost{} - - zapCore := zapcore.NewCore( - zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig()), - &zaptest.Discarder{}, - zapcore.DebugLevel, - ) - observed, zapLogs := observer.New(zapcore.DebugLevel) - - core := zapcore.NewTee(zapCore, observed) - - createReceiver := func(t *testing.T, rc ReceiverConfig) receiver.Logs { - t.Helper() - - require.NotEmpty(t, rc.Name, "receiver name must not be empty") - require.NotEmpty(t, rc.Beat, "receiver beat must not be empty") - - var receiverSettings receiver.Settings - receiverSettings.ID = component.NewIDWithName(rc.Factory.Type(), rc.Name) - - // Replicate the behavior of the collector logger - receiverCore := core. - With([]zapcore.Field{ - zap.String("otelcol.component.id", receiverSettings.ID.String()), - zap.String("otelcol.component.kind", "receiver"), - zap.String("otelcol.signal", "logs"), - }) - - receiverSettings.Logger = zap.New(receiverCore) - - logConsumer, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error { - for _, rl := range ld.ResourceLogs().All() { - for _, sl := range rl.ScopeLogs().All() { - for _, lr := range sl.LogRecords().All() { - logsMu.Lock() - logs[rc.Name] = append(logs[rc.Name], lr.Body().Map().AsRaw()) - logsMu.Unlock() - } - } - } - return nil - }) - assert.NoErrorf(t, err, "Error creating log consumer for %q", rc.Name) - - r, err := rc.Factory.CreateLogs(ctx, receiverSettings, rc.Config, logConsumer) - assert.NoErrorf(t, err, "Error creating receiver %q", rc.Name) - return r - } - - // Replicate the collector behavior to instantiate components first and then start them. - var receivers []receiver.Logs - for _, rec := range params.Receivers { - receivers = append(receivers, createReceiver(t, rec)) - } - - for i, r := range receivers { - err := r.Start(ctx, host) - require.NoErrorf(t, err, "Error starting receiver %d", i) - defer func() { - require.NoErrorf(t, r.Shutdown(ctx), "Error shutting down receiver %d", i) - }() - } - t.Cleanup(func() { if t.Failed() { logsMu.Lock() @@ -170,50 +107,116 @@ func CheckReceivers(params CheckReceiversParams) { t.Logf("Ingested Logs: %v", logs) } }) + for range params.NumRestarts + 1 { + host := &MockHost{} + + zapCore := zapcore.NewCore( + zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig()), + &zaptest.Discarder{}, + zapcore.DebugLevel, + ) + observed, zapLogs := observer.New(zapcore.DebugLevel) + + core := zapcore.NewTee(zapCore, observed) + + createReceiver := func(t *testing.T, rc ReceiverConfig) receiver.Logs { + t.Helper() + + require.NotEmpty(t, rc.Name, "receiver name must not be empty") + require.NotEmpty(t, rc.Beat, "receiver beat must not be empty") + + var receiverSettings receiver.Settings + receiverSettings.ID = component.NewIDWithName(rc.Factory.Type(), rc.Name) + + // Replicate the behavior of the collector logger + receiverCore := core. + With([]zapcore.Field{ + zap.String("otelcol.component.id", receiverSettings.ID.String()), + zap.String("otelcol.component.kind", "receiver"), + zap.String("otelcol.signal", "logs"), + }) + + receiverSettings.Logger = zap.New(receiverCore) + + logConsumer, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error { + for _, rl := range ld.ResourceLogs().All() { + for _, sl := range rl.ScopeLogs().All() { + for _, lr := range sl.LogRecords().All() { + logsMu.Lock() + logs[rc.Name] = append(logs[rc.Name], lr.Body().Map().AsRaw()) + logsMu.Unlock() + } + } + } + return nil + }) + assert.NoErrorf(t, err, "Error creating log consumer for %q", rc.Name) - beatForCompName := func(compName string) string { - for _, rec := range params.Receivers { - if rec.Name == compName { - return rec.Beat - } + r, err := rc.Factory.CreateLogs(ctx, receiverSettings, rc.Config, logConsumer) + assert.NoErrorf(t, err, "Error creating receiver %q", rc.Name) + return r } - return "" - } + // Replicate the collector behavior to instantiate components first and then start them. + // use a map so the order of startup and shutdown is random + receivers := make(map[int]receiver.Logs) - require.EventuallyWithT(t, func(ct *assert.CollectT) { - logsMu.Lock() - defer logsMu.Unlock() - - // Ensure the logger fields from the otel collector are present - for _, zl := range zapLogs.All() { - require.Contains(ct, zl.ContextMap(), "otelcol.component.kind") - require.Equal(ct, "receiver", zl.ContextMap()["otelcol.component.kind"]) - require.Contains(ct, zl.ContextMap(), "otelcol.signal") - require.Equal(ct, "logs", zl.ContextMap()["otelcol.signal"]) - require.Contains(ct, zl.ContextMap(), "otelcol.component.id") - compID, ok := zl.ContextMap()["otelcol.component.id"].(string) - require.True(ct, ok, "otelcol.component.id should be a string") - compName := strings.Split(compID, "/")[1] - require.Contains(ct, zl.ContextMap(), "service.name") - require.Equal(ct, beatForCompName(compName), zl.ContextMap()["service.name"]) - break + for i, rec := range params.Receivers { + receivers[i] = createReceiver(t, rec) } - require.NotNilf(ct, host.getEvent(), "expected not nil nil, got %v", host.Evt) - - if params.Status.Error == "" { - require.Equalf(ct, host.Evt.Status(), componentstatus.StatusOK, "expected %v, got %v", params.Status.Status, host.Evt.Status()) - require.Nilf(ct, host.Evt.Err(), "expected nil, got %v", host.Evt.Err()) - } else { - require.Equalf(ct, host.Evt.Status(), params.Status.Status, "expected %v, got %v", params.Status.Status, host.Evt.Status()) - require.ErrorContainsf(ct, host.Evt.Err(), params.Status.Error, "expected error to contain '%v': %v", params.Status.Error, host.Evt.Err()) + + for i, r := range receivers { + err := r.Start(ctx, host) + require.NoErrorf(t, err, "Error starting receiver %d", i) } - if params.AssertFunc != nil { - params.AssertFunc(ct, logs, zapLogs) + beatForCompName := func(compName string) string { + for _, rec := range params.Receivers { + if rec.Name == compName { + return rec.Beat + } + } + + return "" } - }, 2*time.Minute, 100*time.Millisecond, - "timeout waiting for logger fields from the OTel collector are present in the logs and other assertions to be met") + + require.EventuallyWithT(t, func(ct *assert.CollectT) { + logsMu.Lock() + defer logsMu.Unlock() + + // Ensure the logger fields from the otel collector are present + for _, zl := range zapLogs.All() { + require.Contains(ct, zl.ContextMap(), "otelcol.component.kind") + require.Equal(ct, "receiver", zl.ContextMap()["otelcol.component.kind"]) + require.Contains(ct, zl.ContextMap(), "otelcol.signal") + require.Equal(ct, "logs", zl.ContextMap()["otelcol.signal"]) + require.Contains(ct, zl.ContextMap(), "otelcol.component.id") + compID, ok := zl.ContextMap()["otelcol.component.id"].(string) + require.True(ct, ok, "otelcol.component.id should be a string") + compName := strings.Split(compID, "/")[1] + require.Contains(ct, zl.ContextMap(), "service.name") + require.Equal(ct, beatForCompName(compName), zl.ContextMap()["service.name"]) + break + } + require.NotNil(ct, host.Evt, "expected not nil, got nil") + + if params.Status.Error == "" { + require.Equalf(ct, host.Evt.Status(), componentstatus.StatusOK, "expected %v, got %v", params.Status.Status, host.Evt.Status()) + require.Nilf(ct, host.Evt.Err(), "expected nil, got %v", host.Evt.Err()) + } else { + require.Equalf(ct, host.Evt.Status(), params.Status.Status, "expected %v, got %v", params.Status.Status, host.Evt.Status()) + require.ErrorContainsf(ct, host.Evt.Err(), params.Status.Error, "expected error to contain '%v': %v", params.Status.Error, host.Evt.Err()) + } + + if params.AssertFunc != nil { + params.AssertFunc(ct, logs, zapLogs) + } + }, 2*time.Minute, 1*time.Second, + "timeout waiting for logger fields from the OTel collector are present in the logs and other assertions to be met") + for i, r := range receivers { + require.NoErrorf(t, r.Shutdown(ctx), "Error shutting down receiver %d", i) + } + } } // VerifyNoLeaks fails the test if any goroutines are leaked during the test. @@ -280,7 +283,6 @@ func (m *mockDiagExtension) RegisterDiagnosticHook(name string, description stri } func TestReceiverHook(t *testing.T, config component.Config, factory receiver.Factory, set receiver.Settings, expectedHooks int) { - logs, err := factory.CreateLogs(context.Background(), set, config, consumertest.NewNop()) diagExt := &mockDiagExtension{ hooks: make(map[string][]hook), diff --git a/libbeat/template/load.go b/libbeat/template/load.go index 7e5f32fa97d4..2f2598029e2c 100644 --- a/libbeat/template/load.go +++ b/libbeat/template/load.go @@ -73,33 +73,35 @@ type StatusError struct { type templateBuilder struct { log *logp.Logger isServerless bool + beatPaths *paths.Path } // NewESLoader creates a new template loader for ES -func NewESLoader(client ESClient, lifecycleClient lifecycle.ClientHandler, logger *logp.Logger) (*ESLoader, error) { +func NewESLoader(client ESClient, lifecycleClient lifecycle.ClientHandler, logger *logp.Logger, beatPaths *paths.Path) (*ESLoader, error) { if client == nil { return nil, errors.New("can not load template without active Elasticsearch client") } - return &ESLoader{client: client, lifecycleClient: lifecycleClient, - builder: newTemplateBuilder(client.IsServerless(), logger), log: logger.Named("template_loader")}, nil + return &ESLoader{ + client: client, lifecycleClient: lifecycleClient, + builder: newTemplateBuilder(client.IsServerless(), logger, beatPaths), log: logger.Named("template_loader"), + }, nil } // NewFileLoader creates a new template loader for the given file. -func NewFileLoader(c FileClient, isServerless bool, logger *logp.Logger) *FileLoader { +func NewFileLoader(c FileClient, isServerless bool, logger *logp.Logger, beatPaths *paths.Path) *FileLoader { // other components of the file loader will fail if both ILM and DSL are set, // so at this point it's fairly safe to just pass cfg.DSL.Enabled - return &FileLoader{client: c, builder: newTemplateBuilder(isServerless, logger), log: logger.Named("file_template_loader")} + return &FileLoader{client: c, builder: newTemplateBuilder(isServerless, logger, beatPaths), log: logger.Named("file_template_loader")} } -func newTemplateBuilder(serverlessMode bool, logger *logp.Logger) *templateBuilder { - return &templateBuilder{log: logger.Named("template"), isServerless: serverlessMode} +func newTemplateBuilder(serverlessMode bool, logger *logp.Logger, beatPaths *paths.Path) *templateBuilder { + return &templateBuilder{log: logger.Named("template"), isServerless: serverlessMode, beatPaths: beatPaths} } // Load checks if the index mapping template should be loaded. // In case the template is not already loaded or overwriting is enabled, the // template is built and written to index. func (l *ESLoader) Load(config TemplateConfig, info beat.Info, fields []byte, migration bool) error { - // build template from config tmpl, err := l.builder.template(config, info, l.client.GetVersion(), migration) if err != nil || tmpl == nil { @@ -179,7 +181,7 @@ func (l *ESLoader) loadTemplate(templateName string, template map[string]interfa if err != nil { return fmt.Errorf("couldn't load template: %w. Response body: %s", err, body) } - if status > http.StatusMultipleChoices { //http status 300 + if status > http.StatusMultipleChoices { // http status 300 return fmt.Errorf("couldn't load json. Status: %v", status) } return nil @@ -226,13 +228,13 @@ func (l *ESLoader) checkExistsTemplate(name string) (bool, error) { // Load reads the template from the config, creates the template body and prints it to the configured file. func (l *FileLoader) Load(config TemplateConfig, info beat.Info, fields []byte, migration bool) error { - //build template from config + // build template from config tmpl, err := l.builder.template(config, info, l.client.GetVersion(), migration) if err != nil || tmpl == nil { return err } - //create body to print + // create body to print body, err := l.builder.buildBody(tmpl, config, fields) if err != nil { return err @@ -276,7 +278,7 @@ func (b *templateBuilder) buildBody(tmpl *Template, config TemplateConfig, field } func (b *templateBuilder) buildBodyFromJSON(config TemplateConfig) (mapstr.M, error) { - jsonPath := paths.Resolve(paths.Config, config.JSON.Path) + jsonPath := b.beatPaths.Resolve(paths.Config, config.JSON.Path) if _, err := os.Stat(jsonPath); err != nil { return nil, fmt.Errorf("error checking json file %s for template: %w", jsonPath, err) } @@ -284,7 +286,6 @@ func (b *templateBuilder) buildBodyFromJSON(config TemplateConfig) (mapstr.M, er content, err := os.ReadFile(jsonPath) if err != nil { return nil, fmt.Errorf("error reading file %s for template: %w", jsonPath, err) - } var body map[string]interface{} err = json.Unmarshal(content, &body) @@ -296,7 +297,7 @@ func (b *templateBuilder) buildBodyFromJSON(config TemplateConfig) (mapstr.M, er func (b *templateBuilder) buildBodyFromFile(tmpl *Template, config TemplateConfig) (mapstr.M, error) { b.log.Debugf("Load fields.yml from file: %s", config.Fields) - fieldsPath := paths.Resolve(paths.Config, config.Fields) + fieldsPath := b.beatPaths.Resolve(paths.Config, config.Fields) body, err := tmpl.LoadFile(fieldsPath) if err != nil { return nil, fmt.Errorf("error creating template from file %s: %w", fieldsPath, err) diff --git a/libbeat/template/load_integration_test.go b/libbeat/template/load_integration_test.go index 39cde51e6ff8..740d6d519e5a 100644 --- a/libbeat/template/load_integration_test.go +++ b/libbeat/template/load_integration_test.go @@ -43,6 +43,7 @@ import ( "github.com/elastic/beats/v7/libbeat/version" "github.com/elastic/elastic-agent-libs/logp/logptest" "github.com/elastic/elastic-agent-libs/mapstr" + "github.com/elastic/elastic-agent-libs/paths" "github.com/elastic/elastic-agent-libs/transport/httpcommon" ) @@ -71,7 +72,7 @@ func newTestSetup(t *testing.T, cfg TemplateConfig) *testSetup { } handler := &mockClientHandler{serverless: false, mode: lifecycle.ILM} logger := logptest.NewTestingLogger(t, "") - loader, err := NewESLoader(client, handler, logger) + loader, err := NewESLoader(client, handler, logger, paths.New()) require.NoError(t, err) s := testSetup{t: t, client: client, loader: loader, config: cfg} // don't care if the cleanup fails, since they might just return a 404 @@ -89,7 +90,7 @@ func newTestSetupWithESClient(t *testing.T, client ESClient, cfg TemplateConfig) } handler := &mockClientHandler{serverless: false, mode: lifecycle.ILM} logger := logptest.NewTestingLogger(t, "") - loader, err := NewESLoader(client, handler, logger) + loader, err := NewESLoader(client, handler, logger, paths.New()) require.NoError(t, err) return &testSetup{t: t, client: client, loader: loader, config: cfg} } @@ -482,7 +483,6 @@ func TestTemplateWithData(t *testing.T) { _, _, err := esClient.Index(setup.config.Name, "_doc", "", nil, test.data) if test.error { assert.Error(t, err) - } else { assert.NoError(t, err) } diff --git a/libbeat/template/load_test.go b/libbeat/template/load_test.go index 8fb59486d7f9..f470761fbcd2 100644 --- a/libbeat/template/load_test.go +++ b/libbeat/template/load_test.go @@ -25,6 +25,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/elastic-agent-libs/logp/logptest" "github.com/elastic/elastic-agent-libs/mapstr" + "github.com/elastic/elastic-agent-libs/paths" "github.com/elastic/elastic-agent-libs/version" "github.com/stretchr/testify/assert" @@ -36,6 +37,10 @@ func TestFileLoader_Load(t *testing.T) { prefix := "mock" info := beat.Info{Beat: "mock", Version: ver, IndexPrefix: prefix} tmplName := fmt.Sprintf("%s-%s", prefix, ver) + beatPaths := paths.New() + beatPaths.Home = t.TempDir() + err := beatPaths.InitPaths(beatPaths) + require.NoError(t, err) for name, test := range map[string]struct { settings TemplateSettings @@ -52,7 +57,8 @@ func TestFileLoader_Load(t *testing.T) { "data_stream": struct{}{}, "priority": 150, "template": mapstr.M{ - "settings": mapstr.M{"index": nil}}, + "settings": mapstr.M{"index": nil}, + }, }, }, "load minimal config info serverless": { @@ -62,7 +68,8 @@ func TestFileLoader_Load(t *testing.T) { "data_stream": struct{}{}, "priority": 150, "template": mapstr.M{ - "settings": mapstr.M{"index": nil}}, + "settings": mapstr.M{"index": nil}, + }, }, }, "load minimal config with index settings": { @@ -73,7 +80,8 @@ func TestFileLoader_Load(t *testing.T) { "data_stream": struct{}{}, "priority": 150, "template": mapstr.M{ - "settings": mapstr.M{"index": mapstr.M{"code": "best_compression"}}}, + "settings": mapstr.M{"index": mapstr.M{"code": "best_compression"}}, + }, }, }, "load minimal config with source settings": { @@ -91,7 +99,8 @@ func TestFileLoader_Load(t *testing.T) { "date_detection": false, "dynamic_templates": nil, "properties": nil, - }}, + }, + }, }, }, "load config and in-line analyzer fields": { @@ -101,7 +110,8 @@ func TestFileLoader_Load(t *testing.T) { "data_stream": struct{}{}, "priority": 150, "template": mapstr.M{ - "settings": mapstr.M{"index": nil}}, + "settings": mapstr.M{"index": nil}, + }, }, fields: []byte(`- key: test title: Test fields.yml with analyzer @@ -228,7 +238,7 @@ func TestFileLoader_Load(t *testing.T) { t.Run(name, func(t *testing.T) { fc := newFileClient(ver) logger := logptest.NewTestingLogger(t, "") - fl := NewFileLoader(fc, test.isServerless, logger) + fl := NewFileLoader(fc, test.isServerless, logger, beatPaths) cfg := DefaultConfig(info) cfg.Settings = test.settings diff --git a/metricbeat/beater/metricbeat.go b/metricbeat/beater/metricbeat.go index 5f3460643bf6..909d32e72c9f 100644 --- a/metricbeat/beater/metricbeat.go +++ b/metricbeat/beater/metricbeat.go @@ -287,7 +287,7 @@ func (bt *Metricbeat) Run(b *beat.Beat) error { // Dynamic file based modules (metricbeat.config.modules) if bt.config.ConfigModules.Enabled() { - moduleReloader := cfgfile.NewReloader(bt.logger.Named("module.reload"), b.Publisher, bt.config.ConfigModules) + moduleReloader := cfgfile.NewReloader(bt.logger.Named("module.reload"), b.Publisher, bt.config.ConfigModules, b.Paths) if err := moduleReloader.Check(factory); err != nil { return err @@ -329,7 +329,6 @@ func (bt *Metricbeat) WithOtelFactoryWrapper(wrapper cfgfile.FactoryWrapper) { // result in undefined behavior. func (bt *Metricbeat) Stop() { bt.stopOnce.Do(func() { close(bt.done) }) - } // Modules return a list of all configured modules. diff --git a/x-pack/filebeat/fbreceiver/receiver_test.go b/x-pack/filebeat/fbreceiver/receiver_test.go index cbcf24baf4b6..e6751df89a72 100644 --- a/x-pack/filebeat/fbreceiver/receiver_test.go +++ b/x-pack/filebeat/fbreceiver/receiver_test.go @@ -162,13 +162,12 @@ func benchmarkFactoryWithLogLevel(b *testing.B, level zapcore.Level) { } func TestMultipleReceivers(t *testing.T) { - t.Skip("flaky test, see https://github.com/elastic/beats/issues/43832") // This test verifies that multiple receivers can be instantiated // in isolation, started, and can ingest logs without interfering // with each other. // Receivers need distinct home directories so wrap the config in a function. - config := func(monitorSocket string) *Config { + config := func(monitorSocket string, homePath string, ingestPath string) *Config { var monitorHost string if runtime.GOOS == "windows" { monitorHost = "npipe:///" + filepath.Base(monitorSocket) @@ -186,10 +185,11 @@ func TestMultipleReceivers(t *testing.T) { "count": 1, }, { - "type": "filestream", - "enabled": true, - "id": "must-be-unique", - "paths": []string{"none"}, + "type": "filestream", + "enabled": true, + "id": "must-be-unique", + "paths": []string{ingestPath}, + "file_identity.native": nil, }, }, }, @@ -199,7 +199,7 @@ func TestMultipleReceivers(t *testing.T) { "*", }, }, - "path.home": t.TempDir(), + "path.home": homePath, "http.enabled": true, "http.host": monitorHost, }, @@ -209,23 +209,40 @@ func TestMultipleReceivers(t *testing.T) { factory := NewFactory() monitorSocket1 := genSocketPath() monitorSocket2 := genSocketPath() + dir1 := t.TempDir() + dir2 := t.TempDir() + ingest1 := filepath.Join(t.TempDir(), "test1.log") + ingest2 := filepath.Join(t.TempDir(), "test2.log") oteltest.CheckReceivers(oteltest.CheckReceiversParams{ - T: t, + T: t, + NumRestarts: 5, Receivers: []oteltest.ReceiverConfig{ { Name: "r1", - Config: config(monitorSocket1), Beat: "filebeat", + Config: config(monitorSocket1, dir1, ingest1), Factory: factory, }, { Name: "r2", - Config: config(monitorSocket2), Beat: "filebeat", + Config: config(monitorSocket2, dir2, ingest2), Factory: factory, }, }, AssertFunc: func(c *assert.CollectT, logs map[string][]mapstr.M, zapLogs *observer.ObservedLogs) { + // Add data to be ingested with filestream + f1, err := os.OpenFile(ingest1, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644) + require.NoError(c, err) + _, err = f1.WriteString("A log line\n") + require.NoError(c, err) + f1.Close() + f2, err := os.OpenFile(ingest2, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644) + require.NoError(c, err) + _, err = f2.WriteString("A log line\n") + require.NoError(c, err) + f2.Close() + require.Greater(c, len(logs["r1"]), 0, "receiver r1 does not have any logs") require.Greater(c, len(logs["r2"]), 0, "receiver r2 does not have any logs") @@ -238,10 +255,23 @@ func TestMultipleReceivers(t *testing.T) { // instance and does not interfere with others. Previously, the // logger in Beats was global, causing logger fields to be // overwritten when multiple receivers started in the same process. - r1StartLogs := zapLogs.FilterMessageSnippet("Beat ID").FilterField(zap.String("otelcol.component.id", "r1")) + r1StartLogs := zapLogs.FilterMessageSnippet("Beat ID").FilterField(zap.String("otelcol.component.id", "filebeatreceiver/r1")) assert.Equal(c, 1, r1StartLogs.Len(), "r1 should have a single start log") - r2StartLogs := zapLogs.FilterMessageSnippet("Beat ID").FilterField(zap.String("otelcol.component.id", "r2")) + r2StartLogs := zapLogs.FilterMessageSnippet("Beat ID").FilterField(zap.String("otelcol.component.id", "filebeatreceiver/r2")) assert.Equal(c, 1, r2StartLogs.Len(), "r2 should have a single start log") + + meta1Path := filepath.Join(dir1, "/data/meta.json") + assert.FileExists(c, meta1Path, "dir1/data/meta.json should exist") + meta1Data, err := os.ReadFile(meta1Path) + assert.NoError(c, err) + + meta2Path := filepath.Join(dir2, "/data/meta.json") + assert.FileExists(c, meta2Path, "dir2/data/meta.json should exist") + meta2Data, err := os.ReadFile(meta2Path) + assert.NoError(c, err) + + assert.NotEqual(c, meta1Data, meta2Data, "meta data files should be different") + var lastError strings.Builder assert.Conditionf(c, func() bool { return getFromSocket(t, &lastError, monitorSocket1, "stats") @@ -255,6 +285,25 @@ func TestMultipleReceivers(t *testing.T) { assert.Conditionf(c, func() bool { return getFromSocket(t, &lastError, monitorSocket2, "inputs") }, "failed to connect to monitoring socket2, inputs endpoint, last error was: %s", &lastError) + + ingest1Json, err := json.Marshal(ingest1) + require.NoError(c, err) + ingest2Json, err := json.Marshal(ingest2) + require.NoError(c, err) + + reg1Path := filepath.Join(dir1, "/data/registry/filebeat/log.json") + require.FileExists(c, reg1Path, "receiver 1 filebeat registry should exist") + reg1Data, err := os.ReadFile(reg1Path) + require.NoError(c, err) + require.Containsf(c, string(reg1Data), string(ingest1Json), "receiver 1 registry should contain '%s', but was: %s", string(ingest1Json), string(reg1Data)) + require.NotContainsf(c, string(reg1Data), string(ingest2Json), "receiver 1 registry should not contain '%s', but was: %s", string(ingest2Json), string(reg1Data)) + + reg2Path := filepath.Join(dir2, "/data/registry/filebeat/log.json") + require.FileExists(c, reg2Path, "receiver 2 filebeat registry should exist") + reg2Data, err := os.ReadFile(reg2Path) + require.NoError(c, err) + require.Containsf(c, string(reg2Data), string(ingest2Json), "receiver 2 registry should contain '%s', but was: %s", string(ingest2Json), string(reg2Data)) + require.NotContainsf(c, string(reg2Data), string(ingest1Json), "receiver 2 registry should not contain '%s', but was: %s", string(ingest1Json), string(reg2Data)) }, }) } diff --git a/x-pack/libbeat/cmd/instance/beat.go b/x-pack/libbeat/cmd/instance/beat.go index 6f459e9ef5a2..b3ba6667df66 100644 --- a/x-pack/libbeat/cmd/instance/beat.go +++ b/x-pack/libbeat/cmd/instance/beat.go @@ -89,13 +89,29 @@ func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]an } cfg := (*config.C)(tmp) - if err := instance.InitPaths(cfg); err != nil { - return nil, fmt.Errorf("error initializing paths: %w", err) + if settings.Name == "filebeat" { + partialConfig := struct { + Path paths.Path `config:"path"` + }{} + + if err := cfg.Unpack(&partialConfig); err != nil { + return nil, fmt.Errorf("error extracting default paths: %w", err) + } + p := paths.New() + if err := p.InitPaths(&partialConfig.Path); err != nil { + return nil, fmt.Errorf("error initializing default paths: %w", err) + } + b.Paths = p + } else { + if err := instance.InitPaths(cfg); err != nil { + return nil, fmt.Errorf("error initializing paths: %w", err) + } + b.Paths = paths.Paths } // We have to initialize the keystore before any unpack or merging the cloud // options. - store, err := instance.LoadKeystore(cfg, b.Info.Beat) + store, err := instance.LoadKeystore(cfg, b.Info.Beat, b.Paths) if err != nil { return nil, fmt.Errorf("could not initialize the keystore: %w", err) } @@ -153,9 +169,9 @@ func NewBeatForReceiver(settings instance.Settings, receiverConfig map[string]an } // log paths values to help with troubleshooting - logger.Infof("%s", paths.Paths.String()) + logger.Infof("%s", b.Paths.String()) - metaPath := paths.Resolve(paths.Data, "meta.json") + metaPath := b.Paths.Resolve(paths.Data, "meta.json") err = b.LoadMeta(metaPath) if err != nil { return nil, fmt.Errorf("error loading meta data: %w", err)