Skip to content

Commit 28222c4

Browse files
authored
[cache-processor] Set beat paths (#47353)
# Lazy Initialization of the Cache Processor's File Store ## The Problem The basic problem is that processors often use `paths.Resolve` to find directories like "data" or "logs". This function uses a global variable for the base path, which is fine when a Beat runs as a standalone process. But when a Beat is embedded as a receiver (e.g., `fbreceiver` in the OTel Collector), this global causes problems. Each receiver needs its own isolated state directory, and a single global path prevents this. The `cache` processor currently tries to set up its file-based store in its `New` function, which is too early. It only has access to the global path, not the receiver-specific path that gets configured later. ## The Solution My solution is to initialize the cache's file store lazily. Instead of creating the store in `cache.New`, I've added a `SetPaths(*paths.Path)` method to the processor. This method creates the file store and is wrapped in a `sync.Once` to make sure it only runs once. The processor's internal store object stays `nil` until `SetPaths` is called during pipeline construction. ## How it Works The path info gets passed down when a client connects to the pipeline. Here's the flow: 1. **`x-pack/filebeat/fbreceiver`**: `createReceiver` instantiates the processors (including `cache` with a `nil` store) and calls `instance.NewBeatForReceiver`. 2. **`x-pack/libbeat/cmd/instance`**: `NewBeatForReceiver` creates the `paths.Path` object from the receiver's specific configuration. 3. **`libbeat/publisher/pipeline`**: This `paths.Path` object is passed into the pipeline. When a client connects, the path is added to the `beat.ProcessingConfig`. 4. **`libbeat/publisher/processing`**: The processing builder gets this config and calls `group.SetPaths`, which passes the path down to each processor. 5. **`libbeat/processors/cache`**: `SetPaths` is finally called on the cache processor instance, and the `sync.Once` guard ensures the file store is created with the correct path. ## Diagram ```mermaid graph TD subgraph "libbeat/processors/cache (init)" A["init()"] end subgraph "libbeat/processors" B["processors.RegisterPlugin"] C{"registry"} end A --> B; B -- "Save factory" --> C; subgraph "x-pack/filebeat/fbreceiver" D["createReceiver"] end subgraph "libbeat/processors" E["processors.New(config)"] C -. "Lookup 'cache'" .-> E; end D --> E; D --> I; E --> G; subgraph "libbeat/processors/cache" G["cache.New()"] -- store=nil --> H{"cache"}; end subgraph "x-pack/libbeat/cmd/instance" I["instance.NewBeatForReceiver"]; I --> J{"paths.Path object"}; end subgraph "libbeat/publisher/pipeline" J --> K["pipeline.New"]; K --> L["ConnectWith"]; end subgraph "libbeat/publisher/processing" L -- "Config w/ paths" --> N["builder.Create"]; N --> O["group.SetPaths"]; end subgraph "libbeat/processors/cache" O --> P["cache.SetPaths"]; P --> Q["sync.Once"]; Q -- "initialize store" --> H; end ``` ## Pros and Cons of This Approach * **Pros**: * It's a minimal, targeted change that solves the immediate problem. * It avoids a large-scale, breaking refactoring of all processors. * It maintains backward compatibility for existing processors and downstream consumers of `libbeat`. * **Cons**: * Using a type assertion for the `setPaths` interface feels a bit like magic, since the behavior changes at runtime depending on whether a processor implements it. ## Alternatives Considered ### Option 1: Add a `paths` argument to all processor constructors * **Pros**: * Simple and direct. * **Cons**: * Requires a global refactoring of all processors. * Breaks external downstream libbeat importers like Cloudbeat. * The `paths` argument is not needed in many processors, so adding a rarely used option to the function signature is verbose. ### Option 2: Refactor `processors` to introduce a "V2" interface * **Pros**: * Allows for a new, backwards-compatible signature (e.g., using a config struct). * This can still be done later. * We can support both V1 processors and gradually move processors to V2. * **Cons**: * Needs a significant refactoring effort. ## Checklist <!-- Mandatory Add a checklist of things that are required to be reviewed in order to have the PR approved List here all the items you have verified BEFORE sending this PR. Please DO NOT remove any item, striking through those that do not apply. (Just in case, strikethrough uses two tildes. ~~Scratch this.~~) --> - [x] My code follows the style guidelines of this project - [x] I have commented my code, particularly in hard-to-understand areas - [ ] ~~I have made corresponding changes to the documentation~~ - [ ] ~~I have made corresponding change to the default configuration files~~ - [x] I have added tests that prove my fix is effective or that my feature works. Where relevant, I have used the [`stresstest.sh`](https://github.com/elastic/beats/blob/main/script/stresstest.sh) script to run them under stress conditions and race detector to verify their stability. - [ ] ~~I have added an entry in `./changelog/fragments` using the [changelog tool](https://github.com/elastic/elastic-agent-changelog-tool/blob/main/docs/usage.md).~~ ## How to test this PR locally ### Configuration `filebeat-cache-mwe.yml`: ```yaml path.data: /tmp/data filebeat.inputs: - type: filestream id: filestream-input enabled: true paths: - /tmp/logs/*.log parsers: - ndjson: target: "" processors: # PUT: Store metadata when event.type is "source" - if: equals: event.type: "source" then: - cache: backend: file: id: test_cache write_interval: 5s put: key_field: event.id value_field: event.metadata ttl: 1h # GET: Retrieve metadata when event.type is "target" - if: equals: event.type: "target" then: - cache: backend: file: id: test_cache get: key_field: event.id target_field: cached_metadata output.console: enabled: true ``` ### Setup ```bash # Create directory #rm -rf /tmp/data /tmp/logs mkdir -p /tmp/logs # Create test data cat > /tmp/logs/test.log <<'EOF' {"event":{"type":"source","id":"001","metadata":{"user":"user-1","role":"admin","sequence":1,"data":{"ip":"192.168.1.1","session":"session-001"}}},"message":"source event 1"} {"event":{"type":"source","id":"002","metadata":{"user":"user-2","role":"admin","sequence":2,"data":{"ip":"192.168.1.2","session":"session-002"}}},"message":"source event 2"} {"event":{"type":"source","id":"003","metadata":{"user":"user-3","role":"admin","sequence":3,"data":{"ip":"192.168.1.3","session":"session-003"}}},"message":"source event 3"} {"event":{"type":"source","id":"004","metadata":{"user":"user-4","role":"admin","sequence":4,"data":{"ip":"192.168.1.4","session":"session-004"}}},"message":"source event 4"} {"event":{"type":"source","id":"005","metadata":{"user":"user-5","role":"admin","sequence":5,"data":{"ip":"192.168.1.5","session":"session-005"}}},"message":"source event 5"} {"event":{"type":"target","id":"001"},"message":"target event 1"} {"event":{"type":"target","id":"002"},"message":"target event 2"} {"event":{"type":"target","id":"003"},"message":"target event 3"} {"event":{"type":"target","id":"004"},"message":"target event 4"} {"event":{"type":"target","id":"005"},"message":"target event 5"} EOF # Run filebeat ./x-pack/filebeat/filebeat -e -c filebeat-cache-mwe.yml ``` ### Expected Output Target events should have `cached_metadata` field populated: ```json { "event": { "type": "target", "id": "001" }, "message": "target event 1", "cached_metadata": { "user": "user-1", "role": "admin", "sequence": 1, "data": { "ip": "192.168.1.1", "session": "session-001" } } } ``` ### Cache Files After running filebeat, check cache files: ```bash cat /tmp/data/cache_processor/test_cache ``` example: ```json {"key":"001","val":{"data":{"ip":"192.168.1.1","session":"session-001"},"role":"admin","sequence":1,"user":"user-1"},"expires":"2025-11-20T15:02:32.865896537+01:00"} {"key":"002","val":{"data":{"ip":"192.168.1.2","session":"session-002"},"role":"admin","sequence":2,"user":"user-2"},"expires":"2025-11-20T15:02:32.865950973+01:00"} {"key":"003","val":{"data":{"ip":"192.168.1.3","session":"session-003"},"role":"admin","sequence":3,"user":"user-3"},"expires":"2025-11-20T15:02:32.865972408+01:00"} {"key":"004","val":{"data":{"ip":"192.168.1.4","session":"session-004"},"role":"admin","sequence":4,"user":"user-4"},"expires":"2025-11-20T15:02:32.865988843+01:00"} {"key":"005","val":{"data":{"ip":"192.168.1.5","session":"session-005"},"role":"admin","sequence":5,"user":"user-5"},"expires":"2025-11-20T15:02:32.866006958+01:00"} ``` ## Related issues - Closes #46985
1 parent 2124e33 commit 28222c4

File tree

17 files changed

+548
-75
lines changed

17 files changed

+548
-75
lines changed

libbeat/cmd/instance/beat.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,7 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) {
389389
WaitClose: time.Second,
390390
Processors: b.processors,
391391
InputQueueSize: b.InputQueueSize,
392+
Paths: b.Paths,
392393
}
393394
publisher, err = pipeline.LoadWithSettings(b.Info, monitors, b.Config.Pipeline, outputFactory, settings)
394395
if err != nil {

libbeat/processors/cache/cache.go

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -69,38 +69,31 @@ func New(cfg *conf.C, log *logp.Logger) (beat.Processor, error) {
6969
// Logging (each processor instance has a unique ID).
7070
id := int(instanceID.Add(1))
7171
log = log.Named(name).With("instance_id", id)
72+
log.Infow("cache processor created", "config", config)
7273

73-
src, cancel, err := getStoreFor(config, log)
74-
if err != nil {
75-
return nil, fmt.Errorf("failed to get the store for %s: %w", name, err)
76-
}
77-
78-
p := &cache{
74+
return &cache{
7975
config: config,
80-
store: src,
81-
cancel: cancel,
76+
store: nil, // initialized in SetPaths
8277
log: log,
83-
}
84-
p.log.Infow("initialized cache processor", "details", p)
85-
return p, nil
78+
}, nil
8679
}
8780

8881
// getStoreFor returns a backing store for the provided configuration,
8982
// and a context cancellation that releases the cache resource when it
9083
// is no longer required. The cancellation should be called when the
9184
// processor is closed.
92-
func getStoreFor(cfg config, log *logp.Logger) (Store, context.CancelFunc, error) {
85+
func getStoreFor(cfg config, log *logp.Logger, path *paths.Path) (Store, context.CancelFunc, error) {
9386
switch {
9487
case cfg.Store.Memory != nil:
9588
s, cancel := memStores.get(cfg.Store.Memory.ID, cfg)
9689
return s, cancel, nil
9790

9891
case cfg.Store.File != nil:
99-
err := os.MkdirAll(paths.Resolve(paths.Data, "cache_processor"), 0o700)
92+
err := os.MkdirAll(path.Resolve(paths.Data, "cache_processor"), 0o700)
10093
if err != nil {
10194
return nil, noop, fmt.Errorf("cache processor could not create store directory: %w", err)
10295
}
103-
s, cancel := fileStores.get(cfg.Store.File.ID, cfg, log)
96+
s, cancel := fileStores.get(cfg.Store.File.ID, cfg, log, path)
10497
return s, cancel, nil
10598

10699
default:
@@ -131,8 +124,12 @@ type CacheEntry struct {
131124
index int
132125
}
133126

134-
// Run enriches the given event with the host metadata.
127+
// Run enriches the given event with cached metadata.
135128
func (p *cache) Run(event *beat.Event) (*beat.Event, error) {
129+
if p.store == nil {
130+
return event, fmt.Errorf("cache processor store not initialized")
131+
}
132+
136133
switch {
137134
case p.config.Put != nil:
138135
p.log.Debugw("put", "backend_id", p.store, "config", p.config.Put)
@@ -186,6 +183,21 @@ func (p *cache) Run(event *beat.Event) (*beat.Event, error) {
186183
}
187184
}
188185

186+
// SetPaths initializes the cache store with the provided paths configuration.
187+
// This method must be called before the processor can be used.
188+
func (p *cache) SetPaths(path *paths.Path) error {
189+
src, cancel, err := getStoreFor(p.config, p.log, path)
190+
if err != nil {
191+
return fmt.Errorf("cache processor could not create store for %s: %w", name, err)
192+
}
193+
194+
p.store = src
195+
p.cancel = cancel
196+
197+
p.log.Infow("initialized cache processor", "details", p)
198+
return nil
199+
}
200+
189201
// putFrom takes the configured value from the event and stores it in the cache
190202
// if it exists.
191203
func (p *cache) putFrom(event *beat.Event) (err error) {
@@ -302,7 +314,9 @@ func (p *cache) deleteFor(event *beat.Event) (err error) {
302314
}
303315

304316
func (p *cache) Close() error {
305-
p.cancel()
317+
if p.cancel != nil {
318+
p.cancel()
319+
}
306320
return nil
307321
}
308322

libbeat/processors/cache/cache_test.go

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package cache
1919

2020
import (
2121
"errors"
22+
"strings"
2223
"testing"
2324

2425
"github.com/google/go-cmp/cmp"
@@ -28,6 +29,7 @@ import (
2829
conf "github.com/elastic/elastic-agent-libs/config"
2930
"github.com/elastic/elastic-agent-libs/logp/logptest"
3031
"github.com/elastic/elastic-agent-libs/mapstr"
32+
"github.com/elastic/elastic-agent-libs/paths"
3133
)
3234

3335
type cacheTestStep struct {
@@ -662,7 +664,19 @@ func TestCache(t *testing.T) {
662664
t.Log(p)
663665
c, ok := p.(*cache)
664666
if !ok {
665-
t.Fatalf("processor %d is not an *cache", i)
667+
t.Fatalf("processor %d is not a *cache", i)
668+
}
669+
670+
// Initialize the store with paths
671+
tmpDir := t.TempDir()
672+
err = c.SetPaths(&paths.Path{
673+
Home: tmpDir,
674+
Config: tmpDir,
675+
Data: tmpDir,
676+
Logs: tmpDir,
677+
})
678+
if err != nil {
679+
t.Errorf("unexpected error from SetPaths: %v", err)
666680
}
667681

668682
defer func() {
@@ -702,3 +716,44 @@ func TestCache(t *testing.T) {
702716
})
703717
}
704718
}
719+
720+
func TestSetPathsUninitialized(t *testing.T) {
721+
cfg, err := conf.NewConfigFrom(mapstr.M{
722+
"backend": mapstr.M{
723+
"memory": mapstr.M{
724+
"id": "test",
725+
},
726+
},
727+
"get": mapstr.M{
728+
"key_field": "key",
729+
"target_field": "target",
730+
},
731+
})
732+
if err != nil {
733+
t.Fatalf("unexpected error from NewConfigFrom: %v", err)
734+
}
735+
736+
p, err := New(cfg, logptest.NewTestingLogger(t, ""))
737+
if err != nil {
738+
t.Fatalf("unexpected error from New: %v", err)
739+
}
740+
741+
c, ok := p.(*cache)
742+
if !ok {
743+
t.Fatal("processor is not a *cache")
744+
}
745+
defer func() {
746+
if err := c.Close(); err != nil {
747+
t.Errorf("unexpected error from c.Close(): %v", err)
748+
}
749+
}()
750+
751+
// Try to use without SetPaths - should fail
752+
event, err := c.Run(&beat.Event{})
753+
if event == nil {
754+
t.Error("expected non-nil event")
755+
}
756+
if err == nil || !strings.Contains(err.Error(), "cache processor store not initialized") {
757+
t.Fatalf("expected error containing 'cache processor store not initialized', got: %v", err)
758+
}
759+
}

libbeat/processors/cache/file_store.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,12 @@ type fileStoreSet struct {
4747
// and its reference count is increased. The returned context.CancelFunc
4848
// reduces the reference count and deletes the fileStore from the set if the
4949
// count reaches zero.
50-
func (s *fileStoreSet) get(id string, cfg config, log *logp.Logger) (*fileStore, context.CancelFunc) {
50+
func (s *fileStoreSet) get(id string, cfg config, log *logp.Logger, path *paths.Path) (*fileStore, context.CancelFunc) {
5151
s.mu.Lock()
5252
defer s.mu.Unlock()
5353
store, ok := s.stores[id]
5454
if !ok {
55-
store = newFileStore(cfg, id, pathFromConfig(cfg, log), log)
55+
store = newFileStore(cfg, id, pathFromConfig(cfg, log, path), log)
5656
s.stores[store.id] = store
5757
}
5858
store.add(cfg)
@@ -63,10 +63,10 @@ func (s *fileStoreSet) get(id string, cfg config, log *logp.Logger) (*fileStore,
6363
}
6464

6565
// pathFromConfig returns the mapping form a config to a file-system path.
66-
func pathFromConfig(cfg config, log *logp.Logger) string {
67-
path := filepath.Join(paths.Resolve(paths.Data, "cache_processor"), cleanFilename(cfg.Store.File.ID))
68-
log.Infow("mapping file-backed cache processor config to file path", "id", cfg.Store.File.ID, "path", path)
69-
return path
66+
func pathFromConfig(cfg config, log *logp.Logger, path *paths.Path) string {
67+
resultPath := filepath.Join(path.Resolve(paths.Data, "cache_processor"), cleanFilename(cfg.Store.File.ID))
68+
log.Infow("mapping file-backed cache processor config to file path", "id", cfg.Store.File.ID, "path", resultPath)
69+
return resultPath
7070
}
7171

7272
// cleanFilename replaces illegal printable characters (and space or dot) in

libbeat/processors/conditionals.go

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/elastic/beats/v7/libbeat/conditions"
2727
"github.com/elastic/elastic-agent-libs/config"
2828
"github.com/elastic/elastic-agent-libs/logp"
29+
"github.com/elastic/elastic-agent-libs/paths"
2930
)
3031

3132
// NewConditional returns a constructor suitable for registering when conditionals as a plugin.
@@ -77,6 +78,14 @@ func (r *WhenProcessor) Run(event *beat.Event) (*beat.Event, error) {
7778
return r.p.Run(event)
7879
}
7980

81+
func (r *WhenProcessor) SetPaths(paths *paths.Path) error {
82+
pathSetter, ok := r.p.(PathSetter)
83+
if ok {
84+
return pathSetter.SetPaths(paths)
85+
}
86+
return nil
87+
}
88+
8089
func (r *WhenProcessor) String() string {
8190
return fmt.Sprintf("%v, condition=%v", r.p.String(), r.condition.String())
8291
}
@@ -86,7 +95,7 @@ func (r *WhenProcessor) String() string {
8695
// processors, one with `Close` and one without. The decision of
8796
// which to return is determined if the underlying processors require
8897
// `Close`. This is useful because some places in the code base
89-
// (eg. javascript processors) require stateless processors (no Close
98+
// (e.g. javascript processors) require stateless processors (no Close
9099
// method).
91100
type ClosingWhenProcessor struct {
92101
WhenProcessor
@@ -199,6 +208,27 @@ func (p *IfThenElseProcessor) Run(event *beat.Event) (*beat.Event, error) {
199208
return event, nil
200209
}
201210

211+
func (p *IfThenElseProcessor) SetPaths(paths *paths.Path) error {
212+
var err error
213+
for _, proc := range p.then.List {
214+
if procWithSet, ok := proc.(PathSetter); ok {
215+
err = errors.Join(err, procWithSet.SetPaths(paths))
216+
}
217+
}
218+
219+
if p.els == nil {
220+
return err
221+
}
222+
223+
for _, proc := range p.els.List {
224+
if procWithSet, ok := proc.(PathSetter); ok {
225+
err = errors.Join(err, procWithSet.SetPaths(paths))
226+
}
227+
}
228+
229+
return err
230+
}
231+
202232
func (p *IfThenElseProcessor) String() string {
203233
var sb strings.Builder
204234
sb.WriteString("if ")

libbeat/processors/conditionals_test.go

Lines changed: 83 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ import (
2626
"github.com/stretchr/testify/assert"
2727
"github.com/stretchr/testify/require"
2828

29+
"github.com/elastic/elastic-agent-libs/paths"
30+
2931
"github.com/elastic/beats/v7/libbeat/beat"
3032
"github.com/elastic/beats/v7/libbeat/conditions"
3133
conf "github.com/elastic/elastic-agent-libs/config"
@@ -248,6 +250,18 @@ func (c *errorProcessor) Close() error {
248250
return ErrProcessorClose
249251
}
250252

253+
var ErrSetPathsProcessor = fmt.Errorf("error processor set paths error")
254+
255+
type setPathsProcessor struct{}
256+
257+
func (c *setPathsProcessor) Run(e *beat.Event) (*beat.Event, error) {
258+
return e, nil
259+
}
260+
func (c *setPathsProcessor) String() string { return "error_processor" }
261+
func (c *setPathsProcessor) SetPaths(p *paths.Path) error {
262+
return fmt.Errorf("error_processor set paths error: %s", p)
263+
}
264+
251265
func TestConditionRuleClose(t *testing.T) {
252266
const whenCondition = `
253267
contains.a: b
@@ -289,16 +303,73 @@ then:
289303

290304
beatProcessor, err := NewIfElseThenProcessor(c, logptest.NewTestingLogger(t, ""))
291305
require.NoError(t, err)
306+
requireImplements[Closer](t, beatProcessor)
292307

293308
// Verify we got a ClosingIfThenElseProcessor
294309
closingProc := requireAs[*ClosingIfThenElseProcessor](t, beatProcessor)
295310
assert.Nil(t, closingProc.els, "els should be nil when no else clause is provided")
296-
assert.Implements(t, (*Closer)(nil), beatProcessor)
297311

298312
err = closingProc.Close()
299313
require.NoError(t, err)
300314
}
301315

316+
func TestIfThenElseProcessorSetPaths(t *testing.T) {
317+
logger := logptest.NewTestingLogger(t, "")
318+
thenProcessors := &Processors{
319+
List: []beat.Processor{&setPathsProcessor{}},
320+
log: logger,
321+
}
322+
elsProcessors := &Processors{
323+
List: []beat.Processor{&setPathsProcessor{}},
324+
log: logger,
325+
}
326+
proc := &IfThenElseProcessor{
327+
cond: nil,
328+
then: thenProcessors,
329+
els: elsProcessors,
330+
}
331+
332+
// SetPaths should not panic when then is nil
333+
tmpDir := t.TempDir()
334+
beatPaths := &paths.Path{
335+
Home: tmpDir,
336+
Config: tmpDir,
337+
Data: tmpDir,
338+
Logs: tmpDir,
339+
}
340+
err := proc.SetPaths(beatPaths)
341+
require.ErrorAs(t, err, &ErrSetPathsProcessor)
342+
require.ErrorContains(t, err, ErrSetPathsProcessor.Error())
343+
require.ErrorContains(t, err, beatPaths.String())
344+
}
345+
346+
func TestIfThenElseProcessorSetPathsNil(t *testing.T) {
347+
const cfg = `
348+
if:
349+
equals.test: value
350+
then:
351+
- add_fields: {target: "", fields: {test_field: test_value}}
352+
`
353+
c, err := conf.NewConfigWithYAML([]byte(cfg), "if-then config")
354+
require.NoError(t, err)
355+
356+
beatProcessor, err := NewIfElseThenProcessor(c, logptest.NewTestingLogger(t, ""))
357+
require.NoError(t, err)
358+
359+
proc := requireImplements[PathSetter](t, beatProcessor)
360+
361+
// SetPaths should not panic when then is nil
362+
tmpDir := t.TempDir()
363+
beatPaths := &paths.Path{
364+
Home: tmpDir,
365+
Config: tmpDir,
366+
Data: tmpDir,
367+
Logs: tmpDir,
368+
}
369+
err = proc.SetPaths(beatPaths)
370+
require.NoError(t, err)
371+
}
372+
302373
// requireAs performs a type assertion and requires it to succeed.
303374
func requireAs[T any](t *testing.T, v any) T {
304375
t.Helper()
@@ -310,3 +381,14 @@ func requireAs[T any](t *testing.T, v any) T {
310381

311382
return result
312383
}
384+
385+
func requireImplements[T any](t *testing.T, v any) T {
386+
t.Helper()
387+
expected := (*T)(nil)
388+
require.Implements(t, expected, v)
389+
390+
result, ok := v.(T)
391+
require.True(t, ok, "sanity check: expected %T, got %T", expected, v)
392+
393+
return result
394+
}

0 commit comments

Comments
 (0)