enhancement(sinks): Add support for max_bytes for memory buffers #23330
base: master
Conversation
Force-pushed from e3ef011 to bdc9f1d
lib/vector-buffers/src/config.rs (Outdated)

    #[serde(default = "memory_buffer_default_max_events")]
    max_events: NonZeroUsize,
    /// The terms around how to express buffering limits, can be in size or bytes_size.
    size: MemoryBufferSize,
I believe this might introduce a little friction, at least in the docs. Currently the memory buffering options are flat, but introducing an enum here will open up another nested set of options. Confusingly, there is a custom deserializer above that will allow a user to just use `max_bytes` and things will work.

Should we just have two optional values here? I went with the enum because it's essentially the same type that will be passed on to the method that chooses the implementation, but after seeing how the docs look I'm thinking about changing this.
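To make the nesting concern concrete, here is an illustrative comparison of the two layouts; the exact option names are assumptions based on this thread, not the final schema:

```yaml
# Flat layout, which the custom deserializer above already accepts:
buffer:
  type: memory
  max_bytes: 268435488
---
# Nested layout implied by documenting the `size` enum as its own object:
buffer:
  type: memory
  size:
    max_bytes: 268435488
```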
Noting that Rob and I discussed this and he's planning to rework the config handling.
Cool, thank you for sharing. We will take a look after all the open comments are addressed.
Pull Request Overview
Adds support for byte-based limits for in-memory buffers by introducing a unified `MemoryBufferSize` enum and dynamically selecting between element-count and byte-size queues.

- Replace standalone `max_events` with a `size` object backed by `MemoryBufferSize` across configs and APIs
- Implement `QueueImpl` to choose between `ArrayQueue` (by events) and `SegQueue` (by bytes) using a semaphore guard (see the sketch after this list)
- Update all tests, examples, benchmarks, and documentation to use the new byte/event buffer sizing model
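A minimal sketch of the queue-selection idea, assuming the `crossbeam_queue` API; the names and method set here are illustrative and not necessarily the PR's exact code:

```rust
use crossbeam_queue::{ArrayQueue, SegQueue};

/// Illustrative shape only: a queue bounded either by a fixed number of events
/// or growable and bounded externally (by a byte-counting semaphore).
enum QueueImpl<T> {
    ByEvents(ArrayQueue<T>), // fixed capacity = max_events
    ByBytes(SegQueue<T>),    // unbounded; byte budget enforced by the caller's permits
}

impl<T> QueueImpl<T> {
    fn push(&self, item: T) -> Result<(), T> {
        match self {
            Self::ByEvents(q) => q.push(item),
            Self::ByBytes(q) => {
                // SegQueue::push cannot fail; the byte limit was already checked
                // when permits were acquired for this item.
                q.push(item);
                Ok(())
            }
        }
    }

    fn pop(&self) -> Option<T> {
        match self {
            Self::ByEvents(q) => q.pop(),
            Self::ByBytes(q) => q.pop(),
        }
    }
}
```

In the by-bytes mode the queue itself never rejects a push; the surrounding semaphore (see the PR description below) is what enforces the byte budget.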
Reviewed Changes
Copilot reviewed 15 out of 15 changed files in this pull request and generated no comments.
Summary per file:

| File | Description |
|---|---|
| website/cue/reference/components/base/sinks.cue | Update CUE schema to use new `size` object with `max_bytes` & `max_size` |
| src/topology/test/backpressure.rs | Adapt backpressure tests to `MemoryBufferSize` |
| src/test_util/mock/sources/basic.rs | Use `MemoryBufferSize` in mock sources |
| src/source_sender/mod.rs | Initialize limited channel with `MemoryBufferSize` |
| lib/vector-buffers/src/variants/in_memory.rs | Refactor `MemoryBuffer` to accept `MemoryBufferSize` |
| lib/vector-buffers/src/topology/test_util.rs | Enhance `Sample` to account for heap-allocated bytes |
| lib/vector-buffers/src/topology/channel/limited_queue.rs | Add `QueueImpl`, `SizeTerms`, & dynamic queue selection |
| lib/vector-buffers/src/topology/builder.rs | Update topology builder to use `MemoryBufferSize` |
| lib/vector-buffers/src/test/variant.rs | Update variant tests to use `MemoryBufferSize` |
| lib/vector-buffers/src/lib.rs | Export `MemoryBufferSize` |
| lib/vector-buffers/src/config.rs | Implement serde de/serialization for `MemoryBufferSize` |
| lib/vector-buffers/examples/buffer_perf.rs | Adjust example to use new buffer size API |
| lib/vector-buffers/benches/sized_records.rs | Add benchmarks for byte-based buffers via a `BoundBy` helper |
| lib/vector-buffers/benches/common.rs | Extend `Message` to simulate heap allocation for size-based tests |
| changelog.d/8679_add_support_max_bytes_memory_buffers.feature.md | Add changelog entry for the new `max_bytes` feature |
Comments suppressed due to low confidence (3)
lib/vector-buffers/src/config.rs:102
- [nitpick] The error message for invalid `max_bytes` is unclear and grammatically awkward; consider rephrasing to something like "`max_bytes` must fit within the platform's `usize` range" and include the actual bounds.

    &"For memory buffers max_bytes expects an integer within the range of 268435488 and your architecture dependent usize",

lib/vector-buffers/src/config.rs:430
- Add a unit test in the config module to verify that a YAML or JSON config using `max_bytes` correctly deserializes into `MemoryBufferSize::MaxSize` under `BufferType::Memory`.

    for stage in self.stages() {

lib/vector-buffers/src/config.rs:207
- [nitpick] The variant name `MaxSize` is ambiguous; consider renaming it to `MaxBytes` to clearly indicate it represents a byte-based limit.

    MaxSize {
- Also removing its configurable_component tag as it is no longer officially part of the configuration
@@ -186,11 +244,18 @@ pub enum BufferType {
     /// This is more performant, but less durable. Data will be lost if Vector is restarted
     /// forcefully or crashes.
     #[configurable(title = "Events are buffered in memory.")]
-    #[serde(rename = "memory")]
+    #[serde(rename = "memory", serialize_with = "serialize_memory_config")]
How does this serialize differently than what `serde` natively generates for you?
Previously `max_events` was not optional. Now it is, so if it is not present, the natively generated serialization will emit `max_bytes: null`, which causes the custom deserializer to fail because it expects an integer when this key exists.

I don't believe this is an issue outside of tests, because that is the only case where a schema is serialized by Vector; correct me if I'm wrong though.
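Roughly, the failure mode described above would look like this once serialized (shape assumed for illustration):

```yaml
buffer:
  type: memory
  max_events: 500
  max_bytes: null   # key is present but holds no integer, so the custom deserializer errors
```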
     // Two size options instead of a variant had been chosen to denote respective sizes so
     // that the generated documentation will output what the parser will expect, a flat
     // non-nested layout where exactly one of the two must be provided or else a default for
     // max_events will be chosen.
     /// The maximum number of events allowed in the buffer.
     #[serde(default = "memory_buffer_default_max_events")]
-    max_events: NonZeroUsize,
+    max_events: Option<NonZeroUsize>,
+
+    /// The maximum size across all events allowed in the buffer.
+    #[configurable(metadata(docs::type_unit = "bytes"))]
+    max_size: Option<NonZeroUsize>,
I thought you could still accomplish the flattening with an enum:

    #[serde(default, flatten)]
    size: EventsOrSize,

    #[serde(rename_all = "snake_case")]
    enum EventsOrSize {
        MaxEvents(NonZeroUsize),
        MaxSize(NonZeroUsize),
    }

Oh look, that's actually `MemoryBufferSize`, which just needs the serde annotations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I gave that a shot; it was not generating the documentation as intended, though.
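For reference, a sketch of how the flat two-field layout shown above could resolve into a single size choice; the enum name, variants, and the 500-event default are assumptions for illustration, not the PR's exact code:

```rust
use std::num::NonZeroUsize;

/// Illustrative stand-in for the buffer size choice; real names may differ.
enum BufferSizeChoice {
    MaxEvents(NonZeroUsize),
    MaxBytes(NonZeroUsize),
}

fn resolve_size(
    max_events: Option<NonZeroUsize>,
    max_size: Option<NonZeroUsize>,
) -> Result<BufferSizeChoice, String> {
    match (max_events, max_size) {
        // Exactly one of the two may be provided...
        (Some(_), Some(_)) => Err("`max_events` and `max_size` are mutually exclusive".into()),
        (None, Some(bytes)) => Ok(BufferSizeChoice::MaxBytes(bytes)),
        (Some(events), None) => Ok(BufferSizeChoice::MaxEvents(events)),
        // ...otherwise fall back to a default event count (assumed to be 500 here).
        (None, None) => Ok(BufferSizeChoice::MaxEvents(NonZeroUsize::new(500).unwrap())),
    }
}
```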
@@ -273,7 +275,7 @@ mod tests {
     async fn single_stage_topology_block() {
         let mut builder = TopologyBuilder::<Sample>::default();
         builder.stage(
-            MemoryBuffer::new(NonZeroUsize::new(1).unwrap()),
+            MemoryBuffer::with_max_events(NonZeroUsize::new(1).unwrap()),
I wonder if some of the tests shouldn't also exercise the `max_bytes` mode. Not sure…
- Replace `if let` chain with `match` expression
- Replace map/sum with just calls to `+`
- Replace function pointer in limited_queue.rs with enum + variant check
Force-pushed from 1720078 to ffe54be
Summary
This PR adds support for memory buffers to be bounded in terms of bytes allocated. This is an opt-in feature and is off by default, meaning that the current implementation and its defaults are still selected when a value for `max_bytes` is not explicitly supplied.

At the core of this change is a new interface that allows selecting a different lock-free queue. This queue is at the center of the memory buffer implementation. Today that queue is `crossbeam_queue::ArrayQueue`, a fixed-size lock-free data structure, and that fixed size is the reason #8679 could not easily be implemented. The new interface allows dropping in a non-fixed-size queue. `crossbeam_queue::SegQueue` was chosen because it proved performant in initial testing and did not require any new dependencies.

The main resource (the queue) is already guarded by a semaphore. This semaphore currently bounds the queue by number of elements, but there is no reason it could not guard against bytes allocated instead, so much of the existing code remains the same. That is a positive, since it is already battle-tested and seems relatively stable as is.
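As a rough illustration of that point (the names and shape here are assumed, not the PR's code), bounding by bytes only changes how many permits each item consumes from the existing semaphore:

```rust
use tokio::sync::Semaphore;

/// Illustrative only: the two ways the same semaphore can bound the queue.
enum Limit {
    Events, // each event costs one permit; semaphore starts with max_events permits
    Bytes,  // each event costs its byte size; semaphore starts with max_bytes permits
}

async fn reserve(semaphore: &Semaphore, limit: &Limit, event_byte_size: usize) {
    let permits = match limit {
        Limit::Events => 1,
        Limit::Bytes => event_byte_size as u32,
    };
    // Permits are "forgotten" here and added back via `add_permits` when the event is popped.
    semaphore
        .acquire_many(permits)
        .await
        .expect("semaphore closed")
        .forget();
}
```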
Finally, a new unit test was added and new benchmarks were included in `vector-buffers/benches`.

Vector configuration

To any sink configuration, try:
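An illustrative example (the sink name, type, and surrounding options are placeholders; only the `buffer` block shows the new `max_bytes` option):

```yaml
sinks:
  my_sink_id:            # placeholder sink
    type: console
    inputs: ["my-source-id"]
    encoding:
      codec: json
    buffer:
      type: memory
      max_bytes: 268435488   # bound the in-memory buffer by total bytes instead of event count
```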
How did you test this PR?
Via the existing unit test and the developed benchmarks
Change Type

Is this a breaking change?

Does this PR include user facing changes? If not, please add the no-changelog label to this PR.

References
- `max_bytes` for memory buffers #8679

Notes
- Use `@vectordotdev/vector` to reach out to us regarding this PR.
- We recommend adding a `pre-push` hook; please see this template.
- Alternatively, run the following locally before pushing:
  - `cargo fmt --all`
  - `cargo clippy --workspace --all-targets -- -D warnings`
  - `cargo nextest run --workspace` (alternatively, you can run `cargo test --all`)
- To update your branch, run `git merge origin master` and `git push`.
- If this PR changes dependencies (`Cargo.lock`), please run `cargo vdev build licenses` to regenerate the license inventory and commit the changes (if any). More details here.