Skip to content

Commit 37efbe2

Browse files
committed
Add row-by-row encoding support and tests to arrow-avro
- Introduced `RecordEncoder::encode_rows` to buffer encoded rows as contiguous slices with per-row offsets using `BytesMut`.
- Added `Encoder` for row-by-row Avro encoding, including zero-copy `Bytes` row access via `EncodedRows`.
- Integrated the `bytes` crate for efficient encoding operations.
- Updated the writer API to offer `build_encoder` for stream formats (e.g., SOE) alongside row-capacity configuration support.
- Adjusted docs to highlight the new encoder capabilities.
- Added comprehensive tests to validate single-column, multi-column, nullable, prefix-based, and empty-batch encoding scenarios.
1 parent f122d77 commit 37efbe2

File tree

4 files changed

+1126
-49
lines changed

4 files changed

+1126
-49
lines changed

arrow-avro/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ indexmap = "2.10"
6666
rand = "0.9"
6767
md5 = { version = "0.8", optional = true }
6868
sha2 = { version = "0.10", optional = true }
69+
bytes = "1.11"
6970

7071
[dev-dependencies]
7172
arrow-data = { workspace = true }
@@ -76,9 +77,8 @@ rand = { version = "0.9.1", default-features = false, features = [
7677
] }
7778
criterion = { workspace = true, default-features = false }
7879
tempfile = "3.3"
79-
arrow = { workspace = true }
80+
arrow = { workspace = true, features = ["prettyprint"] }
8081
futures = "0.3.31"
81-
bytes = "1.10.1"
8282
async-stream = "0.3.6"
8383
apache-avro = "0.21.0"
8484
num-bigint = "0.4"

arrow-avro/src/writer/encoder.rs

Lines changed: 288 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ use arrow_buffer::{ArrowNativeType, NullBuffer};
4343
use arrow_schema::{
4444
ArrowError, DataType, Field, IntervalUnit, Schema as ArrowSchema, TimeUnit, UnionMode,
4545
};
46+
use bytes::{BufMut, BytesMut};
4647
use std::io::Write;
4748
use std::sync::Arc;
4849
use uuid::Uuid;
@@ -826,6 +827,69 @@ impl RecordEncoder {
826827
}
827828
Ok(())
828829
}
830+
831+
/// Encode rows into a single contiguous `BytesMut` and append row-end offsets.
832+
///
833+
/// # Invariants
834+
///
835+
/// * `offsets` must be non-empty and seeded with `0` at index 0.
836+
/// * `offsets.last()` must equal `out.len()` on entry.
837+
/// * On success, exactly `batch.num_rows()` additional offsets are pushed, and
838+
/// `offsets.last()` equals the new `out.len()`.
839+
pub(crate) fn encode_rows(
840+
&self,
841+
batch: &RecordBatch,
842+
row_capacity: usize,
843+
out: &mut BytesMut,
844+
offsets: &mut Vec<u64>,
845+
) -> Result<(), ArrowError> {
846+
// Validate invariants once per call (cheap vs. per-row allocations).
847+
if offsets.is_empty() {
848+
return Err(ArrowError::InvalidArgumentError(
849+
"encode_rows requires offsets to be seeded with a 0 sentinel".to_string(),
850+
));
851+
}
852+
if offsets[0] != 0 {
853+
return Err(ArrowError::InvalidArgumentError(
854+
"encode_rows requires offsets[0] == 0".to_string(),
855+
));
856+
}
857+
let expected_last = out.len() as u64;
858+
if *offsets.last().unwrap() != expected_last {
859+
return Err(ArrowError::InvalidArgumentError(format!(
860+
"encode_rows requires offsets.last() == out.len() ({} != {})",
861+
offsets.last().unwrap(),
862+
expected_last
863+
)));
864+
}
865+
let mut column_encoders = self.prepare_for_batch(batch)?;
866+
let n = batch.num_rows();
867+
offsets.reserve(n);
868+
out.reserve(n.saturating_mul(row_capacity));
869+
let mut w = out.writer();
870+
match &self.prefix {
871+
Some(prefix) => {
872+
let prefix_bytes = prefix.as_slice();
873+
for row in 0..n {
874+
w.write_all(prefix_bytes)
875+
.map_err(|e| ArrowError::IoError(format!("write prefix: {e}"), e))?;
876+
for enc in column_encoders.iter_mut() {
877+
enc.encode(&mut w, row)?;
878+
}
879+
offsets.push((*w.get_ref()).len() as u64);
880+
}
881+
}
882+
None => {
883+
for row in 0..n {
884+
for enc in column_encoders.iter_mut() {
885+
enc.encode(&mut w, row)?;
886+
}
887+
offsets.push((*w.get_ref()).len() as u64);
888+
}
889+
}
890+
}
891+
Ok(())
892+
}
829893
}
830894

831895
fn find_struct_child_index(fields: &arrow_schema::Fields, name: &str) -> Option<usize> {
@@ -1977,6 +2041,12 @@ mod tests {
19772041
}
19782042
}
19792043

2044+
/// Borrow the encoded bytes of `row` out of `buf`, using the offsets table
/// produced by `encode_rows` (row `i` spans `offsets[i]..offsets[i + 1]`).
fn row_slice<'a>(buf: &'a [u8], offsets: &[u64], row: usize) -> &'a [u8] {
    let (lo, hi) = (offsets[row] as usize, offsets[row + 1] as usize);
    &buf[lo..hi]
}
2049+
19802050
#[test]
19812051
fn binary_encoder() {
19822052
let values: Vec<&[u8]> = vec![b"", b"ab", b"\x00\xFF"];
@@ -3045,4 +3115,222 @@ mod tests {
30453115
other => panic!("expected NullableNoNulls, got {other:?}"),
30463116
}
30473117
}
3118+
3119+
#[test]
fn encode_rows_single_column_int32() {
    // Three Int32 values through a single scalar column; each row should
    // encode to exactly one Avro zig-zag varint.
    let schema = ArrowSchema::new(vec![Field::new("x", DataType::Int32, false)]);
    let batch = RecordBatch::try_new(
        Arc::new(schema.clone()),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )
    .unwrap();
    let encoder = RecordEncoder {
        columns: vec![FieldBinding {
            arrow_index: 0,
            nullability: None,
            plan: FieldPlan::Scalar,
        }],
        prefix: None,
    };
    let mut buf = BytesMut::new();
    let mut ends: Vec<u64> = vec![0];
    encoder.encode_rows(&batch, 16, &mut buf, &mut ends).unwrap();
    // One seed offset plus one end offset per row.
    assert_eq!(ends.len(), 4);
    assert_eq!(*ends.last().unwrap(), buf.len() as u64);
    for (row, value) in [1, 2, 3].into_iter().enumerate() {
        assert_bytes_eq(row_slice(&buf, &ends, row), &avro_long_bytes(value));
    }
}
3143+
3144+
#[test]
fn encode_rows_multiple_columns() {
    // Each row must be the concatenation of its column encodings, in schema order.
    let schema = ArrowSchema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Utf8, false),
    ]);
    let batch = RecordBatch::try_new(
        Arc::new(schema.clone()),
        vec![
            Arc::new(Int32Array::from(vec![10, 20])),
            Arc::new(StringArray::from(vec!["hello", "world"])),
        ],
    )
    .unwrap();
    let encoder = RecordEncoder {
        columns: vec![
            FieldBinding {
                arrow_index: 0,
                nullability: None,
                plan: FieldPlan::Scalar,
            },
            FieldBinding {
                arrow_index: 1,
                nullability: None,
                plan: FieldPlan::Scalar,
            },
        ],
        prefix: None,
    };
    let mut buf = BytesMut::new();
    let mut ends: Vec<u64> = vec![0];
    encoder.encode_rows(&batch, 32, &mut buf, &mut ends).unwrap();
    assert_eq!(ends.len(), 3);
    assert_eq!(*ends.last().unwrap(), buf.len() as u64);
    for (row, (num, text)) in [(10, b"hello" as &[u8]), (20, b"world")]
        .into_iter()
        .enumerate()
    {
        let mut expected = Vec::new();
        expected.extend(avro_long_bytes(num));
        expected.extend(avro_len_prefixed_bytes(text));
        assert_bytes_eq(row_slice(&buf, &ends, row), &expected);
    }
}
3188+
3189+
#[test]
fn encode_rows_with_prefix() {
    use crate::codec::AvroFieldBuilder;
    use crate::schema::AvroSchema;
    let schema = ArrowSchema::new(vec![Field::new("x", DataType::Int32, false)]);
    let batch = RecordBatch::try_new(
        Arc::new(schema.clone()),
        vec![Arc::new(Int32Array::from(vec![42]))],
    )
    .unwrap();
    // Build via the public builder so the Rabin fingerprint prefix is attached.
    let avro_schema = AvroSchema::try_from(&schema).unwrap();
    let fingerprint = avro_schema
        .fingerprint(crate::schema::FingerprintAlgorithm::Rabin)
        .unwrap();
    let root = AvroFieldBuilder::new(&avro_schema.schema().unwrap())
        .build()
        .unwrap();
    let encoder = RecordEncoderBuilder::new(&root, &schema)
        .with_fingerprint(Some(fingerprint))
        .build()
        .unwrap();
    let mut buf = BytesMut::new();
    let mut ends: Vec<u64> = vec![0];
    encoder.encode_rows(&batch, 32, &mut buf, &mut ends).unwrap();
    assert_eq!(ends.len(), 2);
    let first = row_slice(&buf, &ends, 0);
    assert!(first.len() > 10, "Row should contain prefix + encoded value");
    // Avro single-object encoding magic bytes: C3 01.
    assert_eq!(first[0], 0xC3);
    assert_eq!(first[1], 0x01);
}
3218+
3219+
#[test]
fn encode_rows_empty_batch() {
    // A zero-row batch must leave both the buffer and the offsets untouched
    // (apart from the caller-supplied seed offset).
    let schema = ArrowSchema::new(vec![Field::new("x", DataType::Int32, false)]);
    let batch = RecordBatch::try_new(
        Arc::new(schema.clone()),
        vec![Arc::new(Int32Array::from(Vec::<i32>::new()))],
    )
    .unwrap();
    let encoder = RecordEncoder {
        columns: vec![FieldBinding {
            arrow_index: 0,
            nullability: None,
            plan: FieldPlan::Scalar,
        }],
        prefix: None,
    };
    let mut buf = BytesMut::new();
    let mut ends: Vec<u64> = vec![0];
    encoder.encode_rows(&batch, 16, &mut buf, &mut ends).unwrap();
    assert_eq!(ends, vec![0u64]);
    assert!(buf.is_empty());
}
3240+
3241+
#[test]
fn encode_rows_matches_encode_output() {
    // With no prefix, concatenating the per-row slices must be byte-identical
    // to the batch-at-once `encode` output.
    let schema = ArrowSchema::new(vec![
        Field::new("a", DataType::Int64, false),
        Field::new("b", DataType::Float64, false),
    ]);
    let batch = RecordBatch::try_new(
        Arc::new(schema.clone()),
        vec![
            Arc::new(Int64Array::from(vec![100i64, 200, 300])),
            Arc::new(Float64Array::from(vec![1.5, 2.5, 3.5])),
        ],
    )
    .unwrap();
    let encoder = RecordEncoder {
        columns: vec![
            FieldBinding {
                arrow_index: 0,
                nullability: None,
                plan: FieldPlan::Scalar,
            },
            FieldBinding {
                arrow_index: 1,
                nullability: None,
                plan: FieldPlan::Scalar,
            },
        ],
        prefix: None,
    };
    let mut whole = Vec::new();
    encoder.encode(&mut whole, &batch).unwrap();
    let mut buf = BytesMut::new();
    let mut ends: Vec<u64> = vec![0];
    encoder.encode_rows(&batch, 32, &mut buf, &mut ends).unwrap();
    assert_eq!(ends.len(), 1 + batch.num_rows());
    assert_bytes_eq(&buf[..], &whole);
}
3279+
3280+
#[test]
fn encode_rows_appends_to_existing_buffer() {
    let schema = ArrowSchema::new(vec![Field::new("x", DataType::Int32, false)]);
    let batch = RecordBatch::try_new(
        Arc::new(schema.clone()),
        vec![Arc::new(Int32Array::from(vec![5, 6]))],
    )
    .unwrap();
    let encoder = RecordEncoder {
        columns: vec![FieldBinding {
            arrow_index: 0,
            nullability: None,
            plan: FieldPlan::Scalar,
        }],
        prefix: None,
    };
    // Seed the buffer with pre-existing bytes and a matching trailing offset,
    // mimicking a caller that chains several `encode_rows` calls.
    let mut buf = BytesMut::new();
    buf.extend_from_slice(&[0xAA, 0xBB]);
    let mut ends: Vec<u64> = vec![0, buf.len() as u64];
    encoder.encode_rows(&batch, 16, &mut buf, &mut ends).unwrap();
    assert_eq!(ends.len(), 4);
    assert_eq!(*ends.last().unwrap(), buf.len() as u64);
    // Pre-existing bytes remain untouched and addressable as "row 0".
    assert_bytes_eq(row_slice(&buf, &ends, 0), &[0xAA, 0xBB]);
    assert_bytes_eq(row_slice(&buf, &ends, 1), &avro_long_bytes(5));
    assert_bytes_eq(row_slice(&buf, &ends, 2), &avro_long_bytes(6));
}
3305+
3306+
#[test]
fn encode_rows_nullable_column() {
    let schema = ArrowSchema::new(vec![Field::new("x", DataType::Int32, true)]);
    let batch = RecordBatch::try_new(
        Arc::new(schema.clone()),
        vec![Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]))],
    )
    .unwrap();
    let encoder = RecordEncoder {
        columns: vec![FieldBinding {
            arrow_index: 0,
            nullability: Some(Nullability::NullFirst),
            plan: FieldPlan::Scalar,
        }],
        prefix: None,
    };
    let mut buf = BytesMut::new();
    let mut ends: Vec<u64> = vec![0];
    encoder.encode_rows(&batch, 16, &mut buf, &mut ends).unwrap();
    assert_eq!(ends.len(), 4);
    // ["null", "int"] union: branch index 1 selects the value, 0 selects null.
    let mut present_one = Vec::new();
    present_one.extend(avro_long_bytes(1)); // union branch for value
    present_one.extend(avro_long_bytes(1)); // value
    assert_bytes_eq(row_slice(&buf, &ends, 0), &present_one);
    // A null row carries only the union branch index.
    assert_bytes_eq(row_slice(&buf, &ends, 1), &avro_long_bytes(0));
    let mut present_three = Vec::new();
    present_three.extend(avro_long_bytes(1)); // union branch for value
    present_three.extend(avro_long_bytes(3)); // value
    assert_bytes_eq(row_slice(&buf, &ends, 2), &present_three);
}
30483336
}

arrow-avro/src/writer/format.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,40 @@ impl AvroFormat for AvroSoeFormat {
135135
}
136136
}
137137

138+
/// Unframed Avro binary streaming format ("raw Avro record body bytes (no prefix, no OCF header)").
139+
///
140+
/// Each record written by the stream writer contains only the raw Avro
141+
/// record body bytes (i.e., the Avro binary encoding of the datum) with **no**
142+
/// per-record prefix and **no** Object Container File (OCF) header.
143+
///
144+
/// This format is useful when another transport provides framing (for example,
145+
/// length-delimited buffers) or when embedding Avro record payloads inside a
146+
/// larger envelope.
147+
#[derive(Debug, Default)]
148+
pub struct AvroBinaryFormat;
149+
150+
impl AvroFormat for AvroBinaryFormat {
151+
const NEEDS_PREFIX: bool = false;
152+
153+
fn start_stream<W: Write>(
154+
&mut self,
155+
_writer: &mut W,
156+
_schema: &Schema,
157+
compression: Option<CompressionCodec>,
158+
) -> Result<(), ArrowError> {
159+
if compression.is_some() {
160+
return Err(ArrowError::InvalidArgumentError(
161+
"Compression not supported for Avro binary streaming".to_string(),
162+
));
163+
}
164+
Ok(())
165+
}
166+
167+
fn sync_marker(&self) -> Option<&[u8; 16]> {
168+
None
169+
}
170+
}
171+
138172
#[inline]
139173
fn write_string<W: Write>(writer: &mut W, s: &str) -> Result<(), ArrowError> {
140174
write_bytes(writer, s.as_bytes())

0 commit comments

Comments
 (0)