Add BinaryFormatSupport and Row Encoder to arrow-avro Writer#9171
alamb merged 9 commits into apache:main
- Introduced `RecordEncoder::encode_rows` to buffer encoded rows as contiguous slices with per-row offsets using `BytesMut`.
- Added `Encoder` for row-by-row Avro encoding, including zero-copy `Bytes` row access via `EncodedRows`.
- Integrated `bytes` crate for efficient encoding operations.
- Updated writer API to offer `build_encoder` for stream formats (e.g., SOE) alongside row-capacity configuration support.
- Adjusted docs to highlight new encoder capabilities.
- Comprehensive tests added to validate single/multi-column, nullable, prefix-based, and empty batch encoding scenarios.
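The contiguous-buffer-plus-offsets scheme described above can be sketched with std types only (the real implementation uses `bytes::BytesMut`/`Bytes` for zero-copy slicing; the names below are hypothetical stand-ins, not the PR's API):

```rust
// Hypothetical, simplified sketch of the offsets scheme: all rows are
// appended back to back into one buffer, and row i is recovered as
// data[offsets[i]..offsets[i + 1]].
struct EncodedRowsSketch {
    data: Vec<u8>,       // all encoded rows, concatenated
    offsets: Vec<usize>, // offsets.len() == number of rows + 1
}

impl EncodedRowsSketch {
    fn new() -> Self {
        Self { data: Vec::new(), offsets: vec![0] }
    }

    fn push_row(&mut self, row: &[u8]) {
        self.data.extend_from_slice(row);
        self.offsets.push(self.data.len()); // record the row-end offset
    }

    fn len(&self) -> usize {
        self.offsets.len().saturating_sub(1)
    }

    fn row(&self, i: usize) -> Option<&[u8]> {
        let (start, end) = (*self.offsets.get(i)?, *self.offsets.get(i + 1)?);
        Some(&self.data[start..end])
    }
}
```

With `Bytes` in place of `Vec<u8>`, `row` would return an owned zero-copy slice instead of a borrow, which is what makes `EncodedRows` cheap to hand to downstream sinks.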
@mbrobbel @alamb @scovich @nathaniel-d-ef Would any of you have bandwidth to review this PR? Much of the diff is comments and tests. I was hoping to get this out in the v58.0.0 release. This is also rather pivotal for the future direction of `arrow-avro`.
nathaniel-d-ef
left a comment
Conceptually this looks solid to me, though I'll defer to those with more advanced Rust knowledge to get into the weeds on performance. This should be quite valuable to systems that just need the bytes - good work 👍
alamb
left a comment
Looks good to me @jecsand838
I have some small additional test suggestions
And some API suggestions / questions, but nothing I think is necessary before merge
Let me know how you would like to proceed
arrow-avro/src/writer/mod.rs
Outdated
```rust
// self.len() is defined as self.offsets.len().saturating_sub(1).
// The check `i >= self.len()` above ensures that `i < self.offsets.len() - 1`.
// Therefore, both `i` and `i + 1` are strictly within the bounds of `self.offsets`.
let (start_u64, end_u64) = unsafe {
```
did you see this use of unsafe make a difference in benchmarks?
I did see a difference surprisingly.
In the screenshot below I ran the benchmarks first with the unsafe code, then changed the production code to be safe and re-ran. There seemed to be a significant performance impact.
NOTE: For the safe test I used `let (start_u64, end_u64) = (self.offsets[i], self.offsets[i + 1]);`.
I made sure to push up the benches I used for this in a new benches/encoder.rs file, which can be expanded on in future PRs.
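The two variants being compared can be sketched as free functions (std-only, `usize` offsets; `row_bounds_*` are hypothetical names for illustration, not the PR's API):

```rust
// Safe variant: each index is bounds-checked at runtime, so one row
// lookup pays two checks.
fn row_bounds_safe(offsets: &[usize], i: usize) -> (usize, usize) {
    (offsets[i], offsets[i + 1])
}

// Unchecked variant: the caller-side length check (here an assert, in the
// PR an early `i >= self.len()` return) guarantees both indices are valid,
// so `get_unchecked` skips the per-access bounds checks.
fn row_bounds_unchecked(offsets: &[usize], i: usize) -> (usize, usize) {
    assert!(i + 1 < offsets.len(), "caller must pre-check the index");
    // SAFETY: the assert above guarantees both i and i + 1 are in bounds.
    unsafe { (*offsets.get_unchecked(i), *offsets.get_unchecked(i + 1)) }
}
```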
arrow-avro/src/writer/mod.rs
Outdated
````rust
/// # }
/// ```
pub fn rows(&self) -> impl Iterator<Item = Result<Bytes, ArrowError>> + '_ {
    (0..self.len()).map(|i| self.row(i))
````
This is likely more efficient if you returned the sliced Bytes directly -- calling `row` will continually recheck `len`, for example
You could do something like this to get known good offsets
`self.offsets.iter().windows(2).map(...)`
This was a great call out. I went ahead and implemented those changes and renamed the method from rows to iter which seemed more idiomatic.
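The `windows(2)` approach can be sketched with plain slices (a std-only stand-in for the real `Bytes`-based `iter`; the free function below is hypothetical):

```rust
// Each window of two adjacent offsets yields exactly one row slice, so the
// iterator never re-checks the row count and inherits ExactSizeIterator
// from `windows`.
fn iter_rows<'a>(
    data: &'a [u8],
    offsets: &'a [usize],
) -> impl ExactSizeIterator<Item = &'a [u8]> + 'a {
    offsets.windows(2).map(move |w| &data[w[0]..w[1]])
}
```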
arrow-avro/src/writer/mod.rs
Outdated
```rust
pub fn to_vecs(&self) -> Result<Vec<Vec<u8>>, ArrowError> {
    let mut out = Vec::with_capacity(self.len());
    for i in 0..self.len() {
        out.push(self.row(i)?.to_vec());
    }
    Ok(out)
}
```
This seems like an unnecessary API to me -- you could do the same with
`let vecs: Vec<_> = rows.iter().map(|v| v.to_vec()).collect();`
100% great catch. I was overthinking this. Ended up removing to_vecs in my latest push and updated the documentation / examples to better showcase this.
arrow-avro/src/writer/mod.rs
Outdated
```rust
/// A row-by-row streaming encoder for Avro **Single Object Encoding** (SOE) streams.
```
I wonder why a user couldn't just use `Writer` with a mut `Vec` as the sink -- you would get the same effect
Is the difference that you get the output offsets as well?
Great question! At the byte level, `Writer<_, AvroSoeFormat>` writing into a `Vec<u8>` does produce the same concatenated output stream.
The reason for `Encoder`, however, is that neither SOE nor the Confluent/Apicurio wire formats include a length field (SOE is just `0xC3 0x01` + an 8-byte hashed fingerprint + the body, while Confluent is a magic byte + a 4-byte schema id + the body). So once multiple rows are written into a single `Vec`, there is no cheap or fully reliable way (especially for the wire formats) to split it back into per-row payloads without either decoding or getting hacky. Support for the binary format was essentially blocked, since those payloads aren't framed at all and therefore have no makeshift delimiter to scan for and split by.
Additionally, I hit performance bottlenecks when developing message-oriented sinks (Kafka/Pulsar/etc.) downstream of `arrow-avro`. These stemmed from having to use the `Writer` to encode 1-row batches while tracking `Vec` lengths, which is much less efficient due to repeated per-call setup and per-row allocations and copies.
The new `Encoder` solves this, and enables the binary format, by recording row-end offsets during encoding and returning zero-copy `Bytes` slices per row (via `EncodedRows`).
Add additional test coverage
@alamb Thank you so much for the review and for the tests! I ended up merging your PR in and pushing up some changes to address the comments you left. I think your recommendations were solid and worth getting in now. Also, I left some answers to your questions about the design. Let me know what you think when you get a chance.
alamb
left a comment
Looks good to me -- thanks @jecsand838
```diff
-let (start_u64, end_u64) = unsafe {
+// The check `n >= self.len()` above ensures that `n < self.offsets.len() - 1`.
+// Therefore, both `n` and `n + 1` are strictly within the bounds of `self.offsets`.
+let (start, end) = unsafe {
```
using `usize` rather than `u64` seems like a nice cleanup
100%, that became apparent to me rather quickly lol.
arrow-avro/src/writer/mod.rs
Outdated
```rust
pub fn iter(&self) -> impl ExactSizeIterator<Item = Bytes> + '_ {
    self.offsets.windows(2).map(|w| {
        debug_assert!(w[0] <= w[1] && w[1] <= self.data.len());
        self.data.slice(w[0]..w[1])
```
given you are using `slice` here I suspect the extra debug assert is not necessary, as the slice also does the same check
Ah yes, you are correct. I went ahead and removed the extra debug assert.
Sorry -- this now has a conflict (likely due to the new `AvroError`)
@alamb No worries! I just pushed up the changes to resolve the conflicts and use the new `AvroError`.
🚀
Which issue does this PR close?
Rationale for this change
`arrow-avro` already supports writing Avro Object Container Files (OCF) and framed streaming encodings (e.g. Single-Object Encoding / registry wire formats). However, many systems exchange raw Avro binary datum payloads (i.e. only the Avro record body bytes) while supplying the schema out-of-band (configuration, RPC contract, topic metadata, etc.). Without first-class support for unframed datum output, users must either:
This PR adds the missing unframed write path and exposes a row-by-row encoding API to make it easy to embed Avro datums into other transport protocols.
What changes are included in this PR?
- `AvroBinaryFormat` (unframed) as an `AvroFormat` implementation, to emit raw Avro record body bytes (no SOE prefix and no OCF header) and to explicitly reject container-level compression for this format.
- `RecordEncoder::encode_rows` to encode a `RecordBatch` into a single contiguous buffer while tracking per-row boundaries via appended offsets.
- `Encoder` + `EncodedRows` API for row-by-row streaming use cases, providing zero-copy access to individual row slices (via `Bytes`).
- `build_encoder` for stream formats (e.g. SOE), plus row-capacity configuration to better support incremental/streaming workflows.
- `bytes` crate as a dependency to support efficient buffering and slicing in the row encoder, with adjusted dev-dependencies to support the new tests/docs.
Yes.
This PR adds unit tests that cover:
Are there any user-facing changes?
Yes, these changes are additive (no breaking public API changes expected).
- A new unframed format (`AvroBinaryFormat`).
- New encoding APIs (`RecordEncoder::encode_rows`, `Encoder`, `EncodedRows`) to support zero-copy access to per-row encoded bytes.
- New `WriterBuilder` functionality (`build_encoder` + row-capacity configuration) to enable encoder construction without committing to a specific `Write` sink.