Skip to content

Conversation

@orestisfl
Copy link
Contributor

Proposed commit message

When multiple inputs connect to the same pipeline, each input calls SetPaths on global processors. Previously, the second call would fail with 'attempt to set paths twice'.

This fix makes SetPaths idempotent by storing the paths pointer and returning nil (instead of an error) when called again with the same pointer. This is the expected behavior for global processors shared across multiple inputs within the same beat instance.

Changes:

  • libbeat/processors/safe_processor.go: Store paths pointer and allow subsequent calls with the same pointer to succeed
  • libbeat/processors/safe_processor_test.go: Add test cases for idempotent SetPaths behavior
  • filebeat/tests/integration/cache_processor_test.go: Add integration test that verifies global cache processor works with multiple inputs

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • I have made corresponding change to the default configuration files
  • I have added tests that prove my fix is effective or that my feature works. Where relevant, I have used the stresstest.sh script to run them under stress conditions and race detector to verify their stability. -- I did use the stresstest script to verify the new integration test
  • I have added an entry in ./changelog/fragments using the changelog tool.

How to test this PR locally

Option 1: Run the Integration Test

cd filebeat
mage buildSystemTestBinary
go test -v -tags integration -run "TestGlobalCacheProcessorMultipleInputs" ./tests/integration/

Expected result: Test passes with all 30 events processed from 3 inputs.
Without the fix: Test fails with:

"error while connecting to output with pipeline: failed setting paths for global processors: attempt to set paths twice"

Option 2: Manual Test with Config File

filebeat.inputs:
  - type: filestream
    id: input-a
    paths:
      - /tmp/logs/a.log
    prospector.scanner.fingerprint.enabled: false

  - type: filestream
    id: input-b
    paths:
      - /tmp/logs/b.log
    prospector.scanner.fingerprint.enabled: false

processors:
  - cache:
      backend:
        file:
          id: test-cache
          write_interval: 1s
        capacity: 1000
      put:
        key_field: message
        value_field: message
        ttl: 1h
      ignore_missing: true

output.console:
  enabled: true
mkdir -p /tmp/logs
for i in $(seq 1 100); do echo "{\"message\":\"test $i\"}"; done > /tmp/logs/a.log
for i in $(seq 1 100); do echo "{\"message\":\"test $i\"}"; done > /tmp/logs/b.log

./filebeat -e -c config_cache_global.yml 2>&1 | grep -E "error|twice|initialized|harvester.*started"

Expected result (with fix):

  • "initialized cache processor" appears once
  • Both inputs start successfully
  • Events are processed from both inputs

Without the fix:

"error while connecting to output with pipeline: failed setting paths for global processors: attempt to set paths twice"

Only input-a processes events; input-b fails to connect.

Related issues

Logs

Logs
{"log.level":"info","@timestamp":"2025-12-02T14:52:39.120+0100","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/cmd/instance.(*Beat).configure","file.name":"instance/beat.go","file.line":836},"message":"Home path: [/home/orestis/src/beats/x-pack/filebeat] Config path: [/home/orestis/src/beats/x-pack/filebeat] Data path: [/home/orestis/src/beats/x-pack/filebeat/data] Logs path: [/home/orestis/src/beats/x-pack/filebeat/logs]","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-12-02T14:52:39.121+0100","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/cmd/instance.(*Beat).configure","file.name":"instance/beat.go","file.line":844},"message":"Beat ID: 05fc1486-d763-43fa-8b98-e38ae1398577","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-12-02T14:52:39.129+0100","log.logger":"processors.cache","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/processors/cache.New","file.name":"cache/cache.go","file.line":72},"message":"cache processor created","service.name":"filebeat","instance_id":1,"config":{"Get":null,"Put":{"Key":"host.name","Value":"host.metadata","TTL":86400000000000},"Delete":null,"Store":{"Memory":{"ID":"global-cache"},"File":null,"Capacity":10000,"Effort":0},"IgnoreMissing":true,"IgnoreFailure":false,"OverwriteKeys":false},"ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-12-02T14:52:39.130+0100","log.logger":"seccomp","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/common/seccomp.loadFilter","file.name":"seccomp/seccomp.go","file.line":125},"message":"Syscall filter successfully installed","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-12-02T14:52:39.130+0100","log.logger":"beat","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/cmd/instance.(*Beat).createBeater","file.name":"instance/beat.go","file.line":332},"message":"Setup Beat: filebeat; Version: 9.3.0 (FIPS-distribution: false)","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-12-02T14:52:39.130+0100","log.logger":"beat","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/cmd/instance.(*Beat).logSystemInfo","file.name":"instance/beat.go","file.line":1393},"message":"Beat info","service.name":"filebeat","system_info":{"beat":{"path":{"config":"/home/orestis/src/beats/x-pack/filebeat","data":"/home/orestis/src/beats/x-pack/filebeat/data","home":"/home/orestis/src/beats/x-pack/filebeat","logs":"/home/orestis/src/beats/x-pack/filebeat/logs"},"type":"filebeat","uuid":"05fc1486-d763-43fa-8b98-e38ae1398577"},"ecs.version":"1.6.0"}}
{"log.level":"info","@timestamp":"2025-12-02T14:52:39.130+0100","log.logger":"beat","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/cmd/instance.(*Beat).logSystemInfo","file.name":"instance/beat.go","file.line":1402},"message":"Build info","service.name":"filebeat","system_info":{"build":{"commit":"deec5c55e9b0306e2c3b10de6367b266b71e7899","libbeat":"9.3.0","time":"2025-12-02T13:51:01.000Z","version":"9.3.0"},"ecs.version":"1.6.0"}}
{"log.level":"info","@timestamp":"2025-12-02T14:52:39.131+0100","log.logger":"beat","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/cmd/instance.(*Beat).logSystemInfo","file.name":"instance/beat.go","file.line":1405},"message":"Go runtime info","service.name":"filebeat","system_info":{"go":{"os":"linux","arch":"amd64","max_procs":12,"version":"go1.25.4 X:nodwarf5"},"ecs.version":"1.6.0"}}
{"log.level":"info","@timestamp":"2025-12-02T14:52:39.131+0100","log.logger":"beat","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/cmd/instance.(*Beat).logSystemInfo","file.name":"instance/beat.go","file.line":1411},"message":"Host info","service.name":"filebeat","system_info":{"host":{"architecture":"x86_64","native_architecture":"x86_64","boot_time":"2025-12-02T06:11:29+01:00","containerized":false,"name":"laptop","ip":["127.0.0.1","192.168.178.44","172.18.0.1","172.17.0.1","::1","2001:a61:12f1:9901:591e:b1f2:b909:30fa","fd9a:56af:31d7:0:5b85:f561:af6a:7612","fe80::a7c6:8abd:9493:d9d8","fe80::243e:c7ff:fe14:7299"],"kernel_version":"6.12.59-1-lts","mac":["00:be:43:61:7d:c4","8c:f8:c5:c5:33:ac","9c:7b:ef:62:65:a3","1e:3a:da:25:d0:60","26:3e:c7:14:72:99"],"os":{"type":"linux","family":"arch","platform":"arch","name":"Arch Linux","version":"","major":0,"minor":0,"patch":0,"build":"rolling"},"timezone":"CET","timezone_offset_sec":3600,"id":"9fab1a953bcd47949880e26abc13a79d"},"ecs.version":"1.6.0"}}
{"log.level":"info","@timestamp":"2025-12-02T14:52:39.131+0100","log.logger":"beat","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/cmd/instance.(*Beat).logSystemInfo","file.name":"instance/beat.go","file.line":1440},"message":"Process info","service.name":"filebeat","system_info":{"process":{"capabilities":{"inheritable":["wake_alarm"],"permitted":null,"effective":null,"bounding":["chown","dac_override","dac_read_search","fowner","fsetid","kill","setgid","setuid","setpcap","linux_immutable","net_bind_service","net_broadcast","net_admin","net_raw","ipc_lock","ipc_owner","sys_module","sys_rawio","sys_chroot","sys_ptrace","sys_pacct","sys_admin","sys_boot","sys_nice","sys_resource","sys_time","sys_tty_config","mknod","lease","audit_write","audit_control","setfcap","mac_override","mac_admin","syslog","wake_alarm","block_suspend","audit_read","perfmon","bpf","checkpoint_restore"],"ambient":null},"cwd":"/home/orestis/src/beats","exe":"/home/orestis/src/beats/x-pack/filebeat/filebeat","name":"filebeat","pid":812688,"ppid":809821,"seccomp":{"mode":"filter","no_new_privs":true},"start_time":"2025-12-02T14:52:38.740+0100"},"ecs.version":"1.6.0"}}
{"log.level":"info","@timestamp":"2025-12-02T14:52:39.133+0100","log.logger":"publisher","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/publisher/pipeline.LoadWithSettings","file.name":"pipeline/module.go","file.line":105},"message":"Beat name: laptop","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-12-02T14:52:39.133+0100","log.logger":"modules","log.origin":{"function":"github.com/elastic/beats/v7/filebeat/fileset.newModuleRegistry","file.name":"fileset/modules.go","file.line":138},"message":"Enabled modules/filesets: ","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"warn","@timestamp":"2025-12-02T14:52:39.133+0100","log.origin":{"function":"github.com/elastic/beats/v7/filebeat/beater.(*Filebeat).setupPipelineLoaderCallback","file.name":"beater/filebeat.go","file.line":174},"message":"Filebeat is unable to load the ingest pipelines for the configured modules because the Elasticsearch output is not configured/enabled. If you have already loaded the ingest pipelines or are using Logstash pipelines, you can ignore this warning.","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-12-02T14:52:39.133+0100","log.logger":"monitoring","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/monitoring/report/log.(*reporter).snapshotLoop","file.name":"log/log.go","file.line":150},"message":"Starting metrics logging every 30s","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-12-02T14:52:39.133+0100","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/cmd/instance.(*Beat).launch","file.name":"instance/beat.go","file.line":542},"message":"filebeat start running.","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-12-02T14:52:39.134+0100","log.logger":"filebeat","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/statestore/backend/memlog.openStore","file.name":"memlog/store.go","file.line":134},"message":"Finished loading transaction log file for '/home/orestis/src/beats/x-pack/filebeat/data/registry/filebeat'. Active transaction id=4","service.name":"filebeat","store":"filebeat","ecs.version":"1.6.0"}
{"log.level":"warn","@timestamp":"2025-12-02T14:52:39.134+0100","log.origin":{"function":"github.com/elastic/beats/v7/filebeat/beater.(*Filebeat).Run","file.name":"beater/filebeat.go","file.line":406},"message":"Filebeat is unable to load the ingest pipelines for the configured modules because the Elasticsearch output is not configured/enabled. If you have already loaded the ingest pipelines or are using Logstash pipelines, you can ignore this warning.","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-12-02T14:52:39.134+0100","log.logger":"registrar","log.origin":{"function":"github.com/elastic/beats/v7/filebeat/registrar.(*Registrar).loadStates","file.name":"registrar/registrar.go","file.line":103},"message":"States Loaded from registrar: 0","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-12-02T14:52:39.134+0100","log.logger":"crawler","log.origin":{"function":"github.com/elastic/beats/v7/filebeat/beater.(*crawler).Start","file.name":"beater/crawler.go","file.line":76},"message":"Loading Inputs: 2","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-12-02T14:52:39.134+0100","log.logger":"crawler","log.origin":{"function":"github.com/elastic/beats/v7/filebeat/beater.(*crawler).startInput","file.name":"beater/crawler.go","file.line":148},"message":"Starting input (ID: 5483632775956168786)","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-12-02T14:52:39.135+0100","log.logger":"input.filestream","log.origin":{"function":"github.com/elastic/beats/v7/filebeat/input/v2/compat.(*runner).Start.func1","file.name":"compat/compat.go","file.line":141},"message":"Input 'filestream' starting","service.name":"filebeat","id":"input-a","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-12-02T14:52:39.135+0100","log.logger":"input.filestream.metric_registry","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/monitoring/inputmon.NewMetricsRegistry","file.name":"inputmon/input.go","file.line":182},"message":"registering","service.name":"filebeat","id":"input-a","registry_id":"input-a","input_id":"input-a","input_type":"filestream","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-12-02T14:52:39.135+0100","log.logger":"crawler","log.origin":{"function":"github.com/elastic/beats/v7/filebeat/beater.(*crawler).startInput","file.name":"beater/crawler.go","file.line":148},"message":"Starting input (ID: 1687432983113697962)","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-12-02T14:52:39.135+0100","log.logger":"crawler","log.origin":{"function":"github.com/elastic/beats/v7/filebeat/beater.(*crawler).Start","file.name":"beater/crawler.go","file.line":111},"message":"Loading and starting Inputs completed. Enabled inputs: 2","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-12-02T14:52:39.135+0100","log.logger":"input.filestream","log.origin":{"function":"github.com/elastic/beats/v7/filebeat/input/v2/compat.(*runner).Start.func1","file.name":"compat/compat.go","file.line":141},"message":"Input 'filestream' starting","service.name":"filebeat","id":"input-b","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-12-02T14:52:39.135+0100","log.logger":"input.filestream.metric_registry","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/monitoring/inputmon.NewMetricsRegistry","file.name":"inputmon/input.go","file.line":182},"message":"registering","service.name":"filebeat","id":"input-b","registry_id":"input-b","input_id":"input-b","input_type":"filestream","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-12-02T14:52:39.135+0100","log.logger":"processors.cache","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/processors/cache.(*cache).SetPaths","file.name":"cache/cache.go","file.line":197},"message":"initialized cache processor","service.name":"filebeat","instance_id":1,"details":"cache=[operation=put, store_id=memory:global-cache, key_field=host.name, value_field=host.metadata, ttl=24h0m0s, ignore_missing=true, ignore_failure=false, overwrite_fields=false]","ecs.version":"1.6.0"}
{"log.level":"error","@timestamp":"2025-12-02T14:52:39.135+0100","log.logger":"input.filestream","log.origin":{"function":"github.com/elastic/beats/v7/filebeat/input/filestream/internal/task.NewGroup.func2","file.name":"task/group.go","file.line":69},"message":"harvester:: error while connecting to output with pipeline: failed setting paths for global processors: attempt to set paths twice","service.name":"filebeat","id":"input-b","filestream_id":"input-b","ecs.version":"1.6.0"}
^C{"log.level":"info","@timestamp":"2025-12-02T14:52:41.409+0100","log.logger":"service","log.origin":{"function":"github.com/elastic/elastic-agent-libs/service.HandleSignals.func1","file.name":"service/service.go","file.line":52},"message":"Received signal \"interrupt\", stopping","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-12-02T14:52:41.409+0100","log.origin":{"function":"github.com/elastic/beats/v7/filebeat/beater.(*Filebeat).Stop","file.name":"beater/filebeat.go","file.line":541},"message":"Stopping filebeat","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-12-02T14:52:41.409+0100","log.logger":"crawler","log.origin":{"function":"github.com/elastic/beats/v7/filebeat/beater.(*crawler).Stop","file.name":"beater/crawler.go","file.line":155},"message":"Stopping Crawler","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-12-02T14:52:41.409+0100","log.logger":"crawler","log.origin":{"function":"github.com/elastic/beats/v7/filebeat/beater.(*crawler).Stop","file.name":"beater/crawler.go","file.line":165},"message":"Stopping 2 inputs","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-12-02T14:52:41.409+0100","log.logger":"crawler","log.origin":{"function":"github.com/elastic/beats/v7/filebeat/beater.(*crawler).Stop.func2","file.name":"beater/crawler.go","file.line":170},"message":"Stopping input: 1687432983113697962","service.name":"filebeat","ecs.version":"1.6.0"}

When multiple inputs connect to the same pipeline, each input calls
SetPaths on global processors. Previously, the second call would fail
with 'attempt to set paths twice'.

This fix makes SetPaths idempotent by storing the paths pointer and
returning nil (instead of an error) when called again with the same
pointer. This is the expected behavior for global processors shared
across multiple inputs within the same beat instance.

Changes:
- libbeat/processors/safe_processor.go: Store paths pointer and allow
  subsequent calls with the same pointer to succeed
- libbeat/processors/safe_processor_test.go: Add test cases for
  idempotent SetPaths behavior
- filebeat/tests/integration/cache_processor_test.go: Add integration
  test that verifies global cache processor works with multiple inputs
@orestisfl orestisfl self-assigned this Dec 2, 2025
@orestisfl orestisfl added the bug label Dec 2, 2025
@orestisfl orestisfl requested a review from a team as a code owner December 2, 2025 13:55
@orestisfl orestisfl added the backport-skip Skip notification from the automated backport with mergify label Dec 2, 2025
@orestisfl orestisfl requested a review from mauri870 December 2, 2025 13:55
@orestisfl orestisfl added the Team:Elastic-Agent-Data-Plane Label for the Agent Data Plane team label Dec 2, 2025
@orestisfl orestisfl requested a review from faec December 2, 2025 13:55
@botelastic botelastic bot added the needs_team Indicates that the issue/PR needs a Team:* label label Dec 2, 2025
@elasticmachine
Copy link
Contributor

Pinging @elastic/elastic-agent-data-plane (Team:Elastic-Agent-Data-Plane)

@botelastic botelastic bot removed the needs_team Indicates that the issue/PR needs a Team:* label label Dec 2, 2025
@github-actions
Copy link
Contributor

github-actions bot commented Dec 2, 2025

🤖 GitHub comments

Just comment with:

  • run docs-build : Re-trigger the docs validation. (use unformatted text in the comment!)

@orestisfl orestisfl requested a review from leehinman December 2, 2025 13:56
Copy link
Contributor

@leehinman leehinman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@orestisfl orestisfl merged commit fdd2b21 into elastic:main Dec 3, 2025
207 of 208 checks passed
@orestisfl orestisfl deleted the fix-setpaths-idempotent branch December 3, 2025 07:52
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

backport-skip Skip notification from the automated backport with mergify bug bugfix skip-changelog Team:Elastic-Agent-Data-Plane Label for the Agent Data Plane team

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants