Skip to content

Commit 4ddf2cc

Browse files
[service] Share log sampler core allocations with reflect magic (#13107)
#### Context PR #12617, which implemented the injection of component-identifying attributes into the `zap.Logger` provided to components, introduced significant additional memory use when the Collector's pipelines contain many components (#13014). This was because we would call `zapcore.NewSamplerWithOptions` to wrap the specialized logger core of each Collector component, which allocates half a megabyte's worth of sampling counters. This problem was mitigated in #13015 by moving the sampling layer to a different location in the logger core hierarchy. This meant that Collector users who do not export their logs through OTLP and only use stdout-based logs no longer saw the memory increase. #### Description This PR aims to provide a better solution to this issue, by using the `reflect` package to clone zap's sampler core and set a new inner core, while reusing the counter allocation. (This may also be "more correct" from a sampling point of view, i.e., we only have one global instance of the counters instead of one for console logs and one for each component's OTLP-exported logs, but I'm not sure if anyone noticed the difference anyway.) #### Link to tracking issue Fixes #13014 #### Testing A new test was added which checks that the log counters are shared between two sampler cores with different attributes.
1 parent 8dd29c3 commit 4ddf2cc

File tree

5 files changed

+204
-35
lines changed

5 files changed

+204
-35
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: service
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Only allocate one set of internal log sampling counters
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [13014]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: |
19+
The case where logs are only exported to stdout was fixed in v0.126.0;
20+
this new fix also covers the case where logs are exported through OTLP.
21+
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

internal/telemetry/componentattribute/logger_test.go

Lines changed: 111 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
// Copyright The OpenTelemetry Authors
22
// SPDX-License-Identifier: Apache-2.0
33

4-
package componentattribute_test
4+
package componentattribute
55

66
import (
77
"testing"
8+
"time"
89

910
"github.com/stretchr/testify/assert"
1011
"github.com/stretchr/testify/require"
@@ -14,7 +15,6 @@ import (
1415
"go.uber.org/zap/zapcore"
1516
"go.uber.org/zap/zaptest/observer"
1617

17-
"go.opentelemetry.io/collector/internal/telemetry/componentattribute"
1818
"go.opentelemetry.io/collector/pipeline"
1919
)
2020

@@ -31,7 +31,7 @@ type test struct {
3131
func createZapCore() (zapcore.Core, *observer.ObservedLogs) {
3232
core, observed := observer.New(zap.DebugLevel)
3333
core = core.With([]zapcore.Field{zap.String("preexisting", "value")})
34-
core = componentattribute.NewConsoleCoreWithAttributes(core, attribute.NewSet())
34+
core = NewConsoleCoreWithAttributes(core, attribute.NewSet())
3535
return core, observed
3636
}
3737

@@ -40,13 +40,13 @@ func checkZapLogs(t *testing.T, observed *observer.ObservedLogs) {
4040
require.Len(t, observedLogs, 3)
4141

4242
parentContext := map[string]string{
43-
"preexisting": "value",
44-
componentattribute.SignalKey: pipeline.SignalLogs.String(),
45-
componentattribute.ComponentIDKey: "filelog",
43+
"preexisting": "value",
44+
SignalKey: pipeline.SignalLogs.String(),
45+
ComponentIDKey: "filelog",
4646
}
4747
childContext := map[string]string{
48-
"preexisting": "value",
49-
componentattribute.ComponentIDKey: "filelog",
48+
"preexisting": "value",
49+
ComponentIDKey: "filelog",
5050
}
5151

5252
require.Equal(t, "test parent before child", observedLogs[0].Message)
@@ -70,8 +70,8 @@ func checkZapLogs(t *testing.T, observed *observer.ObservedLogs) {
7070

7171
func TestCore(t *testing.T) {
7272
attrs := attribute.NewSet(
73-
attribute.String(componentattribute.SignalKey, pipeline.SignalLogs.String()),
74-
attribute.String(componentattribute.ComponentIDKey, "filelog"),
73+
attribute.String(SignalKey, pipeline.SignalLogs.String()),
74+
attribute.String(ComponentIDKey, "filelog"),
7575
)
7676

7777
tests := []test{
@@ -90,7 +90,7 @@ func TestCore(t *testing.T) {
9090
createLogger: func() (*zap.Logger, logRecorder) {
9191
core, observed := createZapCore()
9292
recorder := logtest.NewRecorder()
93-
core = componentattribute.NewOTelTeeCoreWithAttributes(core, recorder, "testinstr", zap.DebugLevel, attribute.NewSet(), func(c zapcore.Core) zapcore.Core { return c })
93+
core = NewOTelTeeCoreWithAttributes(core, recorder, "testinstr", zap.DebugLevel, attribute.NewSet())
9494
return zap.New(core), logRecorder{zapLogs: observed, otelLogs: recorder}
9595
},
9696
check: func(t *testing.T, rec logRecorder) {
@@ -107,7 +107,7 @@ func TestCore(t *testing.T) {
107107
}
108108

109109
childAttrs := attribute.NewSet(
110-
attribute.String(componentattribute.ComponentIDKey, "filelog"),
110+
attribute.String(ComponentIDKey, "filelog"),
111111
)
112112

113113
assert.Equal(t, map[string]attribute.Set{
@@ -122,13 +122,110 @@ func TestCore(t *testing.T) {
122122
t.Run(test.name, func(t *testing.T) {
123123
logger, state := test.createLogger()
124124

125-
parent := componentattribute.ZapLoggerWithAttributes(logger, attrs)
125+
parent := ZapLoggerWithAttributes(logger, attrs)
126126
parent.Info("test parent before child")
127-
child := componentattribute.ZapLoggerWithAttributes(parent, componentattribute.RemoveAttributes(attrs, componentattribute.SignalKey))
127+
child := ZapLoggerWithAttributes(parent, RemoveAttributes(attrs, SignalKey))
128128
child.Info("test child")
129129
parent.Info("test parent after child")
130130

131131
test.check(t, state)
132132
})
133133
}
134134
}
135+
136+
func TestSamplerCore(t *testing.T) {
137+
tick := time.Second
138+
// Drop identical messages after the first two
139+
first := 2
140+
thereafter := 0
141+
142+
type testCase struct {
143+
name string
144+
withAttributes func(inner zapcore.Core, sampler zapcore.Core, attrs attribute.Set) zapcore.Core
145+
expectedAttrs []string
146+
}
147+
testCases := []testCase{
148+
{
149+
name: "new-sampler",
150+
withAttributes: func(inner zapcore.Core, _ zapcore.Core, attrs attribute.Set) zapcore.Core {
151+
return zapcore.NewSamplerWithOptions(tryWithAttributeSet(inner, attrs), tick, first, thereafter)
152+
},
153+
expectedAttrs: []string{"foo", "bar", "foo", "bar"},
154+
},
155+
{
156+
name: "cloned-sampler",
157+
withAttributes: func(_ zapcore.Core, sampler zapcore.Core, attrs attribute.Set) zapcore.Core {
158+
return tryWithAttributeSet(sampler, attrs)
159+
},
160+
expectedAttrs: []string{"foo", "bar"},
161+
},
162+
}
163+
for _, tc := range testCases {
164+
t.Run(tc.name, func(t *testing.T) {
165+
inner, obs := observer.New(zapcore.DebugLevel)
166+
inner = NewConsoleCoreWithAttributes(inner, attribute.NewSet(attribute.String("test", "foo")))
167+
168+
sampler1 := NewSamplerCoreWithAttributes(inner, tick, first, thereafter)
169+
loggerFoo := zap.New(sampler1)
170+
171+
sampler2 := tc.withAttributes(inner, sampler1, attribute.NewSet(attribute.String("test", "bar")))
172+
loggerBar := zap.New(sampler2)
173+
174+
// If the two samplers share their counters, only the first two messages will go through.
175+
// If they are independent, the first three and the fifth will go through.
176+
loggerFoo.Info("test")
177+
loggerBar.Info("test")
178+
loggerFoo.Info("test")
179+
loggerFoo.Info("test")
180+
loggerBar.Info("test")
181+
loggerBar.Info("test")
182+
183+
var attrs []string
184+
for _, log := range obs.All() {
185+
var fooValue string
186+
for _, field := range log.Context {
187+
if field.Key == "test" {
188+
fooValue = field.String
189+
}
190+
}
191+
attrs = append(attrs, fooValue)
192+
}
193+
assert.Equal(t, tc.expectedAttrs, attrs)
194+
})
195+
}
196+
}
197+
198+
// Worst case scenario for the reflect spell in samplerCoreWithAttributes
199+
type crazySampler struct {
200+
Core int
201+
}
202+
203+
var _ zapcore.Core = (*crazySampler)(nil)
204+
205+
func (s *crazySampler) Enabled(zapcore.Level) bool {
206+
return true
207+
}
208+
209+
func (s *crazySampler) With([]zapcore.Field) zapcore.Core {
210+
return s
211+
}
212+
213+
func (s *crazySampler) Check(_ zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry {
214+
return ce
215+
}
216+
217+
func (s *crazySampler) Write(zapcore.Entry, []zapcore.Field) error {
218+
return nil
219+
}
220+
221+
func (s *crazySampler) Sync() error {
222+
return nil
223+
}
224+
225+
func TestSamplerCorePanic(t *testing.T) {
226+
sampler := NewSamplerCoreWithAttributes(zapcore.NewNopCore(), 1, 1, 1)
227+
sampler.(*samplerCoreWithAttributes).Core = &crazySampler{}
228+
assert.PanicsWithValue(t, "Unexpected Zap sampler type; see github.com/open-telemetry/opentelemetry-collector/issues/13014", func() {
229+
tryWithAttributeSet(sampler, attribute.NewSet())
230+
})
231+
}

internal/telemetry/componentattribute/logger_zap.go

Lines changed: 60 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44
package componentattribute // import "go.opentelemetry.io/collector/internal/telemetry/componentattribute"
55

66
import (
7+
"reflect"
8+
"time"
9+
710
"go.opentelemetry.io/contrib/bridges/otelzap"
811
"go.opentelemetry.io/otel/attribute"
912
"go.opentelemetry.io/otel/log"
@@ -76,7 +79,6 @@ type otelTeeCoreWithAttributes struct {
7679
lp log.LoggerProvider
7780
scopeName string
7881
level zapcore.Level
79-
wrapper func(zapcore.Core) zapcore.Core
8082
}
8183

8284
var _ coreWithAttributes = (*otelTeeCoreWithAttributes)(nil)
@@ -85,7 +87,7 @@ var _ coreWithAttributes = (*otelTeeCoreWithAttributes)(nil)
8587
// logs, component attributes are injected as instrumentation scope attributes.
8688
//
8789
// This is used when service::telemetry::logs::processors is configured.
88-
func NewOTelTeeCoreWithAttributes(consoleCore zapcore.Core, lp log.LoggerProvider, scopeName string, level zapcore.Level, attrs attribute.Set, wrapper func(zapcore.Core) zapcore.Core) zapcore.Core {
90+
func NewOTelTeeCoreWithAttributes(consoleCore zapcore.Core, lp log.LoggerProvider, scopeName string, level zapcore.Level, attrs attribute.Set) zapcore.Core {
8991
otelCore, err := zapcore.NewIncreaseLevelCore(otelzap.NewCore(
9092
scopeName,
9193
otelzap.WithLoggerProvider(lp),
@@ -96,17 +98,70 @@ func NewOTelTeeCoreWithAttributes(consoleCore zapcore.Core, lp log.LoggerProvide
9698
}
9799

98100
return &otelTeeCoreWithAttributes{
99-
Core: zapcore.NewTee(consoleCore, wrapper(otelCore)),
101+
Core: zapcore.NewTee(consoleCore, otelCore),
100102
consoleCore: consoleCore,
101103
lp: lp,
102104
scopeName: scopeName,
103105
level: level,
104-
wrapper: wrapper,
105106
}
106107
}
107108

108109
func (ocwa *otelTeeCoreWithAttributes) withAttributeSet(attrs attribute.Set) zapcore.Core {
109-
return NewOTelTeeCoreWithAttributes(tryWithAttributeSet(ocwa.consoleCore, attrs), ocwa.lp, ocwa.scopeName, ocwa.level, attrs, ocwa.wrapper)
110+
return NewOTelTeeCoreWithAttributes(tryWithAttributeSet(ocwa.consoleCore, attrs), ocwa.lp, ocwa.scopeName, ocwa.level, attrs)
111+
}
112+
113+
type samplerCoreWithAttributes struct {
114+
zapcore.Core
115+
from zapcore.Core
116+
}
117+
118+
var _ coreWithAttributes = (*samplerCoreWithAttributes)(nil)
119+
120+
func NewSamplerCoreWithAttributes(inner zapcore.Core, tick time.Duration, first int, thereafter int) zapcore.Core {
121+
return &samplerCoreWithAttributes{
122+
Core: zapcore.NewSamplerWithOptions(inner, tick, first, thereafter),
123+
from: inner,
124+
}
125+
}
126+
127+
func checkSamplerType(ty reflect.Type) bool {
128+
if ty.Kind() != reflect.Pointer {
129+
return false
130+
}
131+
ty = ty.Elem()
132+
if ty.Kind() != reflect.Struct {
133+
return false
134+
}
135+
innerField, ok := ty.FieldByName("Core")
136+
if !ok {
137+
return false
138+
}
139+
return reflect.TypeFor[zapcore.Core]().AssignableTo(innerField.Type)
140+
}
141+
142+
func (ssc *samplerCoreWithAttributes) withAttributeSet(attrs attribute.Set) zapcore.Core {
143+
newInner := tryWithAttributeSet(ssc.from, attrs)
144+
145+
// Relevant Zap code: https://github.com/uber-go/zap/blob/fcf8ee58669e358bbd6460bef5c2ee7a53c0803a/zapcore/sampler.go#L168
146+
// We need to create a new Zap sampler core with the same settings but with a new inner core,
147+
// while reusing the very RAM-intensive `counters` data structure.
148+
// The `With` method does something similar, but it only replaces pre-set fields, not the Core.
149+
// However, we can use `reflect` to accomplish this.
150+
// This hack can be removed once Zap supports this use case.
151+
// Tracking issue: https://github.com/uber-go/zap/issues/1498
152+
val1 := reflect.ValueOf(ssc.Core)
153+
if !checkSamplerType(val1.Type()) { // To avoid a more esoteric panic message below
154+
panic("Unexpected Zap sampler type; see github.com/open-telemetry/opentelemetry-collector/issues/13014")
155+
}
156+
val2 := reflect.New(val1.Type().Elem()) // core2 := new(sampler)
157+
val2.Elem().Set(val1.Elem()) // *core2 = *core1
158+
val2.Elem().FieldByName("Core").Set(reflect.ValueOf(newInner)) // core2.Core = newInner
159+
newSampler := val2.Interface().(zapcore.Core)
160+
161+
return samplerCoreWithAttributes{
162+
Core: newSampler,
163+
from: newInner,
164+
}
110165
}
111166

112167
// ZapLoggerWithAttributes creates a Zap Logger with a new set of injected component attributes.

service/telemetry/logger.go

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -60,33 +60,23 @@ func newLogger(set Settings, cfg Config) (*zap.Logger, log.LoggerProvider, error
6060

6161
var lp log.LoggerProvider
6262
logger = logger.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core {
63-
if cfg.Logs.Sampling != nil && cfg.Logs.Sampling.Enabled {
64-
core = newSampledCore(core, cfg.Logs.Sampling)
65-
}
66-
6763
core = componentattribute.NewConsoleCoreWithAttributes(core, attribute.NewSet())
6864

6965
if len(cfg.Logs.Processors) > 0 && set.SDK != nil {
7066
lp = set.SDK.LoggerProvider()
71-
wrapper := func(c zapcore.Core) zapcore.Core {
72-
return c
73-
}
74-
if cfg.Logs.Sampling != nil && cfg.Logs.Sampling.Enabled {
75-
wrapper = func(c zapcore.Core) zapcore.Core {
76-
return newSampledCore(c, cfg.Logs.Sampling)
77-
}
78-
}
79-
8067
core = componentattribute.NewOTelTeeCoreWithAttributes(
8168
core,
8269
lp,
8370
"go.opentelemetry.io/collector/service/telemetry",
8471
cfg.Logs.Level,
8572
attribute.NewSet(),
86-
wrapper,
8773
)
8874
}
8975

76+
if cfg.Logs.Sampling != nil && cfg.Logs.Sampling.Enabled {
77+
core = newSampledCore(core, cfg.Logs.Sampling)
78+
}
79+
9080
return core
9181
}))
9282

@@ -96,7 +86,7 @@ func newLogger(set Settings, cfg Config) (*zap.Logger, log.LoggerProvider, error
9686
func newSampledCore(core zapcore.Core, sc *LogsSamplingConfig) zapcore.Core {
9787
// Create a logger that samples every Nth message after the first M messages every S seconds
9888
// where N = sc.Thereafter, M = sc.Initial, S = sc.Tick.
99-
return zapcore.NewSamplerWithOptions(
89+
return componentattribute.NewSamplerCoreWithAttributes(
10090
core,
10191
sc.Tick,
10292
sc.Initial,

service/telemetry/logger_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ func TestNewLogger(t *testing.T) {
100100
InitialFields: map[string]any(nil),
101101
},
102102
},
103-
wantCoreType: "*componentattribute.consoleCoreWithAttributes",
103+
wantCoreType: "*componentattribute.samplerCoreWithAttributes",
104104
},
105105
}
106106
for _, tt := range tests {

0 commit comments

Comments
 (0)