-
Notifications
You must be signed in to change notification settings - Fork 5k
[8.19](backport #47353) [cache-processor] Set beat paths #47884
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
mergify
wants to merge
2
commits into
8.19
Choose a base branch
from
mergify/bp/8.19/pr-47353
base: 8.19
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+548
−75
Conversation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Lazy Initialization of the Cache Processor's File Store
## The Problem
The basic problem is that processors often use `paths.Resolve` to find directories like "data" or "logs". This function uses a global variable for the base path, which is fine when a Beat runs as a standalone process.
But when a Beat is embedded as a receiver (e.g., `fbreceiver` in the OTel Collector), this global causes problems. Each receiver needs its own isolated state directory, and a single global path prevents this.
The `cache` processor currently tries to set up its file-based store in its `New` function, which is too early. It only has access to the global path, not the receiver-specific path that gets configured later.
## The Solution
My solution is to initialize the cache's file store lazily.
Instead of creating the store in `cache.New`, I've added a `SetPaths(*paths.Path)` method to the processor. This method creates the file store and is wrapped in a `sync.Once` to make sure it only runs once. The processor's internal store object stays `nil` until `SetPaths` is called during pipeline construction.
## How it Works
The path info gets passed down when a client connects to the pipeline. Here's the flow:
1. **`x-pack/filebeat/fbreceiver`**: `createReceiver` instantiates the processors (including `cache` with a `nil` store) and calls `instance.NewBeatForReceiver`.
2. **`x-pack/libbeat/cmd/instance`**: `NewBeatForReceiver` creates the `paths.Path` object from the receiver's specific configuration.
3. **`libbeat/publisher/pipeline`**: This `paths.Path` object is passed into the pipeline. When a client connects, the path is added to the `beat.ProcessingConfig`.
4. **`libbeat/publisher/processing`**: The processing builder gets this config and calls `group.SetPaths`, which passes the path down to each processor.
5. **`libbeat/processors/cache`**: `SetPaths` is finally called on the cache processor instance, and the `sync.Once` guard ensures the file store is created with the correct path.
## Diagram
```mermaid
graph TD
subgraph "libbeat/processors/cache (init)"
A["init()"]
end
subgraph "libbeat/processors"
B["processors.RegisterPlugin"]
C{"registry"}
end
A --> B;
B -- "Save factory" --> C;
subgraph "x-pack/filebeat/fbreceiver"
D["createReceiver"]
end
subgraph "libbeat/processors"
E["processors.New(config)"]
C -. "Lookup 'cache'" .-> E;
end
D --> E;
D --> I;
E --> G;
subgraph "libbeat/processors/cache"
G["cache.New()"] -- store=nil --> H{"cache"};
end
subgraph "x-pack/libbeat/cmd/instance"
I["instance.NewBeatForReceiver"];
I --> J{"paths.Path object"};
end
subgraph "libbeat/publisher/pipeline"
J --> K["pipeline.New"];
K --> L["ConnectWith"];
end
subgraph "libbeat/publisher/processing"
L -- "Config w/ paths" --> N["builder.Create"];
N --> O["group.SetPaths"];
end
subgraph "libbeat/processors/cache"
O --> P["cache.SetPaths"];
P --> Q["sync.Once"];
Q -- "initialize store" --> H;
end
```
## Pros and Cons of This Approach
* **Pros**:
* It's a minimal, targeted change that solves the immediate problem.
* It avoids a large-scale, breaking refactoring of all processors.
* It maintains backward compatibility for existing processors and downstream consumers of `libbeat`.
* **Cons**:
* Using a type assertion for the `setPaths` interface feels a bit like magic, since the behavior changes at runtime depending on whether a processor implements it.
## Alternatives Considered
### Option 1: Add a `paths` argument to all processor constructors
* **Pros**:
* Simple and direct.
* **Cons**:
* Requires a global refactoring of all processors.
* Breaks external downstream libbeat importers like Cloudbeat.
* The `paths` argument is not needed in many processors, so adding a rarely used option to the function signature is verbose.
### Option 2: Refactor `processors` to introduce a "V2" interface
* **Pros**:
* Allows for a new, backwards-compatible signature (e.g., using a config struct).
* This can still be done later.
* We can support both V1 processors and gradually move processors to V2.
* **Cons**:
* Needs a significant refactoring effort.
## Checklist
<!-- Mandatory
Add a checklist of things that are required to be reviewed in order to have the PR approved
List here all the items you have verified BEFORE sending this PR. Please DO NOT remove any item, striking through those that do not apply. (Just in case, strikethrough uses two tildes. ~~Scratch this.~~)
-->
- [x] My code follows the style guidelines of this project
- [x] I have commented my code, particularly in hard-to-understand areas
- [ ] ~~I have made corresponding changes to the documentation~~
- [ ] ~~I have made corresponding change to the default configuration files~~
- [x] I have added tests that prove my fix is effective or that my feature works. Where relevant, I have used the [`stresstest.sh`](https://github.com/elastic/beats/blob/main/script/stresstest.sh) script to run them under stress conditions and race detector to verify their stability.
- [ ] ~~I have added an entry in `./changelog/fragments` using the [changelog tool](https://github.com/elastic/elastic-agent-changelog-tool/blob/main/docs/usage.md).~~
## How to test this PR locally
### Configuration
`filebeat-cache-mwe.yml`:
```yaml
path.data: /tmp/data
filebeat.inputs:
- type: filestream
id: filestream-input
enabled: true
paths:
- /tmp/logs/*.log
parsers:
- ndjson:
target: ""
processors:
# PUT: Store metadata when event.type is "source"
- if:
equals:
event.type: "source"
then:
- cache:
backend:
file:
id: test_cache
write_interval: 5s
put:
key_field: event.id
value_field: event.metadata
ttl: 1h
# GET: Retrieve metadata when event.type is "target"
- if:
equals:
event.type: "target"
then:
- cache:
backend:
file:
id: test_cache
get:
key_field: event.id
target_field: cached_metadata
output.console:
enabled: true
```
### Setup
```bash
# Create directory
#rm -rf /tmp/data /tmp/logs
mkdir -p /tmp/logs
# Create test data
cat > /tmp/logs/test.log <<'EOF'
{"event":{"type":"source","id":"001","metadata":{"user":"user-1","role":"admin","sequence":1,"data":{"ip":"192.168.1.1","session":"session-001"}}},"message":"source event 1"}
{"event":{"type":"source","id":"002","metadata":{"user":"user-2","role":"admin","sequence":2,"data":{"ip":"192.168.1.2","session":"session-002"}}},"message":"source event 2"}
{"event":{"type":"source","id":"003","metadata":{"user":"user-3","role":"admin","sequence":3,"data":{"ip":"192.168.1.3","session":"session-003"}}},"message":"source event 3"}
{"event":{"type":"source","id":"004","metadata":{"user":"user-4","role":"admin","sequence":4,"data":{"ip":"192.168.1.4","session":"session-004"}}},"message":"source event 4"}
{"event":{"type":"source","id":"005","metadata":{"user":"user-5","role":"admin","sequence":5,"data":{"ip":"192.168.1.5","session":"session-005"}}},"message":"source event 5"}
{"event":{"type":"target","id":"001"},"message":"target event 1"}
{"event":{"type":"target","id":"002"},"message":"target event 2"}
{"event":{"type":"target","id":"003"},"message":"target event 3"}
{"event":{"type":"target","id":"004"},"message":"target event 4"}
{"event":{"type":"target","id":"005"},"message":"target event 5"}
EOF
# Run filebeat
./x-pack/filebeat/filebeat -e -c filebeat-cache-mwe.yml
```
### Expected Output
Target events should have `cached_metadata` field populated:
```json
{
"event": {
"type": "target",
"id": "001"
},
"message": "target event 1",
"cached_metadata": {
"user": "user-1",
"role": "admin",
"sequence": 1,
"data": {
"ip": "192.168.1.1",
"session": "session-001"
}
}
}
```
### Cache Files
After running filebeat, check cache files:
```bash
cat /tmp/data/cache_processor/test_cache
```
example:
```json
{"key":"001","val":{"data":{"ip":"192.168.1.1","session":"session-001"},"role":"admin","sequence":1,"user":"user-1"},"expires":"2025-11-20T15:02:32.865896537+01:00"}
{"key":"002","val":{"data":{"ip":"192.168.1.2","session":"session-002"},"role":"admin","sequence":2,"user":"user-2"},"expires":"2025-11-20T15:02:32.865950973+01:00"}
{"key":"003","val":{"data":{"ip":"192.168.1.3","session":"session-003"},"role":"admin","sequence":3,"user":"user-3"},"expires":"2025-11-20T15:02:32.865972408+01:00"}
{"key":"004","val":{"data":{"ip":"192.168.1.4","session":"session-004"},"role":"admin","sequence":4,"user":"user-4"},"expires":"2025-11-20T15:02:32.865988843+01:00"}
{"key":"005","val":{"data":{"ip":"192.168.1.5","session":"session-005"},"role":"admin","sequence":5,"user":"user-5"},"expires":"2025-11-20T15:02:32.866006958+01:00"}
```
## Related issues
- Closes #46985
(cherry picked from commit 28222c4)
Contributor
|
Pinging @elastic/elastic-agent-data-plane (Team:Elastic-Agent-Data-Plane) |
orestisfl
approved these changes
Dec 3, 2025
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Labels
backport
enhancement
skip-changelog
Team:Elastic-Agent-Data-Plane
Label for the Agent Data Plane team
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Lazy Initialization of the Cache Processor's File Store
The Problem
The basic problem is that processors often use
paths.Resolveto find directories like "data" or "logs". This function uses a global variable for the base path, which is fine when a Beat runs as a standalone process.But when a Beat is embedded as a receiver (e.g.,
fbreceiverin the OTel Collector), this global causes problems. Each receiver needs its own isolated state directory, and a single global path prevents this.The
cacheprocessor currently tries to set up its file-based store in itsNewfunction, which is too early. It only has access to the global path, not the receiver-specific path that gets configured later.The Solution
My solution is to initialize the cache's file store lazily.
Instead of creating the store in
cache.New, I've added aSetPaths(*paths.Path)method to the processor. This method creates the file store and is wrapped in async.Onceto make sure it only runs once. The processor's internal store object staysniluntilSetPathsis called during pipeline construction.How it Works
The path info gets passed down when a client connects to the pipeline. Here's the flow:
x-pack/filebeat/fbreceiver:createReceiverinstantiates the processors (includingcachewith anilstore) and callsinstance.NewBeatForReceiver.x-pack/libbeat/cmd/instance:NewBeatForReceivercreates thepaths.Pathobject from the receiver's specific configuration.libbeat/publisher/pipeline: Thispaths.Pathobject is passed into the pipeline. When a client connects, the path is added to thebeat.ProcessingConfig.libbeat/publisher/processing: The processing builder gets this config and callsgroup.SetPaths, which passes the path down to each processor.libbeat/processors/cache:SetPathsis finally called on the cache processor instance, and thesync.Onceguard ensures the file store is created with the correct path.Diagram
graph TD subgraph "libbeat/processors/cache (init)" A["init()"] end subgraph "libbeat/processors" B["processors.RegisterPlugin"] C{"registry"} end A --> B; B -- "Save factory" --> C; subgraph "x-pack/filebeat/fbreceiver" D["createReceiver"] end subgraph "libbeat/processors" E["processors.New(config)"] C -. "Lookup 'cache'" .-> E; end D --> E; D --> I; E --> G; subgraph "libbeat/processors/cache" G["cache.New()"] -- store=nil --> H{"cache"}; end subgraph "x-pack/libbeat/cmd/instance" I["instance.NewBeatForReceiver"]; I --> J{"paths.Path object"}; end subgraph "libbeat/publisher/pipeline" J --> K["pipeline.New"]; K --> L["ConnectWith"]; end subgraph "libbeat/publisher/processing" L -- "Config w/ paths" --> N["builder.Create"]; N --> O["group.SetPaths"]; end subgraph "libbeat/processors/cache" O --> P["cache.SetPaths"]; P --> Q["sync.Once"]; Q -- "initialize store" --> H; endPros and Cons of This Approach
libbeat.setPathsinterface feels a bit like magic, since the behavior changes at runtime depending on whether a processor implements it.Alternatives Considered
Option 1: Add a
pathsargument to all processor constructorspathsargument is not needed in many processors, so adding a rarely used option to the function signature is verbose.Option 2: Refactor
processorsto introduce a "V2" interfaceChecklist
I have made corresponding changes to the documentationI have made corresponding change to the default configuration filesstresstest.shscript to run them under stress conditions and race detector to verify their stability.I have added an entry in./changelog/fragmentsusing the changelog tool.How to test this PR locally
Configuration
filebeat-cache-mwe.yml:Setup
Expected Output
Target events should have
cached_metadatafield populated:{ "event": { "type": "target", "id": "001" }, "message": "target event 1", "cached_metadata": { "user": "user-1", "role": "admin", "sequence": 1, "data": { "ip": "192.168.1.1", "session": "session-001" } } }Cache Files
After running filebeat, check cache files:
example:
{"key":"001","val":{"data":{"ip":"192.168.1.1","session":"session-001"},"role":"admin","sequence":1,"user":"user-1"},"expires":"2025-11-20T15:02:32.865896537+01:00"} {"key":"002","val":{"data":{"ip":"192.168.1.2","session":"session-002"},"role":"admin","sequence":2,"user":"user-2"},"expires":"2025-11-20T15:02:32.865950973+01:00"} {"key":"003","val":{"data":{"ip":"192.168.1.3","session":"session-003"},"role":"admin","sequence":3,"user":"user-3"},"expires":"2025-11-20T15:02:32.865972408+01:00"} {"key":"004","val":{"data":{"ip":"192.168.1.4","session":"session-004"},"role":"admin","sequence":4,"user":"user-4"},"expires":"2025-11-20T15:02:32.865988843+01:00"} {"key":"005","val":{"data":{"ip":"192.168.1.5","session":"session-005"},"role":"admin","sequence":5,"user":"user-5"},"expires":"2025-11-20T15:02:32.866006958+01:00"}Related issues
This is an automatic backport of pull request [cache-processor] Set beat paths #47353 done by Mergify.