-
Notifications
You must be signed in to change notification settings - Fork 1.8k
feat(codecs): add varint length delimited framing for protobuf #23352
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
feat(codecs): add varint length delimited framing for protobuf #23352
Conversation
This commit adds support for varint length-delimited framing for protobuf sources and sinks in Vector. This addresses the use case where tools like ClickHouse expect protobuf messages with varint length prefixes instead of the standard 32-bit length prefixes. ## Changes - Add VarintLengthDelimitedEncoder for encoding varint length prefixes - Add VarintLengthDelimited option to FramingConfig enums - Update default protobuf framing to use varint instead of 32-bit length - Add comprehensive tests for varint framing (7 tests, all passing) - Update validation resources to handle new framing option ## Benefits - Better compatibility with tools like ClickHouse - Eliminates risk of protobuf messages being cut or skipped - Properly handles zero-length messages - Backward compatible with existing configurations ## Usage ```yaml # Sources sources: protobuf_source: type: socket decoding: codec: protobuf protobuf: desc_file: "path/to/protobuf.desc" message_type: "package.MessageType" framing: method: varint_length_delimited # Sinks sinks: protobuf_sink: type: socket encoding: codec: protobuf protobuf: desc_file: "path/to/protobuf.desc" message_type: "package.MessageType" framing: method: varint_length_delimited ``` ## Testing - All varint framing tests pass (7/7) - Vector compiles successfully - Configuration validation works - Default behavior updated correctly Closes: [Issue number]
|
@@ -0,0 +1,92 @@ | |||
# Example configuration demonstrating varint framing for protobuf |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is great but we need to find a better place for it in the docs.
return Ok(None); | ||
} | ||
|
||
let mut value: usize = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let mut value: usize = 0; | |
let mut value: u64 = 0; |
} | ||
|
||
let mut value: usize = 0; | ||
let mut shift: u32 = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let mut shift: u32 = 0; | |
let mut shift: u8 = 0; |
|
||
for byte in buf.iter() { | ||
bytes_read += 1; | ||
let byte_value = (*byte & 0x7F) as usize; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let byte_value = (*byte & 0x7F) as usize; | |
let byte_value = (*byte & 0x7F) as u64; |
let mut input = BytesMut::from(&[0xAC, 0x02, b'f', b'o', b'o'][..]); | ||
let mut decoder = VarintLengthDelimitedDecoder::default(); | ||
|
||
assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), "foo"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), "foo"); | |
assert_eq!(decoder.decode(&mut input).unwrap().unwrap(), Bytes::from("foo")); |
// Check if the length is reasonable | ||
if length > self.max_frame_length { | ||
return Err(std::io::Error::new( | ||
std::io::ErrorKind::InvalidData, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can consider introducing a:
#[derive(Debug, Snafu)]
pub enum VarintFramingError {
#[snafu(display("Varint too large"))]
VarintOverflow,
#[snafu(display("Frame too large: {length} bytes (max: {max})"))]
FrameTooLarge { length: usize, max: usize },
#[snafu(display("Trailing data at EOF"))]
TrailingData,
#[snafu(display("I/O error: {}", source))]
Io { source: io::Error },
}
Also, do we need a custom can_continue
?
impl StreamDecodingError for VarintFramingError {
fn can_continue(&self) -> bool {
// ?
}
}
Thank you for this great PR! This CLA is mandatory before we can merge. |
1720078
to
ffe54be
Compare
This commit adds support for varint length-delimited framing for protobuf sources and sinks in Vector. This addresses the use case where tools like ClickHouse expect protobuf messages with varint length prefixes instead of the standard 32-bit length prefixes.
Changes
Benefits
Usage
Testing
Closes: [20156]
Summary
Vector configuration
How did you test this PR?
Change Type
Is this a breaking change?
Does this PR include user facing changes?
no-changelog
label to this PR.References
Notes
@vectordotdev/vector
to reach out to us regarding this PR.pre-push
hook, please see this template.cargo fmt --all
cargo clippy --workspace --all-targets -- -D warnings
cargo nextest run --workspace
(alternatively, you can runcargo test --all
)git merge origin master
andgit push
.Cargo.lock
), pleaserun
cargo vdev build licenses
to regenerate the license inventory and commit the changes (if any). More details here.