Skip to content

Commit

Permalink
Add support to configure min GC intervals
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu committed Feb 21, 2025
1 parent f024b0d commit 1f8a2a5
Show file tree
Hide file tree
Showing 10 changed files with 236 additions and 82 deletions.
25 changes: 25 additions & 0 deletions .chloggen/add-support-for-gc.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: memorylimiter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add support to configure min GC intervals for soft and hard limits.

# One or more tracking issues or pull requests related to the change
issues: [12450]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
3 changes: 2 additions & 1 deletion extension/memorylimiterextension/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/extension"
"go.opentelemetry.io/collector/extension/memorylimiterextension/internal/metadata"
"go.opentelemetry.io/collector/internal/memorylimiter"
)

// NewFactory returns a new factory for the Memory Limiter extension.
Expand All @@ -25,7 +26,7 @@ func NewFactory() extension.Factory {
// CreateDefaultConfig creates the default configuration for extension. Notice
// that the default configuration is expected to fail for this extension.
func createDefaultConfig() component.Config {
return &Config{}
return memorylimiter.NewDefaultConfig()
}

func create(_ context.Context, set extension.Settings, cfg component.Config) (extension.Extension, error) {
Expand Down
28 changes: 9 additions & 19 deletions extension/memorylimiterextension/memorylimiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/internal/memorylimiter"
"go.opentelemetry.io/collector/internal/memorylimiter/iruntime"
)
Expand Down Expand Up @@ -70,14 +70,20 @@ func TestMemoryPressureResponse(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
memorylimiter.GetMemoryFn = totalMemory
memorylimiter.GetMemoryFn = func() (uint64, error) {
return uint64(2048), nil
}
memorylimiter.ReadMemStatsFn = func(ms *runtime.MemStats) {
ms.Alloc = tt.memAlloc
}
t.Cleanup(func() {
memorylimiter.GetMemoryFn = iruntime.TotalMemory
memorylimiter.ReadMemStatsFn = runtime.ReadMemStats
})
ml, err := newMemoryLimiter(tt.mlCfg, zap.NewNop())
assert.NoError(t, err)

assert.NoError(t, ml.Start(ctx, &mockHost{}))
assert.NoError(t, ml.Start(ctx, componenttest.NewNopHost()))
ml.memLimiter.CheckMemLimits()
mustRefuse := ml.MustRefuse()
if tt.expectError {
Expand All @@ -88,20 +94,4 @@ func TestMemoryPressureResponse(t *testing.T) {
assert.NoError(t, ml.Shutdown(ctx))
})
}
t.Cleanup(func() {
memorylimiter.GetMemoryFn = iruntime.TotalMemory
memorylimiter.ReadMemStatsFn = runtime.ReadMemStats
})
}

type mockHost struct {
component.Host
}

func (h *mockHost) GetExtensions() map[component.ID]component.Component {
return make(map[component.ID]component.Component)
}

func totalMemory() (uint64, error) {
return uint64(2048), nil
}
20 changes: 20 additions & 0 deletions internal/memorylimiter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

var (
errCheckIntervalOutOfRange = errors.New("'check_interval' must be greater than zero")
errInconsistentGCMinInterval = errors.New("'min_gc_interval_when_soft_limited' should be larger than 'min_gc_interval_when_hard_limited'")
errLimitOutOfRange = errors.New("'limit_mib' or 'limit_percentage' must be greater than zero")
errSpikeLimitOutOfRange = errors.New("'spike_limit_mib' must be smaller than 'limit_mib'")
errSpikeLimitPercentageOutOfRange = errors.New("'spike_limit_percentage' must be smaller than 'limit_percentage'")
Expand All @@ -26,6 +27,16 @@ type Config struct {
// checks will be performed.
CheckInterval time.Duration `mapstructure:"check_interval"`

// MinGCIntervalWhenSoftLimited is the minimum interval between forced GCs when in soft-limited mode
// (soft limit = limit_mib - spike_limit_mib). Zero value means no minimum interval.
// GC is a CPU-heavy operation and executing it too frequently may affect the recovery capabilities of the collector.
MinGCIntervalWhenSoftLimited time.Duration `mapstructure:"min_gc_interval_when_soft_limited"`

// MinGCIntervalWhenHardLimited is the minimum interval between forced GCs when in hard-limited mode
// (hard limit = limit_mib). Zero value means no minimum interval.
// GC is a CPU-heavy operation and executing it too frequently may affect the recovery capabilities of the collector.
MinGCIntervalWhenHardLimited time.Duration `mapstructure:"min_gc_interval_when_hard_limited"`

// MemoryLimitMiB is the maximum amount of memory, in MiB, targeted to be
// allocated by the process.
MemoryLimitMiB uint32 `mapstructure:"limit_mib"`
Expand All @@ -45,11 +56,20 @@ type Config struct {

var _ component.Config = (*Config)(nil)

func NewDefaultConfig() *Config {
return &Config{
MinGCIntervalWhenSoftLimited: 10 * time.Second,
}

Check warning on line 62 in internal/memorylimiter/config.go

View check run for this annotation

Codecov / codecov/patch

internal/memorylimiter/config.go#L59-L62

Added lines #L59 - L62 were not covered by tests
}

// Validate checks if the processor configuration is valid
func (cfg *Config) Validate() error {
if cfg.CheckInterval <= 0 {
return errCheckIntervalOutOfRange
}
if cfg.MinGCIntervalWhenSoftLimited < cfg.MinGCIntervalWhenHardLimited {
return errInconsistentGCMinInterval
}
if cfg.MemoryLimitMiB == 0 && cfg.MemoryLimitPercentage == 0 {
return errLimitOutOfRange
}
Expand Down
11 changes: 11 additions & 0 deletions internal/memorylimiter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,17 @@ func TestConfigValidate(t *testing.T) {
},
err: errSpikeLimitPercentageOutOfRange,
},
{
name: "invalid gc intervals",
cfg: &Config{
CheckInterval: 100 * time.Millisecond,
MinGCIntervalWhenSoftLimited: 50 * time.Millisecond,
MinGCIntervalWhenHardLimited: 100 * time.Millisecond,
MemoryLimitMiB: 5722,
MemorySpikeLimitMiB: 1907,
},
err: errInconsistentGCMinInterval,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
11 changes: 10 additions & 1 deletion internal/memorylimiter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/shirou/gopsutil/v4 v4.25.1
github.com/stretchr/testify v1.10.0
go.opentelemetry.io/collector/component v0.120.0
go.opentelemetry.io/collector/component/componenttest v0.120.0
go.opentelemetry.io/collector/confmap v1.26.0
go.uber.org/goleak v1.3.0
go.uber.org/zap v1.27.0
Expand All @@ -14,9 +15,12 @@ require (
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/ebitengine/purego v0.8.2 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/go-viper/mapstructure/v2 v2.2.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/knadh/koanf/maps v0.1.1 // indirect
github.com/knadh/koanf/providers/confmap v0.1.0 // indirect
github.com/knadh/koanf/v2 v2.1.2 // indirect
Expand All @@ -28,13 +32,16 @@ require (
github.com/tklauser/go-sysconf v0.3.12 // indirect
github.com/tklauser/numcpus v0.6.1 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/collector/pdata v1.26.0 // indirect
go.opentelemetry.io/otel v1.34.0 // indirect
go.opentelemetry.io/otel/metric v1.34.0 // indirect
go.opentelemetry.io/otel/sdk v1.34.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.34.0 // indirect
go.opentelemetry.io/otel/trace v1.34.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.33.0 // indirect
golang.org/x/sys v0.28.0 // indirect
golang.org/x/sys v0.29.0 // indirect
golang.org/x/text v0.21.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241202173237-19429a94021a // indirect
google.golang.org/grpc v1.70.0 // indirect
Expand All @@ -47,3 +54,5 @@ replace go.opentelemetry.io/collector/confmap => ../../confmap
replace go.opentelemetry.io/collector/component => ../../component

replace go.opentelemetry.io/collector/pdata => ../../pdata

replace go.opentelemetry.io/collector/component/componenttest => ../../component/componenttest
19 changes: 11 additions & 8 deletions internal/memorylimiter/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

83 changes: 42 additions & 41 deletions internal/memorylimiter/memorylimiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@ import (

const (
mibBytes = 1024 * 1024

// Minimum interval between forced GC when in soft limited mode. We don't want to
// do GCs too frequently since it is a CPU-heavy operation.
minGCIntervalWhenSoftLimited = 10 * time.Second
)

var (
Expand All @@ -50,11 +46,14 @@ type MemoryLimiter struct {

ticker *time.Ticker

lastGCDone time.Time
minGCIntervalWhenSoftLimited time.Duration
minGCIntervalWhenHardLimited time.Duration
lastGCDone time.Time

// The function to read the mem values is set as a reference to help with
// The functions to read the mem values and run GC are set as a reference to help with
// testing different values.
readMemStatsFn func(m *runtime.MemStats)
runGCFn func()

// Fields used for logging.
logger *zap.Logger
Expand All @@ -78,18 +77,20 @@ func NewMemoryLimiter(cfg *Config, logger *zap.Logger) (*MemoryLimiter, error) {
zap.Duration("check_interval", cfg.CheckInterval))

return &MemoryLimiter{
usageChecker: *usageChecker,
memCheckWait: cfg.CheckInterval,
ticker: time.NewTicker(cfg.CheckInterval),
readMemStatsFn: ReadMemStatsFn,
logger: logger,
mustRefuse: &atomic.Bool{},
usageChecker: *usageChecker,
memCheckWait: cfg.CheckInterval,
ticker: time.NewTicker(cfg.CheckInterval),
minGCIntervalWhenSoftLimited: cfg.MinGCIntervalWhenSoftLimited,
minGCIntervalWhenHardLimited: cfg.MinGCIntervalWhenHardLimited,
lastGCDone: time.Now(),
readMemStatsFn: ReadMemStatsFn,
runGCFn: runtime.GC,
logger: logger,
mustRefuse: &atomic.Bool{},
}, nil
}

// startMonitoring starts a single ticker'd goroutine per instance
// that will check memory usage every checkInterval period.
func (ml *MemoryLimiter) startMonitoring() {
func (ml *MemoryLimiter) Start(_ context.Context, _ component.Host) error {

Check warning on line 93 in internal/memorylimiter/memorylimiter.go

View check run for this annotation

Codecov / codecov/patch

internal/memorylimiter/memorylimiter.go#L93

Added line #L93 was not covered by tests
ml.refCounterLock.Lock()
defer ml.refCounterLock.Unlock()

Expand All @@ -110,10 +111,6 @@ func (ml *MemoryLimiter) startMonitoring() {
}
}()
}
}

func (ml *MemoryLimiter) Start(_ context.Context, _ component.Host) error {
ml.startMonitoring()
return nil
}

Expand Down Expand Up @@ -167,7 +164,7 @@ func memstatToZapField(ms *runtime.MemStats) zap.Field {
}

func (ml *MemoryLimiter) doGCandReadMemStats() *runtime.MemStats {
runtime.GC()
ml.runGCFn()
ml.lastGCDone = time.Now()
ms := ml.readMemStats()
ml.logger.Info("Memory usage after GC.", memstatToZapField(ms))
Expand All @@ -180,38 +177,42 @@ func (ml *MemoryLimiter) CheckMemLimits() {

ml.logger.Debug("Currently used memory.", memstatToZapField(ms))

if ml.usageChecker.aboveHardLimit(ms) {
ml.logger.Warn("Memory usage is above hard limit. Forcing a GC.", memstatToZapField(ms))
ms = ml.doGCandReadMemStats()
}

// Remember current state.
wasRefusing := ml.mustRefuse.Load()

// Check if the memory usage is above the soft limit.
mustRefuse := ml.usageChecker.aboveSoftLimit(ms)

if wasRefusing && !mustRefuse {
// Was previously refusing but enough memory is available now, no need to limit.
ml.logger.Info("Memory usage back within limits. Resuming normal operation.", memstatToZapField(ms))
// Check if we are below the soft limit.
aboveSoftLimit := ml.usageChecker.aboveSoftLimit(ms)
if !aboveSoftLimit {
if ml.mustRefuse.Load() {
// Was previously refusing but enough memory is available now, no need to limit.
ml.logger.Info("Memory usage back within limits. Resuming normal operation.", memstatToZapField(ms))
}
ml.mustRefuse.Store(aboveSoftLimit)
return
}

if !wasRefusing && mustRefuse {
if ml.usageChecker.aboveHardLimit(ms) {
// We are above hard limit, do a GC if it wasn't done recently and see if
// it brings memory usage below the soft limit.
if time.Since(ml.lastGCDone) > ml.minGCIntervalWhenHardLimited {
ml.logger.Warn("Memory usage is above hard limit. Forcing a GC.", memstatToZapField(ms))
ms = ml.doGCandReadMemStats()
// Check the limit again to see if GC helped.
aboveSoftLimit = ml.usageChecker.aboveSoftLimit(ms)
}
} else {
// We are above soft limit, do a GC if it wasn't done recently and see if
// it brings memory usage below the soft limit.
if time.Since(ml.lastGCDone) > minGCIntervalWhenSoftLimited {
if time.Since(ml.lastGCDone) > ml.minGCIntervalWhenSoftLimited {
ml.logger.Info("Memory usage is above soft limit. Forcing a GC.", memstatToZapField(ms))
ms = ml.doGCandReadMemStats()
// Check the limit again to see if GC helped.
mustRefuse = ml.usageChecker.aboveSoftLimit(ms)
aboveSoftLimit = ml.usageChecker.aboveSoftLimit(ms)
}
}

if mustRefuse {
ml.logger.Warn("Memory usage is above soft limit. Refusing data.", memstatToZapField(ms))
}
if !ml.mustRefuse.Load() && aboveSoftLimit {
ml.logger.Warn("Memory usage is above soft limit. Refusing data.", memstatToZapField(ms))
}

ml.mustRefuse.Store(mustRefuse)
ml.mustRefuse.Store(aboveSoftLimit)
}

type memUsageChecker struct {
Expand Down
Loading

0 comments on commit 1f8a2a5

Please sign in to comment.