From 468b35bb990e0c8d48bff92a21cc6af5b0f3c4d7 Mon Sep 17 00:00:00 2001 From: Xander Garbett Date: Fri, 10 Jan 2025 13:37:53 +0000 Subject: [PATCH] Add resource processor extractor (#330) * Add resource processor extractor * avoid needing to change deps * Reset the go mods to original * More reverts --- processor/loghouseprocessor/processor.go | 25 ++++++++ processor/loghouseprocessor/processor_test.go | 63 +++++++++++++++++++ 2 files changed, 88 insertions(+) diff --git a/processor/loghouseprocessor/processor.go b/processor/loghouseprocessor/processor.go index 7eff855c9ae6..56c95ef0edc3 100644 --- a/processor/loghouseprocessor/processor.go +++ b/processor/loghouseprocessor/processor.go @@ -55,6 +55,9 @@ func (p *logProcessor) ConsumeLogs(ctx context.Context, l plog.Logs) error { if err != nil { p.logger.Debug("failed to parse log line", zap.Error(err)) } + // This does have a "last line wins" if we have somehow set the same key with different values. + // We are just going to ignore this case for now though. + promoteResourceAttrs(&logLine, &rlogs) } } } @@ -144,6 +147,28 @@ func processJSONLog(l *plog.LogRecord) { } } +func promoteResourceAttrs(l *plog.LogRecord, rlogs *plog.ResourceLogs) { + attributes, ok := l.Attributes().Get("resource") + if !ok { + return + } + merged := MergeRawMaps(rlogs.Resource().Attributes().AsRaw(), attributes.Map().AsRaw()) + rlogs.Resource().Attributes().FromRaw(merged) +} + +// MergeRawMaps merges n maps with a later map's keys overriding earlier maps. (copied to avoid dep hell) +func MergeRawMaps(maps ...map[string]any) map[string]any { + ret := map[string]any{} + + for _, m := range maps { + for k, v := range m { + ret[k] = v + } + } + + return ret +} + // Both funcs copied from: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/ottl/contexts/internal/ids.go#L25 func ParseSpanID(spanIDStr string) (pcommon.SpanID, error) { diff --git a/processor/loghouseprocessor/processor_test.go b/processor/loghouseprocessor/processor_test.go index 319140b7a6d2..a945a883b9e4 100644 --- a/processor/loghouseprocessor/processor_test.go +++ b/processor/loghouseprocessor/processor_test.go @@ -286,3 +286,66 @@ func Test_promoteTraceAndSpan(t *testing.T) { }) } } + +func Test_promoteResourceAttrs(t *testing.T) { + t.Run("single log", func(t *testing.T) { + rl := plog.NewResourceLogs() + l := plog.NewLogRecord() + l.Attributes().FromRaw(map[string]any{"resource": map[string]any{"r1": "v1"}}) + + promoteResourceAttrs(&l, &rl) + + val, ok := rl.Resource().Attributes().Get("r1") + assert.True(t, ok) + assert.Equal(t, "v1", val.Str()) + }) + + t.Run("two logs", func(t *testing.T) { + rl := plog.NewResourceLogs() + l1 := plog.NewLogRecord() + l1.Attributes().FromRaw(map[string]any{"resource": map[string]any{"r1": "v1"}}) + l2 := plog.NewLogRecord() + l2.Attributes().FromRaw(map[string]any{"resource": map[string]any{"r2": "v2"}}) + + promoteResourceAttrs(&l1, &rl) + promoteResourceAttrs(&l2, &rl) + + v1, ok := rl.Resource().Attributes().Get("r1") + assert.True(t, ok) + assert.Equal(t, "v1", v1.Str()) + + v2, ok := rl.Resource().Attributes().Get("r2") + assert.True(t, ok) + assert.Equal(t, "v2", v2.Str()) + }) + + t.Run("last wins", func(t *testing.T) { + rl := plog.NewResourceLogs() + l1 := plog.NewLogRecord() + l1.Attributes().FromRaw(map[string]any{"resource": map[string]any{"r1": "v1"}}) + l2 := plog.NewLogRecord() + l2.Attributes().FromRaw(map[string]any{"resource": map[string]any{"r1": "v2"}}) + + promoteResourceAttrs(&l1, &rl) + promoteResourceAttrs(&l2, &rl) + + v2, ok := rl.Resource().Attributes().Get("r1") + assert.True(t, ok) + assert.Equal(t, "v2", v2.Str()) + }) + + t.Run("overwrite original", func(t *testing.T) { + rl := plog.NewResourceLogs() + rl.Resource().Attributes().FromRaw(map[string]any{"r1": "original"}) + l1 := plog.NewLogRecord() + l1.Attributes().FromRaw(map[string]any{"resource": map[string]any{"r1": "v1"}}) + + promoteResourceAttrs(&l1, &rl) + + v1, ok := rl.Resource().Attributes().Get("r1") + assert.True(t, ok) + assert.Equal(t, "v1", v1.Str()) + + }) + +}