Commit e585626
[cache-processor] Set beat paths (#47353)
# Lazy Initialization of the Cache Processor's File Store
## The Problem
The basic problem is that processors often use `paths.Resolve` to find directories like "data" or "logs". This function uses a global variable for the base path, which is fine when a Beat runs as a standalone process.
But when a Beat is embedded as a receiver (e.g., `fbreceiver` in the OTel Collector), this global causes problems. Each receiver needs its own isolated state directory, and a single global path prevents this.
The `cache` processor currently tries to set up its file-based store in its `New` function, which is too early. It only has access to the global path, not the receiver-specific path that gets configured later.
## The Solution
My solution is to initialize the cache's file store lazily.
Instead of creating the store in `cache.New`, I've added a `SetPaths(*paths.Path)` method to the processor. This method creates the file store and is wrapped in a `sync.Once` to make sure it only runs once. The processor's internal store object stays `nil` until `SetPaths` is called during pipeline construction.
## How it Works
The path info gets passed down when a client connects to the pipeline. Here's the flow:
1. **`x-pack/filebeat/fbreceiver`**: `createReceiver` instantiates the processors (including `cache` with a `nil` store) and calls `instance.NewBeatForReceiver`.
2. **`x-pack/libbeat/cmd/instance`**: `NewBeatForReceiver` creates the `paths.Path` object from the receiver's specific configuration.
3. **`libbeat/publisher/pipeline`**: This `paths.Path` object is passed into the pipeline. When a client connects, the path is added to the `beat.ProcessingConfig`.
4. **`libbeat/publisher/processing`**: The processing builder gets this config and calls `group.SetPaths`, which passes the path down to each processor.
5. **`libbeat/processors/cache`**: `SetPaths` is finally called on the cache processor instance, and the `sync.Once` guard ensures the file store is created with the correct path.
## Diagram
```mermaid
graph TD
subgraph "libbeat/processors/cache (init)"
A["init()"]
end
subgraph "libbeat/processors"
B["processors.RegisterPlugin"]
C{"registry"}
end
A --> B;
B -- "Save factory" --> C;
subgraph "x-pack/filebeat/fbreceiver"
D["createReceiver"]
end
subgraph "libbeat/processors"
E["processors.New(config)"]
C -. "Lookup 'cache'" .-> E;
end
D --> E;
D --> I;
E --> G;
subgraph "libbeat/processors/cache"
G["cache.New()"] -- store=nil --> H{"cache"};
end
subgraph "x-pack/libbeat/cmd/instance"
I["instance.NewBeatForReceiver"];
I --> J{"paths.Path object"};
end
subgraph "libbeat/publisher/pipeline"
J --> K["pipeline.New"];
K --> L["ConnectWith"];
end
subgraph "libbeat/publisher/processing"
L -- "Config w/ paths" --> N["builder.Create"];
N --> O["group.SetPaths"];
end
subgraph "libbeat/processors/cache"
O --> P["cache.SetPaths"];
P --> Q["sync.Once"];
Q -- "initialize store" --> H;
end
```
## Pros and Cons of This Approach
* **Pros**:
* It's a minimal, targeted change that solves the immediate problem.
* It avoids a large-scale, breaking refactoring of all processors.
* It maintains backward compatibility for existing processors and downstream consumers of `libbeat`.
* **Cons**:
* Using a type assertion for the `setPaths` interface feels a bit like magic, since the behavior changes at runtime depending on whether a processor implements it.
## Alternatives Considered
### Option 1: Add a `paths` argument to all processor constructors
* **Pros**:
* Simple and direct.
* **Cons**:
* Requires a global refactoring of all processors.
* Breaks external downstream libbeat importers like Cloudbeat.
* The `paths` argument is not needed in many processors, so adding a rarely used option to the function signature is verbose.
### Option 2: Refactor `processors` to introduce a "V2" interface
* **Pros**:
* Allows for a new, backwards-compatible signature (e.g., using a config struct).
* This can still be done later.
* We can support both V1 processors and gradually move processors to V2.
* **Cons**:
* Needs a significant refactoring effort.
## Checklist
<!-- Mandatory
Add a checklist of things that are required to be reviewed in order to have the PR approved
List here all the items you have verified BEFORE sending this PR. Please DO NOT remove any item, striking through those that do not apply. (Just in case, strikethrough uses two tildes. ~~Scratch this.~~)
-->
- [x] My code follows the style guidelines of this project
- [x] I have commented my code, particularly in hard-to-understand areas
- [ ] ~~I have made corresponding changes to the documentation~~
- [ ] ~~I have made corresponding change to the default configuration files~~
- [x] I have added tests that prove my fix is effective or that my feature works. Where relevant, I have used the [`stresstest.sh`](https://github.com/elastic/beats/blob/main/script/stresstest.sh) script to run them under stress conditions and race detector to verify their stability.
- [ ] ~~I have added an entry in `./changelog/fragments` using the [changelog tool](https://github.com/elastic/elastic-agent-changelog-tool/blob/main/docs/usage.md).~~
## How to test this PR locally
### Configuration
`filebeat-cache-mwe.yml`:
```yaml
path.data: /tmp/data
filebeat.inputs:
- type: filestream
id: filestream-input
enabled: true
paths:
- /tmp/logs/*.log
parsers:
- ndjson:
target: ""
processors:
# PUT: Store metadata when event.type is "source"
- if:
equals:
event.type: "source"
then:
- cache:
backend:
file:
id: test_cache
write_interval: 5s
put:
key_field: event.id
value_field: event.metadata
ttl: 1h
# GET: Retrieve metadata when event.type is "target"
- if:
equals:
event.type: "target"
then:
- cache:
backend:
file:
id: test_cache
get:
key_field: event.id
target_field: cached_metadata
output.console:
enabled: true
```
### Setup
```bash
# Create directory
#rm -rf /tmp/data /tmp/logs
mkdir -p /tmp/logs
# Create test data
cat > /tmp/logs/test.log <<'EOF'
{"event":{"type":"source","id":"001","metadata":{"user":"user-1","role":"admin","sequence":1,"data":{"ip":"192.168.1.1","session":"session-001"}}},"message":"source event 1"}
{"event":{"type":"source","id":"002","metadata":{"user":"user-2","role":"admin","sequence":2,"data":{"ip":"192.168.1.2","session":"session-002"}}},"message":"source event 2"}
{"event":{"type":"source","id":"003","metadata":{"user":"user-3","role":"admin","sequence":3,"data":{"ip":"192.168.1.3","session":"session-003"}}},"message":"source event 3"}
{"event":{"type":"source","id":"004","metadata":{"user":"user-4","role":"admin","sequence":4,"data":{"ip":"192.168.1.4","session":"session-004"}}},"message":"source event 4"}
{"event":{"type":"source","id":"005","metadata":{"user":"user-5","role":"admin","sequence":5,"data":{"ip":"192.168.1.5","session":"session-005"}}},"message":"source event 5"}
{"event":{"type":"target","id":"001"},"message":"target event 1"}
{"event":{"type":"target","id":"002"},"message":"target event 2"}
{"event":{"type":"target","id":"003"},"message":"target event 3"}
{"event":{"type":"target","id":"004"},"message":"target event 4"}
{"event":{"type":"target","id":"005"},"message":"target event 5"}
EOF
# Run filebeat
./x-pack/filebeat/filebeat -e -c filebeat-cache-mwe.yml
```
### Expected Output
Target events should have `cached_metadata` field populated:
```json
{
"event": {
"type": "target",
"id": "001"
},
"message": "target event 1",
"cached_metadata": {
"user": "user-1",
"role": "admin",
"sequence": 1,
"data": {
"ip": "192.168.1.1",
"session": "session-001"
}
}
}
```
### Cache Files
After running filebeat, check cache files:
```bash
cat /tmp/data/cache_processor/test_cache
```
example:
```json
{"key":"001","val":{"data":{"ip":"192.168.1.1","session":"session-001"},"role":"admin","sequence":1,"user":"user-1"},"expires":"2025-11-20T15:02:32.865896537+01:00"}
{"key":"002","val":{"data":{"ip":"192.168.1.2","session":"session-002"},"role":"admin","sequence":2,"user":"user-2"},"expires":"2025-11-20T15:02:32.865950973+01:00"}
{"key":"003","val":{"data":{"ip":"192.168.1.3","session":"session-003"},"role":"admin","sequence":3,"user":"user-3"},"expires":"2025-11-20T15:02:32.865972408+01:00"}
{"key":"004","val":{"data":{"ip":"192.168.1.4","session":"session-004"},"role":"admin","sequence":4,"user":"user-4"},"expires":"2025-11-20T15:02:32.865988843+01:00"}
{"key":"005","val":{"data":{"ip":"192.168.1.5","session":"session-005"},"role":"admin","sequence":5,"user":"user-5"},"expires":"2025-11-20T15:02:32.866006958+01:00"}
```
## Related issues
- Closes #46985
(cherry picked from commit 28222c4)1 parent 5db5435 commit e585626
File tree
17 files changed
+548
-75
lines changed- libbeat
- cmd/instance
- processors
- cache
- publisher
- pipeline
- processing
- x-pack/libbeat/cmd/instance
17 files changed
+548
-75
lines changed| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
387 | 387 | | |
388 | 388 | | |
389 | 389 | | |
| 390 | + | |
390 | 391 | | |
391 | 392 | | |
392 | 393 | | |
| |||
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
69 | 69 | | |
70 | 70 | | |
71 | 71 | | |
| 72 | + | |
72 | 73 | | |
73 | | - | |
74 | | - | |
75 | | - | |
76 | | - | |
77 | | - | |
78 | | - | |
| 74 | + | |
79 | 75 | | |
80 | | - | |
81 | | - | |
| 76 | + | |
82 | 77 | | |
83 | | - | |
84 | | - | |
85 | | - | |
| 78 | + | |
86 | 79 | | |
87 | 80 | | |
88 | 81 | | |
89 | 82 | | |
90 | 83 | | |
91 | 84 | | |
92 | | - | |
| 85 | + | |
93 | 86 | | |
94 | 87 | | |
95 | 88 | | |
96 | 89 | | |
97 | 90 | | |
98 | 91 | | |
99 | | - | |
| 92 | + | |
100 | 93 | | |
101 | 94 | | |
102 | 95 | | |
103 | | - | |
| 96 | + | |
104 | 97 | | |
105 | 98 | | |
106 | 99 | | |
| |||
131 | 124 | | |
132 | 125 | | |
133 | 126 | | |
134 | | - | |
| 127 | + | |
135 | 128 | | |
| 129 | + | |
| 130 | + | |
| 131 | + | |
| 132 | + | |
136 | 133 | | |
137 | 134 | | |
138 | 135 | | |
| |||
182 | 179 | | |
183 | 180 | | |
184 | 181 | | |
| 182 | + | |
| 183 | + | |
| 184 | + | |
| 185 | + | |
| 186 | + | |
| 187 | + | |
| 188 | + | |
| 189 | + | |
| 190 | + | |
| 191 | + | |
| 192 | + | |
| 193 | + | |
| 194 | + | |
| 195 | + | |
| 196 | + | |
185 | 197 | | |
186 | 198 | | |
187 | 199 | | |
| |||
268 | 280 | | |
269 | 281 | | |
270 | 282 | | |
271 | | - | |
| 283 | + | |
| 284 | + | |
| 285 | + | |
272 | 286 | | |
273 | 287 | | |
274 | 288 | | |
| |||
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
19 | 19 | | |
20 | 20 | | |
21 | 21 | | |
| 22 | + | |
22 | 23 | | |
23 | 24 | | |
24 | 25 | | |
| |||
29 | 30 | | |
30 | 31 | | |
31 | 32 | | |
| 33 | + | |
32 | 34 | | |
33 | 35 | | |
34 | 36 | | |
| |||
583 | 585 | | |
584 | 586 | | |
585 | 587 | | |
586 | | - | |
| 588 | + | |
| 589 | + | |
| 590 | + | |
| 591 | + | |
| 592 | + | |
| 593 | + | |
| 594 | + | |
| 595 | + | |
| 596 | + | |
| 597 | + | |
| 598 | + | |
| 599 | + | |
| 600 | + | |
587 | 601 | | |
588 | 602 | | |
589 | 603 | | |
| |||
624 | 638 | | |
625 | 639 | | |
626 | 640 | | |
| 641 | + | |
| 642 | + | |
| 643 | + | |
| 644 | + | |
| 645 | + | |
| 646 | + | |
| 647 | + | |
| 648 | + | |
| 649 | + | |
| 650 | + | |
| 651 | + | |
| 652 | + | |
| 653 | + | |
| 654 | + | |
| 655 | + | |
| 656 | + | |
| 657 | + | |
| 658 | + | |
| 659 | + | |
| 660 | + | |
| 661 | + | |
| 662 | + | |
| 663 | + | |
| 664 | + | |
| 665 | + | |
| 666 | + | |
| 667 | + | |
| 668 | + | |
| 669 | + | |
| 670 | + | |
| 671 | + | |
| 672 | + | |
| 673 | + | |
| 674 | + | |
| 675 | + | |
| 676 | + | |
| 677 | + | |
| 678 | + | |
| 679 | + | |
| 680 | + | |
| 681 | + | |
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
47 | 47 | | |
48 | 48 | | |
49 | 49 | | |
50 | | - | |
| 50 | + | |
51 | 51 | | |
52 | 52 | | |
53 | 53 | | |
54 | 54 | | |
55 | | - | |
| 55 | + | |
56 | 56 | | |
57 | 57 | | |
58 | 58 | | |
| |||
63 | 63 | | |
64 | 64 | | |
65 | 65 | | |
66 | | - | |
67 | | - | |
68 | | - | |
69 | | - | |
| 66 | + | |
| 67 | + | |
| 68 | + | |
| 69 | + | |
70 | 70 | | |
71 | 71 | | |
72 | 72 | | |
| |||
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
26 | 26 | | |
27 | 27 | | |
28 | 28 | | |
| 29 | + | |
29 | 30 | | |
30 | 31 | | |
31 | 32 | | |
| |||
77 | 78 | | |
78 | 79 | | |
79 | 80 | | |
| 81 | + | |
| 82 | + | |
| 83 | + | |
| 84 | + | |
| 85 | + | |
| 86 | + | |
| 87 | + | |
| 88 | + | |
80 | 89 | | |
81 | 90 | | |
82 | 91 | | |
| |||
86 | 95 | | |
87 | 96 | | |
88 | 97 | | |
89 | | - | |
| 98 | + | |
90 | 99 | | |
91 | 100 | | |
92 | 101 | | |
| |||
199 | 208 | | |
200 | 209 | | |
201 | 210 | | |
| 211 | + | |
| 212 | + | |
| 213 | + | |
| 214 | + | |
| 215 | + | |
| 216 | + | |
| 217 | + | |
| 218 | + | |
| 219 | + | |
| 220 | + | |
| 221 | + | |
| 222 | + | |
| 223 | + | |
| 224 | + | |
| 225 | + | |
| 226 | + | |
| 227 | + | |
| 228 | + | |
| 229 | + | |
| 230 | + | |
| 231 | + | |
202 | 232 | | |
203 | 233 | | |
204 | 234 | | |
| |||
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
26 | 26 | | |
27 | 27 | | |
28 | 28 | | |
| 29 | + | |
| 30 | + | |
29 | 31 | | |
30 | 32 | | |
31 | 33 | | |
| |||
248 | 250 | | |
249 | 251 | | |
250 | 252 | | |
| 253 | + | |
| 254 | + | |
| 255 | + | |
| 256 | + | |
| 257 | + | |
| 258 | + | |
| 259 | + | |
| 260 | + | |
| 261 | + | |
| 262 | + | |
| 263 | + | |
| 264 | + | |
251 | 265 | | |
252 | 266 | | |
253 | 267 | | |
| |||
289 | 303 | | |
290 | 304 | | |
291 | 305 | | |
| 306 | + | |
292 | 307 | | |
293 | 308 | | |
294 | 309 | | |
295 | 310 | | |
296 | | - | |
297 | 311 | | |
298 | 312 | | |
299 | 313 | | |
300 | 314 | | |
301 | 315 | | |
| 316 | + | |
| 317 | + | |
| 318 | + | |
| 319 | + | |
| 320 | + | |
| 321 | + | |
| 322 | + | |
| 323 | + | |
| 324 | + | |
| 325 | + | |
| 326 | + | |
| 327 | + | |
| 328 | + | |
| 329 | + | |
| 330 | + | |
| 331 | + | |
| 332 | + | |
| 333 | + | |
| 334 | + | |
| 335 | + | |
| 336 | + | |
| 337 | + | |
| 338 | + | |
| 339 | + | |
| 340 | + | |
| 341 | + | |
| 342 | + | |
| 343 | + | |
| 344 | + | |
| 345 | + | |
| 346 | + | |
| 347 | + | |
| 348 | + | |
| 349 | + | |
| 350 | + | |
| 351 | + | |
| 352 | + | |
| 353 | + | |
| 354 | + | |
| 355 | + | |
| 356 | + | |
| 357 | + | |
| 358 | + | |
| 359 | + | |
| 360 | + | |
| 361 | + | |
| 362 | + | |
| 363 | + | |
| 364 | + | |
| 365 | + | |
| 366 | + | |
| 367 | + | |
| 368 | + | |
| 369 | + | |
| 370 | + | |
| 371 | + | |
| 372 | + | |
302 | 373 | | |
303 | 374 | | |
304 | 375 | | |
| |||
310 | 381 | | |
311 | 382 | | |
312 | 383 | | |
| 384 | + | |
| 385 | + | |
| 386 | + | |
| 387 | + | |
| 388 | + | |
| 389 | + | |
| 390 | + | |
| 391 | + | |
| 392 | + | |
| 393 | + | |
| 394 | + | |
0 commit comments