Skip to content

Commit

Permalink
Add resource processor extractor (#330)
Browse files Browse the repository at this point in the history
* Add resource processor extractor

* avoid needing to change deps

* Reset the go mods to original

* More reverts
  • Loading branch information
Garbett1 authored Jan 10, 2025
1 parent 4140f6d commit 468b35b
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 0 deletions.
25 changes: 25 additions & 0 deletions processor/loghouseprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ func (p *logProcessor) ConsumeLogs(ctx context.Context, l plog.Logs) error {
if err != nil {
p.logger.Debug("failed to parse log line", zap.Error(err))
}
// This does have a "last line wins" if we have somehow set the same key with different values.
// We are just going to ignore this case for now though.
promoteResourceAttrs(&logLine, &rlogs)
}
}
}
Expand Down Expand Up @@ -144,6 +147,28 @@ func processJSONLog(l *plog.LogRecord) {
}
}

func promoteResourceAttrs(l *plog.LogRecord, rlogs *plog.ResourceLogs) {
attributes, ok := l.Attributes().Get("resource")
if !ok {
return
}
merged := MergeRawMaps(rlogs.Resource().Attributes().AsRaw(), attributes.Map().AsRaw())
rlogs.Resource().Attributes().FromRaw(merged)
}

// MergeRawMaps merges n maps with a later map's keys overriding earlier maps. (copied to avoid dep hell)
func MergeRawMaps(maps ...map[string]any) map[string]any {
ret := map[string]any{}

for _, m := range maps {
for k, v := range m {
ret[k] = v
}
}

return ret
}

// Both funcs copied from: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/ottl/contexts/internal/ids.go#L25

func ParseSpanID(spanIDStr string) (pcommon.SpanID, error) {
Expand Down
63 changes: 63 additions & 0 deletions processor/loghouseprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,3 +286,66 @@ func Test_promoteTraceAndSpan(t *testing.T) {
})
}
}

func Test_promoteResourceAttrs(t *testing.T) {
t.Run("single log", func(t *testing.T) {
rl := plog.NewResourceLogs()
l := plog.NewLogRecord()
l.Attributes().FromRaw(map[string]any{"resource": map[string]any{"r1": "v1"}})

promoteResourceAttrs(&l, &rl)

val, ok := rl.Resource().Attributes().Get("r1")
assert.True(t, ok)
assert.Equal(t, "v1", val.Str())
})

t.Run("two logs", func(t *testing.T) {
rl := plog.NewResourceLogs()
l1 := plog.NewLogRecord()
l1.Attributes().FromRaw(map[string]any{"resource": map[string]any{"r1": "v1"}})
l2 := plog.NewLogRecord()
l2.Attributes().FromRaw(map[string]any{"resource": map[string]any{"r2": "v2"}})

promoteResourceAttrs(&l1, &rl)
promoteResourceAttrs(&l2, &rl)

v1, ok := rl.Resource().Attributes().Get("r1")
assert.True(t, ok)
assert.Equal(t, "v1", v1.Str())

v2, ok := rl.Resource().Attributes().Get("r2")
assert.True(t, ok)
assert.Equal(t, "v2", v2.Str())
})

t.Run("last wins", func(t *testing.T) {
rl := plog.NewResourceLogs()
l1 := plog.NewLogRecord()
l1.Attributes().FromRaw(map[string]any{"resource": map[string]any{"r1": "v1"}})
l2 := plog.NewLogRecord()
l2.Attributes().FromRaw(map[string]any{"resource": map[string]any{"r1": "v2"}})

promoteResourceAttrs(&l1, &rl)
promoteResourceAttrs(&l2, &rl)

v2, ok := rl.Resource().Attributes().Get("r1")
assert.True(t, ok)
assert.Equal(t, "v2", v2.Str())
})

t.Run("overwrite original", func(t *testing.T) {
rl := plog.NewResourceLogs()
rl.Resource().Attributes().FromRaw(map[string]any{"r1": "original"})
l1 := plog.NewLogRecord()
l1.Attributes().FromRaw(map[string]any{"resource": map[string]any{"r1": "v1"}})

promoteResourceAttrs(&l1, &rl)

v1, ok := rl.Resource().Attributes().Get("r1")
assert.True(t, ok)
assert.Equal(t, "v1", v1.Str())

})

}

0 comments on commit 468b35b

Please sign in to comment.