Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions filebeat/autodiscover/builder/hints/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/paths"
)

const (
Expand Down Expand Up @@ -68,7 +69,7 @@ func NewLogHints(cfg *conf.C, logger *logp.Logger) (autodiscover.Builder, error)
return nil, fmt.Errorf("unable to unpack hints config due to error: %w", err)
}

moduleRegistry, err := fileset.NewModuleRegistry(nil, beat.Info{Logger: logger}, false, fileset.FilesetOverrides{})
moduleRegistry, err := fileset.NewModuleRegistry(nil, beat.Info{Logger: logger}, false, fileset.FilesetOverrides{}, paths.Paths)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -155,7 +156,6 @@ func (l *logHints) CreateConfig(event bus.Event, options ...ucfg.Option) []*conf
} else {
shouldPut(tempCfg, json, jsonOpts, l.log)
}

}
// Merge config template with the configs from the annotations
// AppendValues option is used to append arrays from annotations to existing arrays while merging
Expand Down
9 changes: 6 additions & 3 deletions filebeat/beater/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/elastic/beats/v7/libbeat/cfgfile"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/paths"
)

type crawler struct {
Expand All @@ -41,6 +42,7 @@ type crawler struct {
inputReloader *cfgfile.Reloader
once bool
beatDone chan struct{}
beatPaths *paths.Path
}

func newCrawler(
Expand All @@ -49,6 +51,7 @@ func newCrawler(
beatDone chan struct{},
once bool,
logger *logp.Logger,
beatPaths *paths.Path,
) (*crawler, error) {
return &crawler{
log: logger.Named("crawler"),
Expand All @@ -58,6 +61,7 @@ func newCrawler(
inputConfigs: inputConfigs,
once: once,
beatDone: beatDone,
beatPaths: beatPaths,
}, nil
}

Expand All @@ -80,14 +84,14 @@ func (c *crawler) Start(
}

if configInputs.Enabled() {
c.inputReloader = cfgfile.NewReloader(log, pipeline, configInputs)
c.inputReloader = cfgfile.NewReloader(log.Named("input.reloader"), pipeline, configInputs, c.beatPaths)
if err := c.inputReloader.Check(c.inputsFactory); err != nil {
return fmt.Errorf("creating input reloader failed: %w", err)
}
}

if configModules.Enabled() {
c.modulesReloader = cfgfile.NewReloader(log, pipeline, configModules)
c.modulesReloader = cfgfile.NewReloader(log.Named("module.reloader"), pipeline, configModules, c.beatPaths)
if err := c.modulesReloader.Check(c.modulesFactory); err != nil {
return fmt.Errorf("creating module reloader failed: %w", err)
}
Expand All @@ -113,7 +117,6 @@ func (c *crawler) startInput(
pipeline beat.PipelineConnector,
config *conf.C,
) error {

if !config.Enabled() {
c.log.Infof("input disabled, skipping it")
return nil
Expand Down
16 changes: 8 additions & 8 deletions filebeat/beater/diagnostics.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ func getRegexpsForRegistryFiles() ([]*regexp.Regexp, error) {

registryFileRegExps := []*regexp.Regexp{}
preFilesList := [][]string{
[]string{"^registry$"},
[]string{"^registry", "filebeat$"},
[]string{"^registry", "filebeat", "meta\\.json$"},
[]string{"^registry", "filebeat", "log\\.json$"},
[]string{"^registry", "filebeat", "active\\.dat$"},
[]string{"^registry", "filebeat", "[[:digit:]]*\\.json$"},
{"^registry$"},
{"^registry", "filebeat$"},
{"^registry", "filebeat", "meta\\.json$"},
{"^registry", "filebeat", "log\\.json$"},
{"^registry", "filebeat", "active\\.dat$"},
{"^registry", "filebeat", "[[:digit:]]*\\.json$"},
}

for _, lst := range preFilesList {
Expand All @@ -70,12 +70,12 @@ func getRegexpsForRegistryFiles() ([]*regexp.Regexp, error) {
return registryFileRegExps, nil
}

func gzipRegistry(logger *logp.Logger) func() []byte {
func gzipRegistry(logger *logp.Logger, beatPaths *paths.Path) func() []byte {
logger = logger.Named("diagnostics")

return func() []byte {
buf := bytes.Buffer{}
dataPath := paths.Resolve(paths.Data, "")
dataPath := beatPaths.Resolve(paths.Data, "")
registryPath := filepath.Join(dataPath, "registry")
f, err := os.CreateTemp("", "filebeat-registry-*.tar")
if err != nil {
Expand Down
18 changes: 9 additions & 9 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func newBeater(b *beat.Beat, plugins PluginFactory, rawConfig *conf.C) (beat.Bea
EnableAllFilesets: enableAllFilesets,
ForceEnableModuleFilesets: forceEnableModuleFilesets,
}
moduleRegistry, err := fileset.NewModuleRegistry(config.Modules, b.Info, true, filesetOverrides)
moduleRegistry, err := fileset.NewModuleRegistry(config.Modules, b.Info, true, filesetOverrides, b.Paths)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -197,7 +197,7 @@ func (fb *Filebeat) setupPipelineLoaderCallback(b *beat.Beat) error {
ForceEnableModuleFilesets: forceEnableModuleFilesets,
}

modulesFactory := fileset.NewSetupFactory(b.Info, pipelineLoaderFactory, filesetOverrides)
modulesFactory := fileset.NewSetupFactory(b.Info, pipelineLoaderFactory, filesetOverrides, b.Paths)
if fb.config.ConfigModules.Enabled() {
if enableAllFilesets {
// All module configs need to be loaded to enable all the filesets
Expand All @@ -207,7 +207,7 @@ func (fb *Filebeat) setupPipelineLoaderCallback(b *beat.Beat) error {
newPath := strings.TrimSuffix(origPath, ".yml")
_ = fb.config.ConfigModules.SetString("path", -1, newPath)
}
modulesLoader := cfgfile.NewReloader(fb.logger.Named("module.reloader"), fb.pipeline, fb.config.ConfigModules)
modulesLoader := cfgfile.NewReloader(fb.logger.Named("module.reloader"), fb.pipeline, fb.config.ConfigModules, b.Paths)
modulesLoader.Load(modulesFactory)
}

Expand Down Expand Up @@ -266,7 +266,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
"Filebeat's registry",
"registry.tar.gz",
"application/octet-stream",
gzipRegistry(b.Info.Logger))
gzipRegistry(b.Info.Logger, b.Paths))
}

if !fb.moduleRegistry.Empty() {
Expand All @@ -288,7 +288,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
}
finishedLogger := newFinishedLogger(wgEvents)

registryMigrator := registrar.NewMigrator(config.Registry, fb.logger)
registryMigrator := registrar.NewMigrator(config.Registry, fb.logger, b.Paths)
if err := registryMigrator.Run(); err != nil {
fb.logger.Errorf("Failed to migrate registry file: %+v", err)
return err
Expand All @@ -301,7 +301,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
cn()
}()

stateStore, err := openStateStore(ctx, b.Info, fb.logger.Named("filebeat"), config.Registry)
stateStore, err := openStateStore(ctx, b.Info, fb.logger.Named("filebeat"), config.Registry, b.Paths)
if err != nil {
fb.logger.Errorf("Failed to open state store: %+v", err)
return err
Expand Down Expand Up @@ -361,7 +361,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
pipelineConnector := channel.NewOutletFactory(outDone).Create

inputsLogger := fb.logger.Named("input")
v2Inputs := fb.pluginFactory(b.Info, inputsLogger, stateStore, paths.Paths)
v2Inputs := fb.pluginFactory(b.Info, inputsLogger, stateStore, b.Paths)
v2InputLoader, err := v2.NewLoader(inputsLogger, v2Inputs, "type", cfg.DefaultType)
if err != nil {
panic(err) // loader detected invalid state.
Expand Down Expand Up @@ -403,8 +403,8 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
fb.logger.Warn(pipelinesWarning)
}
}
moduleLoader := fileset.NewFactory(inputLoader, b.Info, pipelineLoaderFactory, config.OverwritePipelines)
crawler, err := newCrawler(inputLoader, moduleLoader, config.Inputs, fb.done, *once, fb.logger)
moduleLoader := fileset.NewFactory(inputLoader, b.Info, pipelineLoaderFactory, config.OverwritePipelines, b.Paths)
crawler, err := newCrawler(inputLoader, moduleLoader, config.Inputs, fb.done, *once, fb.logger, b.Paths)
if err != nil {
fb.logger.Errorf("Could not init crawler: %v", err)
return err
Expand Down
4 changes: 2 additions & 2 deletions filebeat/beater/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type filebeatStore struct {
notifier *es.Notifier
}

func openStateStore(ctx context.Context, info beat.Info, logger *logp.Logger, cfg config.Registry) (*filebeatStore, error) {
func openStateStore(ctx context.Context, info beat.Info, logger *logp.Logger, cfg config.Registry, beatPaths *paths.Path) (*filebeatStore, error) {
var (
reg backend.Registry
err error
Expand All @@ -61,7 +61,7 @@ func openStateStore(ctx context.Context, info beat.Info, logger *logp.Logger, cf
}

reg, err = memlog.New(logger, memlog.Settings{
Root: paths.Resolve(paths.Data, cfg.Path),
Root: beatPaths.Resolve(paths.Data, cfg.Path),
FileMode: cfg.Permissions,
})
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions filebeat/fileset/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ func NewFilesetConfig(cfg *conf.C) (*FilesetConfig, error) {
// mergePathDefaults returns a copy of c containing the path variables that must
// be available for variable expansion in module configuration (e.g. it enables
// the use of ${path.config} in module config).
func mergePathDefaults(c *conf.C) (*conf.C, error) {
func mergePathDefaults(c *conf.C, beatPaths *paths.Path) (*conf.C, error) {
defaults := conf.MustNewConfigFrom(map[string]interface{}{
"path": map[string]interface{}{
"home": paths.Paths.Home,
"home": beatPaths.Home,
"config": "${path.home}",
"data": filepath.Join("${path.home}", "data"),
"logs": filepath.Join("${path.home}", "logs"),
Expand Down
12 changes: 9 additions & 3 deletions filebeat/fileset/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,13 @@ import (
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/elastic-agent-libs/paths"
)

var moduleList = monitoring.NewUniqueList()
var moduleListMetricsOnce sync.Once
var (
moduleList = monitoring.NewUniqueList()
moduleListMetricsOnce sync.Once
)

// RegisterMonitoringModules registers the modules list with the monitoring system.
func RegisterMonitoringModules(namespace string) {
Expand All @@ -50,6 +53,7 @@ type Factory struct {
overwritePipelines bool
pipelineCallbackID uuid.UUID
inputFactory cfgfile.RunnerFactory
beatPaths *paths.Path
}

// Wrap an array of inputs and implements cfgfile.Runner interface
Expand All @@ -69,13 +73,15 @@ func NewFactory(
beatInfo beat.Info,
pipelineLoaderFactory PipelineLoaderFactory,
overwritePipelines bool,
beatPaths *paths.Path,
) *Factory {
return &Factory{
inputFactory: inputFactory,
beatInfo: beatInfo,
pipelineLoaderFactory: pipelineLoaderFactory,
pipelineCallbackID: uuid.Nil,
overwritePipelines: overwritePipelines,
beatPaths: beatPaths,
}
}

Expand Down Expand Up @@ -134,7 +140,7 @@ func (f *Factory) CheckConfig(c *conf.C) error {
// createRegistry starts a registry for a set of filesets, it returns the registry and
// its input configurations
func (f *Factory) createRegistry(c *conf.C) (*ModuleRegistry, []*conf.C, error) {
m, err := NewModuleRegistry([]*conf.C{c}, f.beatInfo, false, FilesetOverrides{})
m, err := NewModuleRegistry([]*conf.C{c}, f.beatInfo, false, FilesetOverrides{}, f.beatPaths)
if err != nil {
return nil, nil, err
}
Expand Down
43 changes: 25 additions & 18 deletions filebeat/fileset/fileset.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/elastic/beats/v7/libbeat/common/cfgwarn"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/paths"
"github.com/elastic/elastic-agent-libs/version"
)

Expand All @@ -53,6 +54,7 @@ type Fileset struct {
vars map[string]interface{}
pipelineIDs []string
logger *logp.Logger
beatPaths *paths.Path
}

type pipeline struct {
Expand All @@ -67,6 +69,7 @@ func New(
mname string,
fcfg *FilesetConfig,
logger *logp.Logger,
beatPaths *paths.Path,
) (*Fileset, error,
) {
modulePath := filepath.Join(modulesPath, mname)
Expand All @@ -80,6 +83,7 @@ func New(
fcfg: fcfg,
modulePath: modulePath,
logger: logger,
beatPaths: beatPaths,
}, nil
}

Expand Down Expand Up @@ -168,7 +172,7 @@ func (fs *Fileset) evaluateVars(info beat.Info) (map[string]interface{}, error)
var exists bool
name, exists := vals["name"].(string)
if !exists {
return nil, fmt.Errorf("Variable doesn't have a string 'name' key")
return nil, fmt.Errorf("variable doesn't have a string 'name' key")
}

// Variables are not required to have a default. Templates should
Expand Down Expand Up @@ -207,28 +211,31 @@ func (fs *Fileset) turnOffElasticsearchVars(vars map[string]interface{}, esVersi
}

if !esVersion.IsValid() {
return vars, errors.New("Unknown Elasticsearch version")
return vars, errors.New("unknown Elasticsearch version")
}

for _, vals := range fs.manifest.Vars {
var ok bool
name, ok := vals["name"].(string)
if !ok {
return nil, fmt.Errorf("Variable doesn't have a string 'name' key")
return nil, fmt.Errorf("variable doesn't have a string 'name' key")
}

minESVersion, ok := vals["min_elasticsearch_version"].(map[string]interface{})
if ok {
minVersion, err := version.New(minESVersion["version"].(string))
if err != nil {
return vars, fmt.Errorf("Error parsing version %s: %w", minESVersion["version"].(string), err)
}
versionString, ok := minESVersion["version"].(string)
if ok {
minVersion, err := version.New(versionString)
if err != nil {
return vars, fmt.Errorf("Error parsing version %s: %w", versionString, err)
}

fs.logger.Named("fileset").Debugf("Comparing ES version %s with requirement of %s", esVersion.String(), minVersion)
fs.logger.Named("fileset").Debugf("Comparing ES version %s with requirement of %s", esVersion.String(), minVersion)

if esVersion.LessThan(minVersion) {
retVars[name] = minESVersion["value"]
fs.logger.Infof("Setting var %s (%s) to %v because Elasticsearch version is %s", name, fs, minESVersion["value"], esVersion.String())
if esVersion.LessThan(minVersion) {
retVars[name] = minESVersion["value"]
fs.logger.Infof("Setting var %s (%s) to %v because Elasticsearch version is %s", name, fs, minESVersion["value"], esVersion.String())
}
}
}
}
Expand Down Expand Up @@ -312,11 +319,11 @@ func getTemplateFunctions(vars map[string]interface{}) (template.FuncMap, error)
},
"IngestPipeline": func(shortID string) string {
return FormatPipelineID(
builtinVars["prefix"].(string),
builtinVars["module"].(string),
builtinVars["fileset"].(string),
builtinVars["prefix"].(string), //nolint:errcheck //keep behavior for now
builtinVars["module"].(string), //nolint:errcheck //keep behavior for now
builtinVars["fileset"].(string), //nolint:errcheck //keep behavior for now
shortID,
builtinVars["beatVersion"].(string),
builtinVars["beatVersion"].(string), //nolint:errcheck //keep behavior for now
)
},
}, nil
Expand Down Expand Up @@ -368,7 +375,7 @@ func (fs *Fileset) getInputConfig() (*conf.C, error) {
return nil, fmt.Errorf("Error reading input config: %w", err)
}

cfg, err = mergePathDefaults(cfg)
cfg, err = mergePathDefaults(cfg, fs.beatPaths)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -461,15 +468,15 @@ func (fs *Fileset) GetPipelines(esVersion version.V) (pipelines []pipeline, err
}
newContent, err := FixYAMLMaps(content)
if err != nil {
return nil, fmt.Errorf("Failed to sanitize the YAML pipeline file: %s: %w", path, err)
return nil, fmt.Errorf("failed to sanitize the YAML pipeline file: %s: %w", path, err)
}
var ok bool
content, ok = newContent.(map[string]interface{})
if !ok {
return nil, errors.New("cannot convert newContent to map[string]interface{}")
}
default:
return nil, fmt.Errorf("Unsupported extension '%s' for pipeline file: %s", extension, path)
return nil, fmt.Errorf("unsupported extension '%s' for pipeline file: %s", extension, path)
}

pipelineID := fs.pipelineIDs[idx]
Expand Down
Loading
Loading