Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Refactor & cleanup with updates to default values and documentation. {pull}41834[41834]
- Enhanced HTTPJSON input error logging with structured error metadata conforming to Elastic Common Schema (ECS) conventions. {pull}45653[45653]19
- Add support for DPoP authentication for the CEL and HTTP JSON inputs. {pull}47441[47441]
- Improve logging of cache processor and add ignore failure option. {pull}47565[47565]

*Auditbeat*

Expand Down
58 changes: 46 additions & 12 deletions libbeat/processors/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,10 @@ func (p *cache) Run(event *beat.Event) (*beat.Event, error) {
if result != nil {
return result, nil
}
if p.config.IgnoreFailure {
p.log.Debugw("no match", "backend_id", p.store, "error", ErrNoMatch)
return event, nil
}
return event, ErrNoMatch

case p.config.Delete != nil:
Expand All @@ -184,14 +188,24 @@ func (p *cache) Run(event *beat.Event) (*beat.Event, error) {

// putFrom takes the configured value from the event and stores it in the cache
// if it exists.
func (p *cache) putFrom(event *beat.Event) error {
func (p *cache) putFrom(event *beat.Event) (err error) {
if p.config.IgnoreFailure {
defer func() {
if err == nil {
return
}
p.log.Debugw("ignoring put error", "backend_id", p.store, "error", err)
err = nil
}()
}

k, err := event.GetValue(p.config.Put.Key)
if err != nil {
return err
}
key, ok := k.(string)
if !ok {
return fmt.Errorf("key field '%s' not a string: %T", p.config.Put.Key, k)
return fmt.Errorf("key field '%s' used in %s not a string: %T", p.config.Put.Key, p.store, k)
}
p.log.Debugw("put", "backend_id", p.store, "key", key)

Expand All @@ -202,14 +216,24 @@ func (p *cache) putFrom(event *beat.Event) error {

err = p.store.Put(key, val)
if err != nil {
return fmt.Errorf("failed to put '%s' into '%s': %w", key, p.config.Put.Value, err)
return fmt.Errorf("failed to put '%s' into '%s' in %s: %w", key, p.config.Put.Value, p.store, err)
}
return nil
}

// getFor gets the configured value from the cache for the event and inserts
// it into the configured field if it exists.
func (p *cache) getFor(event *beat.Event) (result *beat.Event, err error) {
if p.config.IgnoreFailure {
defer func() {
if err == nil {
return
}
p.log.Debugw("ignoring get error", "backend_id", p.store, "error", err)
err = nil
}()
}

// Check for clobbering.
dst := p.config.Get.Target
if !p.config.OverwriteKeys {
Expand All @@ -226,17 +250,17 @@ func (p *cache) getFor(event *beat.Event) (result *beat.Event, err error) {
}
k, ok := v.(string)
if !ok {
return nil, fmt.Errorf("key field '%s' not a string: %T", key, v)
return nil, fmt.Errorf("key field '%s' used in %s not a string: %T", key, p.store, v)
}
p.log.Debugw("get", "backend_id", p.store, "key", k)

// Get metadata...
meta, err := p.store.Get(k)
if err != nil {
return nil, fmt.Errorf("%w for '%s': %w", ErrNoData, k, err)
return nil, fmt.Errorf("%w for '%s' in %s: %w", ErrNoData, k, p.store, err)
}
if meta == nil {
return nil, fmt.Errorf("%w for '%s'", ErrNoData, k)
return nil, fmt.Errorf("%w for '%s' in %s", ErrNoData, k, p.store)
}
if m, ok := meta.(map[string]interface{}); ok {
meta = mapstr.M(m)
Expand All @@ -255,14 +279,24 @@ func (p *cache) getFor(event *beat.Event) (result *beat.Event, err error) {

// deleteFor deletes the configured value from the cache based on the value of
// the configured key.
func (p *cache) deleteFor(event *beat.Event) error {
func (p *cache) deleteFor(event *beat.Event) (err error) {
if p.config.IgnoreFailure {
defer func() {
if err == nil {
return
}
p.log.Debugw("ignoring delete error", "backend_id", p.store, "error", err)
err = nil
}()
}

v, err := event.GetValue(p.config.Delete.Key)
if err != nil {
return err
}
k, ok := v.(string)
if !ok {
return fmt.Errorf("key field '%s' not a string: %T", p.config.Delete.Key, v)
return fmt.Errorf("key field '%s' used in %s not a string: %T", p.config.Delete.Key, p.store, v)
}
return p.store.Delete(k)
}
Expand All @@ -276,11 +310,11 @@ func (p *cache) Close() error {
func (p *cache) String() string {
switch {
case p.config.Put != nil:
return fmt.Sprintf("%s=[operation=put, store_id=%s, key_field=%s, value_field=%s, ttl=%v, ignore_missing=%t, overwrite_fields=%t]",
name, p.store, p.config.Put.Key, p.config.Put.Value, p.config.Put.TTL, p.config.IgnoreMissing, p.config.OverwriteKeys)
return fmt.Sprintf("%s=[operation=put, store_id=%s, key_field=%s, value_field=%s, ttl=%v, ignore_missing=%t, ignore_failure=%t, overwrite_fields=%t]",
name, p.store, p.config.Put.Key, p.config.Put.Value, p.config.Put.TTL, p.config.IgnoreMissing, p.config.IgnoreFailure, p.config.OverwriteKeys)
case p.config.Get != nil:
return fmt.Sprintf("%s=[operation=get, store_id=%s, key_field=%s, target_field=%s, ignore_missing=%t, overwrite_fields=%t]",
name, p.store, p.config.Get.Key, p.config.Get.Target, p.config.IgnoreMissing, p.config.OverwriteKeys)
return fmt.Sprintf("%s=[operation=get, store_id=%s, key_field=%s, target_field=%s, ignore_missing=%t, ignore_failure=%t, overwrite_fields=%t]",
name, p.store, p.config.Get.Key, p.config.Get.Target, p.config.IgnoreMissing, p.config.IgnoreFailure, p.config.OverwriteKeys)
case p.config.Delete != nil:
return fmt.Sprintf("%s=[operation=delete, store_id=%s, key_field=%s]", name, p.store, p.config.Delete.Key)
default:
Expand Down
81 changes: 81 additions & 0 deletions libbeat/processors/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,87 @@
},
},
},
{
name: "get_missing_value",
configs: []testConfig{
{
when: func(e mapstr.M) bool {
return e["get"] == true
},
cfg: mapstr.M{
"backend": mapstr.M{
"memory": mapstr.M{
"id": "aidmaster",
},
},
"get": mapstr.M{
"key_field": "crowdstrike.aid",
"target_field": "crowdstrike.metadata_new",
},
},
},
},
wantInitErr: nil,
steps: []cacheTestStep{
{
event: mapstr.M{
"get": true,
"crowdstrike": mapstr.M{
"aid": "one",
},
},
want: mapstr.M{
"get": true,
"crowdstrike": mapstr.M{
"aid": "one",
},
},
wantCacheVal: map[string]*CacheEntry{},
wantErr: errors.New("metadata not found for 'one' in memory:aidmaster: metadata not found"),
},
},
},
{
name: "get_missing_value_ignore_error",
configs: []testConfig{
{
when: func(e mapstr.M) bool {
return e["get"] == true
},
cfg: mapstr.M{
"backend": mapstr.M{
"memory": mapstr.M{
"id": "aidmaster",
},
},
"get": mapstr.M{
"key_field": "crowdstrike.aid",
"target_field": "crowdstrike.metadata_new",
},
"ignore_failure": true,
},
},
},
wantInitErr: nil,
steps: []cacheTestStep{
{
event: mapstr.M{
"get": true,
"crowdstrike": mapstr.M{
"aid": "one",
},
},
want: mapstr.M{
"get": true,
"crowdstrike": mapstr.M{
"aid": "one",
},
},
wantCacheVal: map[string]*CacheEntry{},
wantErr: nil,
},
},
},
{
name: "put_and_get_value_reverse_config",
configs: []testConfig{
Expand Down Expand Up @@ -562,7 +643,7 @@
}

func TestCache(t *testing.T) {
logp.TestingSetup(logp.WithSelectors(name))

Check failure on line 646 in libbeat/processors/cache/cache_test.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

SA1019: logp.TestingSetup is deprecated: Prefer using localized loggers. Use logptest.NewTestingLogger. (staticcheck)
for _, test := range cacheTests {
t.Run(test.name, func(t *testing.T) {
var processors []beat.Processor
Expand Down
5 changes: 5 additions & 0 deletions libbeat/processors/cache/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ type config struct {
// IgnoreMissing: Ignore errors if event has no matching field.
IgnoreMissing bool `config:"ignore_missing"`

// IgnoreFailure: Ignore errors in put, get or delete operations
// including missing cache entries.
IgnoreFailure bool `config:"ignore_failure"`

// OverwriteKeys allow target_fields to overwrite existing fields.
OverwriteKeys bool `config:"overwrite_keys"`
}
Expand Down Expand Up @@ -83,6 +87,7 @@ type delConfig struct {
func defaultConfig() config {
return config{
IgnoreMissing: true,
IgnoreFailure: false,
OverwriteKeys: false,
}
}
Expand Down
3 changes: 3 additions & 0 deletions libbeat/processors/cache/docs/cache.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ One of `put`, `get` or `delete` must be provided.
of the fields in `match_keys` will be discarded and an error will be generated. By
default, this condition is ignored.

`ignore_failure`:: (Optional) When set to `true`, processor failures, including
absence of values in the cache, will be ignored and logged at debug level.

`overwrite_keys`:: (Optional) By default, if a target field already exists, it
will not be overwritten and an error will be logged. If `overwrite_keys` is
set to `true`, this condition will be ignored.
Expand Down
Loading