Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[processor/memorylimiter] Add profiles support to memorylimiter processor #12454

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions .chloggen/iblancasa-memorylimiterprocessor-profiles.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: memorylimiterprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: |
Add support for profiles to the memorylimiterprocessor.

# One or more tracking issues or pull requests related to the change
issues: [12453]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
1 change: 1 addition & 0 deletions cmd/otelcorecol/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ require (
go.opentelemetry.io/collector/pdata/testdata v0.120.0 // indirect
go.opentelemetry.io/collector/pipeline v0.120.0 // indirect
go.opentelemetry.io/collector/pipeline/xpipeline v0.120.0 // indirect
go.opentelemetry.io/collector/processor/processorhelper/xprocessorhelper v0.120.0 // indirect
go.opentelemetry.io/collector/processor/processortest v0.120.0 // indirect
go.opentelemetry.io/collector/processor/xprocessor v0.120.0 // indirect
go.opentelemetry.io/collector/receiver/receivertest v0.120.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions cmd/otelcorecol/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion processor/memorylimiterprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
<!-- status autogenerated section -->
| Status | |
| ------------- |-----------|
| Stability | [beta]: traces, metrics, logs |
| Stability | [alpha]: profiles |
| | [beta]: traces, metrics, logs |
| Distributions | [core], [contrib], [k8s] |
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector?query=is%3Aissue%20is%3Aopen%20label%3Aprocessor%2Fmemorylimiter%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector/issues?q=is%3Aopen+is%3Aissue+label%3Aprocessor%2Fmemorylimiter) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector?query=is%3Aissue%20is%3Aclosed%20label%3Aprocessor%2Fmemorylimiter%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector/issues?q=is%3Aclosed+is%3Aissue+label%3Aprocessor%2Fmemorylimiter) |

[alpha]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#alpha
[beta]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#beta
[core]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
Expand Down
37 changes: 32 additions & 5 deletions processor/memorylimiterprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,14 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/xconsumer"
"go.opentelemetry.io/collector/internal/telemetry"
"go.opentelemetry.io/collector/internal/telemetry/componentattribute"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/processor/memorylimiterprocessor/internal/metadata"
"go.opentelemetry.io/collector/processor/processorhelper"
"go.opentelemetry.io/collector/processor/processorhelper/xprocessorhelper"
"go.opentelemetry.io/collector/processor/xprocessor"
)

var processorCapabilities = consumer.Capabilities{MutatesData: false}
Expand All @@ -28,16 +31,17 @@ type factory struct {
}

// NewFactory returns a new factory for the Memory Limiter processor.
func NewFactory() processor.Factory {
func NewFactory() xprocessor.Factory {
f := &factory{
memoryLimiters: map[component.Config]*memoryLimiterProcessor{},
}
return processor.NewFactory(
return xprocessor.NewFactory(
metadata.Type,
createDefaultConfig,
processor.WithTraces(f.createTraces, metadata.TracesStability),
processor.WithMetrics(f.createMetrics, metadata.MetricsStability),
processor.WithLogs(f.createLogs, metadata.LogsStability))
xprocessor.WithTraces(f.createTraces, metadata.TracesStability),
xprocessor.WithMetrics(f.createMetrics, metadata.MetricsStability),
xprocessor.WithLogs(f.createLogs, metadata.LogsStability),
xprocessor.WithProfiles(f.createProfiles, metadata.ProfilesStability))
}

// CreateDefaultConfig creates the default configuration for processor. Notice
Expand Down Expand Up @@ -97,6 +101,29 @@ func (f *factory) createLogs(
processorhelper.WithShutdown(memLimiter.shutdown))
}

func (f *factory) createProfiles(
ctx context.Context,
set processor.Settings,
cfg component.Config,
nextConsumer xconsumer.Profiles,
) (xprocessor.Profiles, error) {
memLimiter, err := f.getMemoryLimiter(set, cfg)
if err != nil {
return nil, err
}

return xprocessorhelper.NewProfiles(
ctx,
set,
cfg,
nextConsumer,
memLimiter.processProfiles,
xprocessorhelper.WithCapabilities(processorCapabilities),
xprocessorhelper.WithStart(memLimiter.start),
xprocessorhelper.WithShutdown(memLimiter.shutdown),
)
}

// getMemoryLimiter checks if we have a cached memoryLimiter with a specific config,
// otherwise initialize and add one to the store.
func (f *factory) getMemoryLimiter(set processor.Settings, cfg component.Config) (*memoryLimiterProcessor, error) {
Expand Down
6 changes: 6 additions & 0 deletions processor/memorylimiterprocessor/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,15 @@ func TestCreateProcessor(t *testing.T) {
assert.NotNil(t, lp)
assert.NoError(t, lp.Start(context.Background(), componenttest.NewNopHost()))

pp, err := factory.CreateProfiles(context.Background(), set, cfg, consumertest.NewNop())
require.NoError(t, err)
assert.NotNil(t, pp)
assert.NoError(t, pp.Start(context.Background(), componenttest.NewNopHost()))

assert.NoError(t, lp.Shutdown(context.Background()))
assert.NoError(t, tp.Shutdown(context.Background()))
assert.NoError(t, mp.Shutdown(context.Background()))
assert.NoError(t, pp.Shutdown(context.Background()))
// verify that no monitoring routine is running
require.ErrorIs(t, tp.Shutdown(context.Background()), memorylimiter.ErrShutdownNotStarted)

Expand Down
8 changes: 5 additions & 3 deletions processor/memorylimiterprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,17 @@ require (
go.opentelemetry.io/collector/consumer v1.26.0
go.opentelemetry.io/collector/consumer/consumererror v0.120.0
go.opentelemetry.io/collector/consumer/consumertest v0.120.0
go.opentelemetry.io/collector/consumer/xconsumer v0.120.0
go.opentelemetry.io/collector/internal/memorylimiter v0.120.0
go.opentelemetry.io/collector/internal/telemetry v0.120.0
go.opentelemetry.io/collector/pdata v1.26.0
go.opentelemetry.io/collector/pdata/pprofile v0.120.0
go.opentelemetry.io/collector/pipeline v0.120.0
go.opentelemetry.io/collector/pipeline/xpipeline v0.120.0
go.opentelemetry.io/collector/processor v0.120.0
go.opentelemetry.io/collector/processor/processorhelper/xprocessorhelper v0.120.0
go.opentelemetry.io/collector/processor/processortest v0.120.0
go.opentelemetry.io/collector/processor/xprocessor v0.120.0
go.opentelemetry.io/otel v1.34.0
go.opentelemetry.io/otel/metric v1.34.0
go.opentelemetry.io/otel/sdk/metric v1.34.0
Expand Down Expand Up @@ -50,10 +55,7 @@ require (
github.com/yusufpapurcu/wmi v1.2.4 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/collector/component/componentstatus v0.120.0 // indirect
go.opentelemetry.io/collector/consumer/xconsumer v0.120.0 // indirect
go.opentelemetry.io/collector/pdata/pprofile v0.120.0 // indirect
go.opentelemetry.io/collector/pdata/testdata v0.120.0 // indirect
go.opentelemetry.io/collector/processor/xprocessor v0.120.0 // indirect
go.opentelemetry.io/otel/sdk v1.34.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.33.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions processor/memorylimiterprocessor/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions processor/memorylimiterprocessor/memorylimiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import (
"go.opentelemetry.io/collector/internal/memorylimiter"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/pprofile"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/pipeline"
"go.opentelemetry.io/collector/pipeline/xpipeline"
"go.opentelemetry.io/collector/processor"
)

Expand Down Expand Up @@ -100,3 +102,21 @@ func (p *memoryLimiterProcessor) processLogs(ctx context.Context, ld plog.Logs)
p.obsrep.accepted(ctx, numRecords, pipeline.SignalLogs)
return ld, nil
}

func (p *memoryLimiterProcessor) processProfiles(ctx context.Context, td pprofile.Profiles) (pprofile.Profiles, error) {
numProfiles := td.SampleCount()
if p.memlimiter.MustRefuse() {
// TODO: actually to be 100% sure that this is "refused" and not "dropped"
// it is necessary to check the pipeline to see if this is directly connected
// to a receiver (ie.: a receiver is on the call stack). For now it
// assumes that the pipeline is properly configured and a receiver is on the
// callstack and that the receiver will correctly retry the refused data again.
Comment on lines +109 to +113
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tangent: I wonder whether this comment (which appears 4 times now) will ever be resolved.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Especially without an issue. Maybe replace it with an issue is the first step.

p.obsrep.refused(ctx, numProfiles, xpipeline.SignalProfiles)
return td, memorylimiter.ErrDataRefused
}

// Even if the next consumer returns error record the data as accepted by
// this processor.
p.obsrep.accepted(ctx, numProfiles, xpipeline.SignalProfiles)
return td, nil
}
92 changes: 92 additions & 0 deletions processor/memorylimiterprocessor/memorylimiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ import (
"go.opentelemetry.io/collector/internal/memorylimiter/iruntime"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/pprofile"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/processor/memorylimiterprocessor/internal"
"go.opentelemetry.io/collector/processor/memorylimiterprocessor/internal/metadata"
"go.opentelemetry.io/collector/processor/memorylimiterprocessor/internal/metadatatest"
"go.opentelemetry.io/collector/processor/processorhelper"
"go.opentelemetry.io/collector/processor/processorhelper/xprocessorhelper"
"go.opentelemetry.io/collector/processor/processortest"
)

Expand Down Expand Up @@ -419,6 +421,96 @@ func TestLogMemoryPressureResponse(t *testing.T) {
})
}

// TestProfileMemoryPressureResponse manipulates results from querying memory and
// check expected side effects.
func TestProfileMemoryPressureResponse(t *testing.T) {
pd := pprofile.NewProfiles()
ctx := context.Background()

tests := []struct {
name string
mlCfg *Config
memAlloc uint64
expectError bool
}{
{
name: "Below memAllocLimit",
mlCfg: &Config{
CheckInterval: time.Second,
MemoryLimitPercentage: 50,
MemorySpikePercentage: 1,
},
memAlloc: 800,
expectError: false,
},
{
name: "Above memAllocLimit",
mlCfg: &Config{
CheckInterval: time.Second,
MemoryLimitPercentage: 50,
MemorySpikePercentage: 1,
},
memAlloc: 1800,
expectError: true,
},
{
name: "Below memSpikeLimit",
mlCfg: &Config{
CheckInterval: time.Second,
MemoryLimitPercentage: 50,
MemorySpikePercentage: 10,
},
memAlloc: 800,
expectError: false,
},
{
name: "Above memSpikeLimit",
mlCfg: &Config{
CheckInterval: time.Second,
MemoryLimitPercentage: 50,
MemorySpikePercentage: 11,
},
memAlloc: 800,
expectError: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
memorylimiter.GetMemoryFn = totalMemory
memorylimiter.ReadMemStatsFn = func(ms *runtime.MemStats) {
ms.Alloc = tt.memAlloc
}

ml, err := newMemoryLimiterProcessor(processortest.NewNopSettingsWithType(metadata.Type), tt.mlCfg)
require.NoError(t, err)
tp, err := xprocessorhelper.NewProfiles(
context.Background(),
processortest.NewNopSettingsWithType(metadata.Type),
tt.mlCfg,
consumertest.NewNop(),
ml.processProfiles,
xprocessorhelper.WithCapabilities(processorCapabilities),
xprocessorhelper.WithStart(ml.start),
xprocessorhelper.WithShutdown(ml.shutdown))
require.NoError(t, err)

assert.NoError(t, tp.Start(ctx, &host{}))
ml.memlimiter.CheckMemLimits()
err = tp.ConsumeProfiles(ctx, pd)
if tt.expectError {
assert.Equal(t, memorylimiter.ErrDataRefused, err)
} else {
require.NoError(t, err)
}
assert.NoError(t, tp.Shutdown(ctx))
})
}
t.Cleanup(func() {
memorylimiter.GetMemoryFn = iruntime.TotalMemory
memorylimiter.ReadMemStatsFn = runtime.ReadMemStats
})
}

type host struct {
component.Host
}
Expand Down
1 change: 1 addition & 0 deletions processor/memorylimiterprocessor/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ github_project: open-telemetry/opentelemetry-collector
status:
class: processor
stability:
alpha: [profiles]
beta: [traces, metrics, logs]
distributions: [core, contrib, k8s]

Expand Down
Loading