Skip to content

Commit

Permalink
[exporterhelper] Add WithRequestQueue option to the exporter
Browse files Browse the repository at this point in the history
The new configuration interface for the end users provides a new `queue_size_items` option to limit the queue by a number of spans, log records, or metric data points. The previous way to limit the queue by number of requests is preserved under the same field, `queue_size,` which will later be deprecated through a longer transition process.
  • Loading branch information
dmitryax committed Dec 27, 2023
1 parent 932a4ed commit 5cdf3c4
Show file tree
Hide file tree
Showing 42 changed files with 628 additions and 103 deletions.
32 changes: 32 additions & 0 deletions .chloggen/exporter-helper-v2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporter/exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add API for enabling queue in the new exporter helpers.

# One or more tracking issues or pull requests related to the change
issues: [7874]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
The following experimental API is introduced in exporter/exporterhelper package:
- `WithRequestQueue`: a new exporter helper option for using a queue.
- `Queue`: an interface for queue implementations.
- `QueueFactory`: a queue factory interface, implementations of this interface are intended to be used with WithRequestQueue option.
- `QueueCreateSettings`: queue factory settings.
- `QueueConfig`: common configuration for queue implementations.
- `NewDefaultQueueConfig`: a function for creating a default queue configuration.
- `NewMemoryQueueFactory`: a new factory for creating a memory queue.
- `NewPersistentQueueFactory: a factory for creating a persistent queue.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ check-contrib:
@$(MAKE) -C $(CONTRIB_PATH) for-all CMD="$(GOCMD) mod edit -replace go.opentelemetry.io/collector/config/confighttp=$(CURDIR)/config/confighttp"
@$(MAKE) -C $(CONTRIB_PATH) for-all CMD="$(GOCMD) mod edit -replace go.opentelemetry.io/collector/config/confignet=$(CURDIR)/config/confignet"
@$(MAKE) -C $(CONTRIB_PATH) for-all CMD="$(GOCMD) mod edit -replace go.opentelemetry.io/collector/config/configopaque=$(CURDIR)/config/configopaque"
@$(MAKE) -C $(CONTRIB_PATH) for-all CMD="$(GOCMD) mod edit -replace go.opentelemetry.io/collector/config/configqueue=$(CURDIR)/config/configqueue"
@$(MAKE) -C $(CONTRIB_PATH) for-all CMD="$(GOCMD) mod edit -replace go.opentelemetry.io/collector/config/configretry=$(CURDIR)/config/configretry"
@$(MAKE) -C $(CONTRIB_PATH) for-all CMD="$(GOCMD) mod edit -replace go.opentelemetry.io/collector/config/configtelemetry=$(CURDIR)/config/configtelemetry"
@$(MAKE) -C $(CONTRIB_PATH) for-all CMD="$(GOCMD) mod edit -replace go.opentelemetry.io/collector/config/configtls=$(CURDIR)/config/configtls"
Expand Down
1 change: 1 addition & 0 deletions cmd/builder/test/core.builder.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ replaces:
- go.opentelemetry.io/collector/config/confighttp => ${WORKSPACE_DIR}/config/confighttp
- go.opentelemetry.io/collector/config/confignet => ${WORKSPACE_DIR}/config/confignet
- go.opentelemetry.io/collector/config/configopaque => ${WORKSPACE_DIR}/config/configopaque
- go.opentelemetry.io/collector/config/configqueue => ${WORKSPACE_DIR}/config/configqueue
- go.opentelemetry.io/collector/config/configretry => ${WORKSPACE_DIR}/config/configretry
- go.opentelemetry.io/collector/config/configtelemetry => ${WORKSPACE_DIR}/config/configtelemetry
- go.opentelemetry.io/collector/config/configtls => ${WORKSPACE_DIR}/config/configtls
Expand Down
2 changes: 2 additions & 0 deletions cmd/mdatagen/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,5 @@ replace go.opentelemetry.io/collector/exporter => ../../exporter
replace go.opentelemetry.io/collector/semconv => ../../semconv

replace go.opentelemetry.io/collector/receiver => ../../receiver

replace go.opentelemetry.io/collector/config/configqueue => ../../config/configqueue
1 change: 1 addition & 0 deletions cmd/otelcorecol/builder-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ replaces:
- go.opentelemetry.io/collector/config/confighttp => ../../config/confighttp
- go.opentelemetry.io/collector/config/confignet => ../../config/confignet
- go.opentelemetry.io/collector/config/configopaque => ../../config/configopaque
- go.opentelemetry.io/collector/config/configqueue => ../../config/configqueue
- go.opentelemetry.io/collector/config/configretry => ../../config/configretry
- go.opentelemetry.io/collector/config/configtelemetry => ../../config/configtelemetry
- go.opentelemetry.io/collector/config/configtls => ../../config/configtls
Expand Down
3 changes: 3 additions & 0 deletions cmd/otelcorecol/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ require (
go.opentelemetry.io/collector/config/confighttp v0.91.0 // indirect
go.opentelemetry.io/collector/config/confignet v0.91.0 // indirect
go.opentelemetry.io/collector/config/configopaque v0.91.0 // indirect
go.opentelemetry.io/collector/config/configqueue v0.0.0-00010101000000-000000000000 // indirect
go.opentelemetry.io/collector/config/configretry v0.0.0-20231221085427-9027a8d9cc3f // indirect
go.opentelemetry.io/collector/config/configtelemetry v0.91.0 // indirect
go.opentelemetry.io/collector/config/configtls v0.91.0 // indirect
Expand Down Expand Up @@ -146,6 +147,8 @@ replace go.opentelemetry.io/collector/config/confignet => ../../config/confignet

replace go.opentelemetry.io/collector/config/configopaque => ../../config/configopaque

replace go.opentelemetry.io/collector/config/configqueue => ../../config/configqueue

replace go.opentelemetry.io/collector/config/configretry => ../../config/configretry

replace go.opentelemetry.io/collector/config/configtelemetry => ../../config/configtelemetry
Expand Down
3 changes: 3 additions & 0 deletions config/configgrpc/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ require (
github.com/prometheus/procfs v0.11.1 // indirect
github.com/prometheus/statsd_exporter v0.22.7 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/collector/config/configqueue v0.0.0-00010101000000-000000000000 // indirect
go.opentelemetry.io/collector/config/configtelemetry v0.91.0 // indirect
go.opentelemetry.io/collector/confmap v0.91.0 // indirect
go.opentelemetry.io/collector/extension v0.91.0 // indirect
Expand Down Expand Up @@ -86,6 +87,8 @@ replace go.opentelemetry.io/collector/config/configopaque => ../configopaque

replace go.opentelemetry.io/collector/config/configtls => ../configtls

replace go.opentelemetry.io/collector/config/configqueue => ../../config/configqueue

replace go.opentelemetry.io/collector/config/configretry => ../../config/configretry

replace go.opentelemetry.io/collector/config/configtelemetry => ../configtelemetry
Expand Down
2 changes: 2 additions & 0 deletions config/confighttp/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,5 @@ replace go.opentelemetry.io/collector/component => ../../component
replace go.opentelemetry.io/collector/consumer => ../../consumer

replace go.opentelemetry.io/collector/config/configretry => ../configretry

replace go.opentelemetry.io/collector/config/configqueue => ../configqueue
1 change: 1 addition & 0 deletions config/configqueue/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
48 changes: 48 additions & 0 deletions config/configqueue/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
module go.opentelemetry.io/collector/config/configqueue

go 1.20

require (
github.com/stretchr/testify v1.8.4
go.opentelemetry.io/collector/component v0.91.0
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/hashicorp/go-version v1.6.0 // indirect
github.com/knadh/koanf/maps v0.1.1 // indirect
github.com/knadh/koanf/providers/confmap v0.1.0 // indirect
github.com/knadh/koanf/v2 v2.0.1 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/mapstructure v1.5.1-0.20220423185008-bf980b35cac4 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/collector/config/configtelemetry v0.91.0 // indirect
go.opentelemetry.io/collector/confmap v0.91.0 // indirect
go.opentelemetry.io/collector/featuregate v1.0.0 // indirect
go.opentelemetry.io/collector/pdata v1.0.0 // indirect
go.opentelemetry.io/otel v1.21.0 // indirect
go.opentelemetry.io/otel/metric v1.21.0 // indirect
go.opentelemetry.io/otel/trace v1.21.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.26.0 // indirect
golang.org/x/net v0.18.0 // indirect
golang.org/x/sys v0.14.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231002182017-d307bd883b97 // indirect
google.golang.org/grpc v1.60.1 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace go.opentelemetry.io/collector/component => ../../component

replace go.opentelemetry.io/collector/pdata => ../../pdata

replace go.opentelemetry.io/collector/featuregate => ../../featuregate

replace go.opentelemetry.io/collector/confmap => ../../confmap

replace go.opentelemetry.io/collector/config/configtelemetry => ../configtelemetry
90 changes: 90 additions & 0 deletions config/configqueue/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

69 changes: 69 additions & 0 deletions config/configqueue/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package configqueue // import "go.opentelemetry.io/collector/config/configqueue"

import (
"errors"

"go.opentelemetry.io/collector/component"
)

// QueueConfig defines configuration for queueing requests before exporting.
// It's supposed to be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
type QueueConfig struct {
// Enabled indicates whether to not enqueue batches before exporting.
Enabled bool `mapstructure:"enabled"`
// NumConsumers is the number of consumers from the queue.
NumConsumers int `mapstructure:"num_consumers"`
// QueueSizeItems is the maximum number of items (spans, metric data points or log records)
// allowed in queue at any given time.
QueueSizeItems int `mapstructure:"queue_size_items"`
// QueueSize is the maximum number of requests allowed in queue at any given time.
// This option is left for backward compatibility and will be deprecated in the future.
// It's recommended to use QueueSizeItems instead.
QueueSize int `mapstructure:"queue_size"`
}

// NewDefaultQueueConfig returns the default Config.
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func NewDefaultQueueConfig() QueueConfig {
return QueueConfig{
Enabled: true,
NumConsumers: 10,
QueueSizeItems: 100_000,
}
}

// Validate checks if the QueueSettings configuration is valid
func (qCfg *QueueConfig) Validate() error {
if !qCfg.Enabled {
return nil
}
if qCfg.NumConsumers <= 0 {
return errors.New("number of consumers must be positive")
}
if qCfg.QueueSizeItems > 0 && qCfg.QueueSize > 0 {
return errors.New("only one of 'queue_size' and 'queue_size_items' can be specified")
}
if qCfg.QueueSizeItems <= 0 && qCfg.QueueSize <= 0 {
return errors.New("queue size must be positive")
}
return nil
}

// PersistentQueueConfig defines configuration for queueing requests in a persistent storage.
// The struct is provided to be added in the exporter configuration as one struct under the "sending_queue" key.
// The exporter helper Go interface requires the fields to be provided separately to WithRequestQueue and
// NewPersistentQueueFactory.
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
type PersistentQueueConfig struct {
QueueConfig `mapstructure:",squash"`
// StorageID if not empty, enables the persistent storage and uses the component specified
// as a storage extension for the persistent queue
StorageID *component.ID `mapstructure:"storage"`
}
Loading

0 comments on commit 5cdf3c4

Please sign in to comment.