[cache-processor] Set beat paths #47353
One other idea I had was to stop registering the processors in the This has the advantage of getting rid of calls to |
# Lazy Initialization of the Cache Processor's File Store
## The Problem
Processors often call `paths.Resolve` to locate directories such as "data" or "logs". This function reads the base path from a global variable, which works fine when a Beat runs as a standalone process.
When a Beat is embedded as a receiver (e.g., `fbreceiver` in the OTel Collector), this global breaks down: each receiver needs its own isolated state directory, and a single global path cannot provide that.
The `cache` processor currently sets up its file-based store in its `New` function, which is too early. At that point it only has access to the global path, not the receiver-specific path that is configured later.
## The Solution
My solution is to initialize the cache's file store lazily.
Instead of creating the store in `cache.New`, I've added a `SetPaths(*paths.Path)` method to the processor. This method creates the file store inside a `sync.Once`, so the initialization runs only once. The processor's internal store object stays `nil` until `SetPaths` is called during pipeline construction.
## How it Works
The path info gets passed down when a client connects to the pipeline. Here's the flow:
1. **`x-pack/filebeat/fbreceiver`**: `createReceiver` instantiates the processors (including `cache` with a `nil` store) and calls `instance.NewBeatForReceiver`.
2. **`x-pack/libbeat/cmd/instance`**: `NewBeatForReceiver` creates the `paths.Path` object from the receiver's specific configuration.
3. **`libbeat/publisher/pipeline`**: This `paths.Path` object is passed into the pipeline. When a client connects, the path is added to the `beat.ProcessingConfig`.
4. **`libbeat/publisher/processing`**: The processing builder gets this config and calls `group.SetPaths`, which passes the path down to each processor.
5. **`libbeat/processors/cache`**: `SetPaths` is finally called on the cache processor instance, and the `sync.Once` guard ensures the file store is created with the correct path.
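Steps 4 and 5 can be sketched roughly as follows. This is an illustration under assumptions, not the real `libbeat` code: `Processor`, `pathSetter`, `group`, `cacheProc`, and `dropProc` are hypothetical simplified types. The point is the optional-interface check: processors that implement `SetPaths` receive the path, and all others are skipped.

```go
package main

import "fmt"

// Path is a simplified stand-in for libbeat's paths.Path (assumption).
type Path struct {
	Data string
}

// Processor is a reduced stand-in for the processor interface.
type Processor interface {
	String() string
}

// pathSetter is the optional interface a processor may implement
// (hypothetical name for this sketch).
type pathSetter interface {
	SetPaths(*Path) error
}

// group holds a list of processors, similar in spirit to a processor group.
type group struct {
	procs []Processor
}

// SetPaths forwards the path to every processor that opts in.
func (g *group) SetPaths(p *Path) error {
	for _, proc := range g.procs {
		ps, ok := proc.(pathSetter)
		if !ok {
			continue // processor does not need paths
		}
		if err := ps.SetPaths(p); err != nil {
			return fmt.Errorf("failed setting paths for %s: %w", proc, err)
		}
	}
	return nil
}

// cacheProc opts in by implementing SetPaths.
type cacheProc struct {
	dataDir string
}

func (c *cacheProc) String() string { return "cache" }

func (c *cacheProc) SetPaths(p *Path) error {
	c.dataDir = p.Data
	return nil
}

// dropProc does not implement SetPaths and is left untouched.
type dropProc struct{}

func (dropProc) String() string { return "drop_event" }

func main() {
	c := &cacheProc{}
	g := &group{procs: []Processor{dropProc{}, c}}
	if err := g.SetPaths(&Path{Data: "/tmp/receiver-a"}); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("cache data dir:", c.dataDir)
}
```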
## Diagram
```mermaid
graph TD
subgraph "libbeat/processors/cache (init)"
A["init()"]
end
subgraph "libbeat/processors"
B["processors.RegisterPlugin"]
C{"registry"}
end
A --> B;
B -- "Save factory" --> C;
subgraph "x-pack/filebeat/fbreceiver"
D["createReceiver"]
end
subgraph "libbeat/processors"
E["processors.New(config)"]
C -. "Lookup 'cache'" .-> E;
end
D --> E;
D --> I;
E --> G;
subgraph "libbeat/processors/cache"
G["cache.New()"] -- store=nil --> H{"cache"};
end
subgraph "x-pack/libbeat/cmd/instance"
I["instance.NewBeatForReceiver"];
I --> J{"paths.Path object"};
end
subgraph "libbeat/publisher/pipeline"
J --> K["pipeline.New"];
K --> L["ConnectWith"];
end
subgraph "libbeat/publisher/processing"
L -- "Config w/ paths" --> N["builder.Create"];
N --> O["group.SetPaths"];
end
subgraph "libbeat/processors/cache"
O --> P["cache.SetPaths"];
P --> Q["sync.Once"];
Q -- "initialize store" --> H;
end
```
## Pros and Cons of This Approach
* **Pros**:
* It's a minimal, targeted change that solves the immediate problem.
* It avoids a large-scale, breaking refactoring of all processors.
* It maintains backward compatibility for existing processors and downstream consumers of `libbeat`.
* **Cons**:
* Using a type assertion for the `setPaths` interface feels a bit like magic, since the behavior changes at runtime depending on whether a processor implements it.
## Alternatives Considered
### Option 1: Add a `paths` argument to all processor constructors
* **Pros**:
* Simple and direct.
* **Cons**:
* Requires a global refactoring of all processors.
* Breaks external downstream libbeat importers like Cloudbeat.
* The `paths` argument is not needed in many processors, so adding a rarely used option to the function signature is verbose.
### Option 2: Refactor `processors` to introduce a "V2" interface
* **Pros**:
* Allows for a new, backwards-compatible signature (e.g., using a config struct).
* This can still be done later.
* We can support both V1 processors and gradually move processors to V2.
* **Cons**:
* Needs a significant refactoring effort.
## Checklist
<!-- Mandatory
Add a checklist of things that are required to be reviewed in order to have the PR approved
List here all the items you have verified BEFORE sending this PR. Please DO NOT remove any item, striking through those that do not apply. (Just in case, strikethrough uses two tildes. ~~Scratch this.~~)
-->
- [x] My code follows the style guidelines of this project
- [x] I have commented my code, particularly in hard-to-understand areas
- [ ] ~~I have made corresponding changes to the documentation~~
- [ ] ~~I have made corresponding change to the default configuration files~~
- [x] I have added tests that prove my fix is effective or that my feature works. Where relevant, I have used the [`stresstest.sh`](https://github.com/elastic/beats/blob/main/script/stresstest.sh) script to run them under stress conditions and race detector to verify their stability.
- [ ] ~~I have added an entry in `./changelog/fragments` using the [changelog tool](https://github.com/elastic/elastic-agent-changelog-tool/blob/main/docs/usage.md).~~
## How to test this PR locally
### Configuration
`filebeat-cache-mwe.yml`:
```yaml
path.data: /tmp/data
filebeat.inputs:
- type: filestream
id: filestream-input
enabled: true
paths:
- /tmp/logs/*.log
parsers:
- ndjson:
target: ""
processors:
# PUT: Store metadata when event.type is "source"
- if:
equals:
event.type: "source"
then:
- cache:
backend:
file:
id: test_cache
write_interval: 5s
put:
key_field: event.id
value_field: event.metadata
ttl: 1h
# GET: Retrieve metadata when event.type is "target"
- if:
equals:
event.type: "target"
then:
- cache:
backend:
file:
id: test_cache
get:
key_field: event.id
target_field: cached_metadata
output.console:
enabled: true
```
### Setup
```bash
# Optional: start from a clean state
# rm -rf /tmp/data /tmp/logs
# Create the log directory
mkdir -p /tmp/logs
# Create test data
cat > /tmp/logs/test.log <<'EOF'
{"event":{"type":"source","id":"001","metadata":{"user":"user-1","role":"admin","sequence":1,"data":{"ip":"192.168.1.1","session":"session-001"}}},"message":"source event 1"}
{"event":{"type":"source","id":"002","metadata":{"user":"user-2","role":"admin","sequence":2,"data":{"ip":"192.168.1.2","session":"session-002"}}},"message":"source event 2"}
{"event":{"type":"source","id":"003","metadata":{"user":"user-3","role":"admin","sequence":3,"data":{"ip":"192.168.1.3","session":"session-003"}}},"message":"source event 3"}
{"event":{"type":"source","id":"004","metadata":{"user":"user-4","role":"admin","sequence":4,"data":{"ip":"192.168.1.4","session":"session-004"}}},"message":"source event 4"}
{"event":{"type":"source","id":"005","metadata":{"user":"user-5","role":"admin","sequence":5,"data":{"ip":"192.168.1.5","session":"session-005"}}},"message":"source event 5"}
{"event":{"type":"target","id":"001"},"message":"target event 1"}
{"event":{"type":"target","id":"002"},"message":"target event 2"}
{"event":{"type":"target","id":"003"},"message":"target event 3"}
{"event":{"type":"target","id":"004"},"message":"target event 4"}
{"event":{"type":"target","id":"005"},"message":"target event 5"}
EOF
# Run filebeat
./x-pack/filebeat/filebeat -e -c filebeat-cache-mwe.yml
```
### Expected Output
Target events should have `cached_metadata` field populated:
```json
{
"event": {
"type": "target",
"id": "001"
},
"message": "target event 1",
"cached_metadata": {
"user": "user-1",
"role": "admin",
"sequence": 1,
"data": {
"ip": "192.168.1.1",
"session": "session-001"
}
}
}
```
### Cache Files
After running Filebeat, check the cache file:
```bash
cat /tmp/data/cache_processor/test_cache
```
Example contents:
```json
{"key":"001","val":{"data":{"ip":"192.168.1.1","session":"session-001"},"role":"admin","sequence":1,"user":"user-1"},"expires":"2025-11-20T15:02:32.865896537+01:00"}
{"key":"002","val":{"data":{"ip":"192.168.1.2","session":"session-002"},"role":"admin","sequence":2,"user":"user-2"},"expires":"2025-11-20T15:02:32.865950973+01:00"}
{"key":"003","val":{"data":{"ip":"192.168.1.3","session":"session-003"},"role":"admin","sequence":3,"user":"user-3"},"expires":"2025-11-20T15:02:32.865972408+01:00"}
{"key":"004","val":{"data":{"ip":"192.168.1.4","session":"session-004"},"role":"admin","sequence":4,"user":"user-4"},"expires":"2025-11-20T15:02:32.865988843+01:00"}
{"key":"005","val":{"data":{"ip":"192.168.1.5","session":"session-005"},"role":"admin","sequence":5,"user":"user-5"},"expires":"2025-11-20T15:02:32.866006958+01:00"}
```
## Related issues
- Closes elastic#46985
When multiple inputs connect to the same pipeline, each input calls `SetPaths` on the global processors. Previously, the second call would fail with `attempt to set paths twice`.
This fix makes `SetPaths` idempotent by storing the paths pointer and returning `nil` (instead of an error) when it is called again with the same pointer. This is the expected behavior for global processors shared across multiple inputs within the same Beat instance.
Changes:
- libbeat/processors/safe_processor.go: Store paths pointer and allow subsequent calls with the same pointer to succeed
- libbeat/processors/safe_processor_test.go: Add test cases for idempotent SetPaths behavior
- filebeat/tests/integration/cache_processor_test.go: Add integration test that verifies global cache processor works with multiple inputs
## How to test this PR locally
### Option 1: Run the Integration Test
```bash
cd filebeat
mage buildSystemTestBinary
go test -v -tags integration -run "TestGlobalCacheProcessorMultipleInputs" ./tests/integration/
```
**Expected result:** Test passes with all 30 events processed from 3 inputs.
**Without the fix:** Test fails with:
```
"error while connecting to output with pipeline: failed setting paths for global processors: attempt to set paths twice"
```
### Option 2: Manual Test with Config File
`config_cache_global.yml`:
```yaml
filebeat.inputs:
- type: filestream
id: input-a
paths:
- /tmp/logs/a.log
prospector.scanner.fingerprint.enabled: false
- type: filestream
id: input-b
paths:
- /tmp/logs/b.log
prospector.scanner.fingerprint.enabled: false
processors:
- cache:
backend:
file:
id: test-cache
write_interval: 1s
capacity: 1000
put:
key_field: message
value_field: message
ttl: 1h
ignore_missing: true
output.console:
enabled: true
```
```bash
mkdir -p /tmp/logs
for i in $(seq 1 100); do echo "{\"message\":\"test $i\"}"; done > /tmp/logs/a.log
for i in $(seq 1 100); do echo "{\"message\":\"test $i\"}"; done > /tmp/logs/b.log
./filebeat -e -c config_cache_global.yml 2>&1 | grep -E "error|twice|initialized|harvester.*started"
```
**Expected result (with fix):**
- `"initialized cache processor"` appears once
- Both inputs start successfully
- Events are processed from both inputs
**Without the fix:**
```
"error while connecting to output with pipeline: failed setting paths for global processors: attempt to set paths twice"
```
Only `input-a` processes events; `input-b` fails to connect.
## Related issues
- #47353
## Logs
<details>
<summary>Logs</summary>
```json
{"log.level":"info","@timestamp":"2025-12-02T14:52:39.120+0100","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/cmd/instance.(*Beat).configure","file.name":"instance/beat.go","file.line":836},"message":"Home path: [/home/orestis/src/beats/x-pack/filebeat] Config path: [/home/orestis/src/beats/x-pack/filebeat] Data path: [/home/orestis/src/beats/x-pack/filebeat/data] Logs path: [/home/orestis/src/beats/x-pack/filebeat/logs]","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-12-02T14:52:39.121+0100","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/cmd/instance.(*Beat).configure","file.name":"instance/beat.go","file.line":844},"message":"Beat ID: 05fc1486-d763-43fa-8b98-e38ae1398577","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-12-02T14:52:39.129+0100","log.logger":"processors.cache","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/processors/cache.New","file.name":"cache/cache.go","file.line":72},"message":"cache processor created","service.name":"filebeat","instance_id":1,"config":{"Get":null,"Put":{"Key":"host.name","Value":"host.metadata","TTL":86400000000000},"Delete":null,"Store":{"Memory":{"ID":"global-cache"},"File":null,"Capacity":10000,"Effort":0},"IgnoreMissing":true,"IgnoreFailure":false,"OverwriteKeys":false},"ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-12-02T14:52:39.130+0100","log.logger":"seccomp","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/common/seccomp.loadFilter","file.name":"seccomp/seccomp.go","file.line":125},"message":"Syscall filter successfully installed","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-12-02T14:52:39.130+0100","log.logger":"beat","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/cmd/instance.(*Beat).createBeater","file.name":"instance/beat.go","file.line":332},"message":"Setup Beat: filebeat; Version: 9.3.0 (FIPS-distribution: false)","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-12-02T14:52:39.130+0100","log.logger":"beat","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/cmd/instance.(*Beat).logSystemInfo","file.name":"instance/beat.go","file.line":1393},"message":"Beat info","service.name":"filebeat","system_info":{"beat":{"path":{"config":"/home/orestis/src/beats/x-pack/filebeat","data":"/home/orestis/src/beats/x-pack/filebeat/data","home":"/home/orestis/src/beats/x-pack/filebeat","logs":"/home/orestis/src/beats/x-pack/filebeat/logs"},"type":"filebeat","uuid":"05fc1486-d763-43fa-8b98-e38ae1398577"},"ecs.version":"1.6.0"}}
{"log.level":"info","@timestamp":"2025-12-02T14:52:39.130+0100","log.logger":"beat","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/cmd/instance.(*Beat).logSystemInfo","file.name":"instance/beat.go","file.line":1402},"message":"Build info","service.name":"filebeat","system_info":{"build":{"commit":"deec5c55e9b0306e2c3b10de6367b266b71e7899","libbeat":"9.3.0","time":"2025-12-02T13:51:01.000Z","version":"9.3.0"},"ecs.version":"1.6.0"}}
{"log.level":"info","@timestamp":"2025-12-02T14:52:39.131+0100","log.logger":"beat","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/cmd/instance.(*Beat).logSystemInfo","file.name":"instance/beat.go","file.line":1405},"message":"Go runtime info","service.name":"filebeat","system_info":{"go":{"os":"linux","arch":"amd64","max_procs":12,"version":"go1.25.4 X:nodwarf5"},"ecs.version":"1.6.0"}}
{"log.level":"info","@timestamp":"2025-12-02T14:52:39.131+0100","log.logger":"beat","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/cmd/instance.(*Beat).logSystemInfo","file.name":"instance/beat.go","file.line":1411},"message":"Host info","service.name":"filebeat","system_info":{"host":{"architecture":"x86_64","native_architecture":"x86_64","boot_time":"2025-12-02T06:11:29+01:00","containerized":false,"name":"laptop","ip":["127.0.0.1","192.168.178.44","172.18.0.1","172.17.0.1","::1","2001:a61:12f1:9901:591e:b1f2:b909:30fa","fd9a:56af:31d7:0:5b85:f561:af6a:7612","fe80::a7c6:8abd:9493:d9d8","fe80::243e:c7ff:fe14:7299"],"kernel_version":"6.12.59-1-lts","mac":["00:be:43:61:7d:c4","8c:f8:c5:c5:33:ac","9c:7b:ef:62:65:a3","1e:3a:da:25:d0:60","26:3e:c7:14:72:99"],"os":{"type":"linux","family":"arch","platform":"arch","name":"Arch Linux","version":"","major":0,"minor":0,"patch":0,"build":"rolling"},"timezone":"CET","timezone_offset_sec":3600,"id":"9fab1a953bcd47949880e26abc13a79d"},"ecs.version":"1.6.0"}}
{"log.level":"info","@timestamp":"2025-12-02T14:52:39.131+0100","log.logger":"beat","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/cmd/instance.(*Beat).logSystemInfo","file.name":"instance/beat.go","file.line":1440},"message":"Process info","service.name":"filebeat","system_info":{"process":{"capabilities":{"inheritable":["wake_alarm"],"permitted":null,"effective":null,"bounding":["chown","dac_override","dac_read_search","fowner","fsetid","kill","setgid","setuid","setpcap","linux_immutable","net_bind_service","net_broadcast","net_admin","net_raw","ipc_lock","ipc_owner","sys_module","sys_rawio","sys_chroot","sys_ptrace","sys_pacct","sys_admin","sys_boot","sys_nice","sys_resource","sys_time","sys_tty_config","mknod","lease","audit_write","audit_control","setfcap","mac_override","mac_admin","syslog","wake_alarm","block_suspend","audit_read","perfmon","bpf","checkpoint_restore"],"ambient":null},"cwd":"/home/orestis/src/beats","exe":"/home/orestis/src/beats/x-pack/filebeat/filebeat","name":"filebeat","pid":812688,"ppid":809821,"seccomp":{"mode":"filter","no_new_privs":true},"start_time":"2025-12-02T14:52:38.740+0100"},"ecs.version":"1.6.0"}}
{"log.level":"info","@timestamp":"2025-12-02T14:52:39.133+0100","log.logger":"publisher","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/publisher/pipeline.LoadWithSettings","file.name":"pipeline/module.go","file.line":105},"message":"Beat name: laptop","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-12-02T14:52:39.133+0100","log.logger":"modules","log.origin":{"function":"github.com/elastic/beats/v7/filebeat/fileset.newModuleRegistry","file.name":"fileset/modules.go","file.line":138},"message":"Enabled modules/filesets: ","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"warn","@timestamp":"2025-12-02T14:52:39.133+0100","log.origin":{"function":"github.com/elastic/beats/v7/filebeat/beater.(*Filebeat).setupPipelineLoaderCallback","file.name":"beater/filebeat.go","file.line":174},"message":"Filebeat is unable to load the ingest pipelines for the configured modules because the Elasticsearch output is not configured/enabled. If you have already loaded the ingest pipelines or are using Logstash pipelines, you can ignore this warning.","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-12-02T14:52:39.133+0100","log.logger":"monitoring","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/monitoring/report/log.(*reporter).snapshotLoop","file.name":"log/log.go","file.line":150},"message":"Starting metrics logging every 30s","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-12-02T14:52:39.133+0100","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/cmd/instance.(*Beat).launch","file.name":"instance/beat.go","file.line":542},"message":"filebeat start running.","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-12-02T14:52:39.134+0100","log.logger":"filebeat","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/statestore/backend/memlog.openStore","file.name":"memlog/store.go","file.line":134},"message":"Finished loading transaction log file for '/home/orestis/src/beats/x-pack/filebeat/data/registry/filebeat'. Active transaction id=4","service.name":"filebeat","store":"filebeat","ecs.version":"1.6.0"}
{"log.level":"warn","@timestamp":"2025-12-02T14:52:39.134+0100","log.origin":{"function":"github.com/elastic/beats/v7/filebeat/beater.(*Filebeat).Run","file.name":"beater/filebeat.go","file.line":406},"message":"Filebeat is unable to load the ingest pipelines for the configured modules because the Elasticsearch output is not configured/enabled. If you have already loaded the ingest pipelines or are using Logstash pipelines, you can ignore this warning.","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-12-02T14:52:39.134+0100","log.logger":"registrar","log.origin":{"function":"github.com/elastic/beats/v7/filebeat/registrar.(*Registrar).loadStates","file.name":"registrar/registrar.go","file.line":103},"message":"States Loaded from registrar: 0","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-12-02T14:52:39.134+0100","log.logger":"crawler","log.origin":{"function":"github.com/elastic/beats/v7/filebeat/beater.(*crawler).Start","file.name":"beater/crawler.go","file.line":76},"message":"Loading Inputs: 2","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-12-02T14:52:39.134+0100","log.logger":"crawler","log.origin":{"function":"github.com/elastic/beats/v7/filebeat/beater.(*crawler).startInput","file.name":"beater/crawler.go","file.line":148},"message":"Starting input (ID: 5483632775956168786)","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-12-02T14:52:39.135+0100","log.logger":"input.filestream","log.origin":{"function":"github.com/elastic/beats/v7/filebeat/input/v2/compat.(*runner).Start.func1","file.name":"compat/compat.go","file.line":141},"message":"Input 'filestream' starting","service.name":"filebeat","id":"input-a","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-12-02T14:52:39.135+0100","log.logger":"input.filestream.metric_registry","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/monitoring/inputmon.NewMetricsRegistry","file.name":"inputmon/input.go","file.line":182},"message":"registering","service.name":"filebeat","id":"input-a","registry_id":"input-a","input_id":"input-a","input_type":"filestream","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-12-02T14:52:39.135+0100","log.logger":"crawler","log.origin":{"function":"github.com/elastic/beats/v7/filebeat/beater.(*crawler).startInput","file.name":"beater/crawler.go","file.line":148},"message":"Starting input (ID: 1687432983113697962)","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-12-02T14:52:39.135+0100","log.logger":"crawler","log.origin":{"function":"github.com/elastic/beats/v7/filebeat/beater.(*crawler).Start","file.name":"beater/crawler.go","file.line":111},"message":"Loading and starting Inputs completed. Enabled inputs: 2","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-12-02T14:52:39.135+0100","log.logger":"input.filestream","log.origin":{"function":"github.com/elastic/beats/v7/filebeat/input/v2/compat.(*runner).Start.func1","file.name":"compat/compat.go","file.line":141},"message":"Input 'filestream' starting","service.name":"filebeat","id":"input-b","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-12-02T14:52:39.135+0100","log.logger":"input.filestream.metric_registry","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/monitoring/inputmon.NewMetricsRegistry","file.name":"inputmon/input.go","file.line":182},"message":"registering","service.name":"filebeat","id":"input-b","registry_id":"input-b","input_id":"input-b","input_type":"filestream","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-12-02T14:52:39.135+0100","log.logger":"processors.cache","log.origin":{"function":"github.com/elastic/beats/v7/libbeat/processors/cache.(*cache).SetPaths","file.name":"cache/cache.go","file.line":197},"message":"initialized cache processor","service.name":"filebeat","instance_id":1,"details":"cache=[operation=put, store_id=memory:global-cache, key_field=host.name, value_field=host.metadata, ttl=24h0m0s, ignore_missing=true, ignore_failure=false, overwrite_fields=false]","ecs.version":"1.6.0"}
{"log.level":"error","@timestamp":"2025-12-02T14:52:39.135+0100","log.logger":"input.filestream","log.origin":{"function":"github.com/elastic/beats/v7/filebeat/input/filestream/internal/task.NewGroup.func2","file.name":"task/group.go","file.line":69},"message":"harvester:: error while connecting to output with pipeline: failed setting paths for global processors: attempt to set paths twice","service.name":"filebeat","id":"input-b","filestream_id":"input-b","ecs.version":"1.6.0"}
^C{"log.level":"info","@timestamp":"2025-12-02T14:52:41.409+0100","log.logger":"service","log.origin":{"function":"github.com/elastic/elastic-agent-libs/service.HandleSignals.func1","file.name":"service/service.go","file.line":52},"message":"Received signal \"interrupt\", stopping","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-12-02T14:52:41.409+0100","log.origin":{"function":"github.com/elastic/beats/v7/filebeat/beater.(*Filebeat).Stop","file.name":"beater/filebeat.go","file.line":541},"message":"Stopping filebeat","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-12-02T14:52:41.409+0100","log.logger":"crawler","log.origin":{"function":"github.com/elastic/beats/v7/filebeat/beater.(*crawler).Stop","file.name":"beater/crawler.go","file.line":155},"message":"Stopping Crawler","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-12-02T14:52:41.409+0100","log.logger":"crawler","log.origin":{"function":"github.com/elastic/beats/v7/filebeat/beater.(*crawler).Stop","file.name":"beater/crawler.go","file.line":165},"message":"Stopping 2 inputs","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2025-12-02T14:52:41.409+0100","log.logger":"crawler","log.origin":{"function":"github.com/elastic/beats/v7/filebeat/beater.(*crawler).Stop.func2","file.name":"beater/crawler.go","file.line":170},"message":"Stopping input: 1687432983113697962","service.name":"filebeat","ecs.version":"1.6.0"}
```
</details>
# Lazy Initialization of the Cache Processor's File Store
## The Problem
The basic problem is that processors often use `paths.Resolve` to find directories like "data" or "logs". This function uses a global variable for the base path, which is fine when a Beat runs as a standalone process.
But when a Beat is embedded as a receiver (e.g., `fbreceiver` in the OTel Collector), this global causes problems. Each receiver needs its own isolated state directory, and a single global path prevents this.
The `cache` processor currently tries to set up its file-based store in its `New` function, which is too early. It only has access to the global path, not the receiver-specific path that gets configured later.
## The Solution
My solution is to initialize the cache's file store lazily.
Instead of creating the store in `cache.New`, I've added a `SetPaths(*paths.Path)` method to the processor. This method creates the file store and is wrapped in a `sync.Once` to make sure it only runs once. The processor's internal store object stays `nil` until `SetPaths` is called during pipeline construction.
## How it Works
The path info gets passed down when a client connects to the pipeline. Here's the flow:
1. **`x-pack/filebeat/fbreceiver`**: `createReceiver` instantiates the processors (including `cache` with a `nil` store) and calls `instance.NewBeatForReceiver`.
2. **`x-pack/libbeat/cmd/instance`**: `NewBeatForReceiver` creates the `paths.Path` object from the receiver's specific configuration.
3. **`libbeat/publisher/pipeline`**: This `paths.Path` object is passed into the pipeline. When a client connects, the path is added to the `beat.ProcessingConfig`.
4. **`libbeat/publisher/processing`**: The processing builder gets this config and calls `group.SetPaths`, which passes the path down to each processor.
5. **`libbeat/processors/cache`**: `SetPaths` is finally called on the cache processor instance, and the `sync.Once` guard ensures the file store is created with the correct path.
## Diagram
```mermaid
graph TD
subgraph "libbeat/processors/cache (init)"
A["init()"]
end
subgraph "libbeat/processors"
B["processors.RegisterPlugin"]
C{"registry"}
end
A --> B;
B -- "Save factory" --> C;
subgraph "x-pack/filebeat/fbreceiver"
D["createReceiver"]
end
subgraph "libbeat/processors"
E["processors.New(config)"]
C -. "Lookup 'cache'" .-> E;
end
D --> E;
D --> I;
E --> G;
subgraph "libbeat/processors/cache"
G["cache.New()"] -- store=nil --> H{"cache"};
end
subgraph "x-pack/libbeat/cmd/instance"
I["instance.NewBeatForReceiver"];
I --> J{"paths.Path object"};
end
subgraph "libbeat/publisher/pipeline"
J --> K["pipeline.New"];
K --> L["ConnectWith"];
end
subgraph "libbeat/publisher/processing"
L -- "Config w/ paths" --> N["builder.Create"];
N --> O["group.SetPaths"];
end
subgraph "libbeat/processors/cache"
O --> P["cache.SetPaths"];
P --> Q["sync.Once"];
Q -- "initialize store" --> H;
end
```
## Pros and Cons of This Approach
* **Pros**:
* It's a minimal, targeted change that solves the immediate problem.
* It avoids a large-scale, breaking refactoring of all processors.
* It maintains backward compatibility for existing processors and downstream consumers of `libbeat`.
* **Cons**:
* Using a type assertion for the `setPaths` interface feels a bit like magic, since the behavior changes at runtime depending on whether a processor implements it.
## Alternatives Considered
### Option 1: Add a `paths` argument to all processor constructors
* **Pros**:
* Simple and direct.
* **Cons**:
* Requires a global refactoring of all processors.
* Breaks external downstream libbeat importers like Cloudbeat.
* The `paths` argument is not needed in many processors, so adding a rarely used option to the function signature is verbose.
### Option 2: Refactor `processors` to introduce a "V2" interface
* **Pros**:
* Allows for a new, backwards-compatible signature (e.g., using a config struct).
* This can still be done later.
* We can support both V1 processors and gradually move processors to V2.
* **Cons**:
* Needs a significant refactoring effort.
## Checklist
<!-- Mandatory
Add a checklist of things that are required to be reviewed in order to have the PR approved
List here all the items you have verified BEFORE sending this PR. Please DO NOT remove any item, striking through those that do not apply. (Just in case, strikethrough uses two tildes. ~~Scratch this.~~)
-->
- [x] My code follows the style guidelines of this project
- [x] I have commented my code, particularly in hard-to-understand areas
- [ ] ~~I have made corresponding changes to the documentation~~
- [ ] ~~I have made corresponding change to the default configuration files~~
- [x] I have added tests that prove my fix is effective or that my feature works. Where relevant, I have used the [`stresstest.sh`](https://github.com/elastic/beats/blob/main/script/stresstest.sh) script to run them under stress conditions and race detector to verify their stability.
- [ ] ~~I have added an entry in `./changelog/fragments` using the [changelog tool](https://github.com/elastic/elastic-agent-changelog-tool/blob/main/docs/usage.md).~~
## How to test this PR locally
### Configuration
`filebeat-cache-mwe.yml`:
```yaml
path.data: /tmp/data
filebeat.inputs:
- type: filestream
id: filestream-input
enabled: true
paths:
- /tmp/logs/*.log
parsers:
- ndjson:
target: ""
processors:
# PUT: Store metadata when event.type is "source"
- if:
equals:
event.type: "source"
then:
- cache:
backend:
file:
id: test_cache
write_interval: 5s
put:
key_field: event.id
value_field: event.metadata
ttl: 1h
# GET: Retrieve metadata when event.type is "target"
- if:
equals:
event.type: "target"
then:
- cache:
backend:
file:
id: test_cache
get:
key_field: event.id
target_field: cached_metadata
output.console:
enabled: true
```
### Setup
```bash
# Create directory
#rm -rf /tmp/data /tmp/logs
mkdir -p /tmp/logs
# Create test data
cat > /tmp/logs/test.log <<'EOF'
{"event":{"type":"source","id":"001","metadata":{"user":"user-1","role":"admin","sequence":1,"data":{"ip":"192.168.1.1","session":"session-001"}}},"message":"source event 1"}
{"event":{"type":"source","id":"002","metadata":{"user":"user-2","role":"admin","sequence":2,"data":{"ip":"192.168.1.2","session":"session-002"}}},"message":"source event 2"}
{"event":{"type":"source","id":"003","metadata":{"user":"user-3","role":"admin","sequence":3,"data":{"ip":"192.168.1.3","session":"session-003"}}},"message":"source event 3"}
{"event":{"type":"source","id":"004","metadata":{"user":"user-4","role":"admin","sequence":4,"data":{"ip":"192.168.1.4","session":"session-004"}}},"message":"source event 4"}
{"event":{"type":"source","id":"005","metadata":{"user":"user-5","role":"admin","sequence":5,"data":{"ip":"192.168.1.5","session":"session-005"}}},"message":"source event 5"}
{"event":{"type":"target","id":"001"},"message":"target event 1"}
{"event":{"type":"target","id":"002"},"message":"target event 2"}
{"event":{"type":"target","id":"003"},"message":"target event 3"}
{"event":{"type":"target","id":"004"},"message":"target event 4"}
{"event":{"type":"target","id":"005"},"message":"target event 5"}
EOF
# Run filebeat
./x-pack/filebeat/filebeat -e -c filebeat-cache-mwe.yml
```
### Expected Output
Target events should have the `cached_metadata` field populated:
```json
{
  "event": {
    "type": "target",
    "id": "001"
  },
  "message": "target event 1",
  "cached_metadata": {
    "user": "user-1",
    "role": "admin",
    "sequence": 1,
    "data": {
      "ip": "192.168.1.1",
      "session": "session-001"
    }
  }
}
```
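To check this without reading the output by eye, a helper along these lines could be used. It is only a sketch and not part of the PR. It assumes the console output was redirected to a file (for example by appending `> /tmp/out.ndjson` to the run command), that each event is printed as one compact JSON line, and that target events contain the literal text `"type":"target"`. The function name `check_targets_enriched` and the output path are hypothetical.

```bash
# Hypothetical helper: verify that every target event in a saved console
# output file carries a cached_metadata field.
# Assumes one compact JSON event per line.
check_targets_enriched() {
  out=$1
  [ -r "$out" ] || { echo "cannot read $out" >&2; return 1; }
  # Number of target events seen in the output.
  targets=$(grep -c '"type":"target"' "$out" || true)
  # Target events that lack the cached_metadata field.
  missing=$(grep '"type":"target"' "$out" | grep -vc '"cached_metadata"' || true)
  if [ "$targets" -eq 0 ]; then
    echo "no target events found in $out" >&2
    return 1
  fi
  if [ "$missing" -ne 0 ]; then
    echo "$missing of $targets target events lack cached_metadata" >&2
    return 1
  fi
  echo "ok: $targets target events enriched"
}
```

Usage would be `check_targets_enriched /tmp/out.ndjson`; a non-zero exit status means at least one target event was not enriched.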
### Cache Files
After running Filebeat, check the cache file:
```bash
cat /tmp/data/cache_processor/test_cache
```
Example contents:
```json
{"key":"001","val":{"data":{"ip":"192.168.1.1","session":"session-001"},"role":"admin","sequence":1,"user":"user-1"},"expires":"2025-11-20T15:02:32.865896537+01:00"}
{"key":"002","val":{"data":{"ip":"192.168.1.2","session":"session-002"},"role":"admin","sequence":2,"user":"user-2"},"expires":"2025-11-20T15:02:32.865950973+01:00"}
{"key":"003","val":{"data":{"ip":"192.168.1.3","session":"session-003"},"role":"admin","sequence":3,"user":"user-3"},"expires":"2025-11-20T15:02:32.865972408+01:00"}
{"key":"004","val":{"data":{"ip":"192.168.1.4","session":"session-004"},"role":"admin","sequence":4,"user":"user-4"},"expires":"2025-11-20T15:02:32.865988843+01:00"}
{"key":"005","val":{"data":{"ip":"192.168.1.5","session":"session-005"},"role":"admin","sequence":5,"user":"user-5"},"expires":"2025-11-20T15:02:32.866006958+01:00"}
```
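The same check can be scripted. The sketch below is not part of the PR: it assumes the file holds one JSON object per line with the `key`, `val`, and `expires` fields in the order shown in the example above, and the function name `check_cache_file` is hypothetical.

```bash
# Hypothetical helper: verify that a cache file holds the expected number
# of entries and that each line has the key/val/expires fields in the
# order seen in the example output.
check_cache_file() {
  file=$1
  expected=${2:-5}
  [ -r "$file" ] || { echo "cannot read $file" >&2; return 1; }
  # Lines that look like a cache entry.
  matched=$(grep -c '"key":.*"val":.*"expires":' "$file" || true)
  # All lines in the file.
  total=$(grep -c '' "$file" || true)
  if [ "$matched" -ne "$expected" ] || [ "$total" -ne "$expected" ]; then
    echo "expected $expected entries, found $matched of $total lines" >&2
    return 1
  fi
  echo "ok: $matched entries"
}
```

For the test data above, `check_cache_file /tmp/data/cache_processor/test_cache 5` should succeed once the store has been written to disk.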
## Related issues
- Closes #46985
(cherry picked from commit 28222c4)
Co-authored-by: Orestis Floros <[email protected]>