Skip to content

Commit

Permalink
Improve size calculations for RunEndEncoded data type and add test coverage for it
Browse files Browse the repository at this point in the history
  • Loading branch information
itsjunetime committed Nov 7, 2024
1 parent c1890d4 commit 842ea66
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 18 deletions.
19 changes: 12 additions & 7 deletions arrow-array/src/array/run_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,14 +254,19 @@ impl<R: RunEndIndexType> RunArray<R> {
}

impl<R: RunEndIndexType> From<ArrayData> for RunArray<R> {
    /// Converts owned `ArrayData` into a `RunArray` by delegating to the
    /// `From<&ArrayData>` implementation.
    ///
    /// The method assumes the caller already validated the data using
    /// `ArrayData::validate_data()`.
    ///
    /// # Panics
    ///
    /// Panics if the data type is not `DataType::RunEndEncoded`. That check
    /// (with the same panic message) is the first thing the borrowed
    /// conversion does, so it is not repeated here.
    fn from(data: ArrayData) -> Self {
        Self::from(&data)
    }
}

impl<R: RunEndIndexType> From<&ArrayData> for RunArray<R> {
// The method assumes the caller already validated the data using `ArrayData::validate_data()`
fn from(data: &ArrayData) -> Self {
let DataType::RunEndEncoded(_, _) = data.data_type() else {
panic!(
"Invalid data type for RunArray. The data type should be DataType::RunEndEncoded"
);
};

// Safety
// ArrayData is valid
Expand Down
2 changes: 1 addition & 1 deletion arrow-ipc/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1785,7 +1785,7 @@ mod tests {
// can be compared as such.
assert_eq!(input_batch.column(1), output_batch.column(1));

let run_array_1_unsliced = unslice_run_array(run_array_1_sliced.into_data()).unwrap();
let run_array_1_unsliced = unslice_run_array(&run_array_1_sliced.into_data()).unwrap();
assert_eq!(run_array_1_unsliced, output_batch.column(0).into_data());
}

Expand Down
57 changes: 47 additions & 10 deletions arrow-ipc/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ fn append_variadic_buffer_counts(counts: &mut Vec<i64>, array: &ArrayData) {
}
}

pub(crate) fn unslice_run_array(arr: ArrayData) -> Result<ArrayData, ArrowError> {
pub(crate) fn unslice_run_array(arr: &ArrayData) -> Result<ArrayData, ArrowError> {
match arr.data_type() {
DataType::RunEndEncoded(k, _) => match k.data_type() {
DataType::Int16 => {
Expand Down Expand Up @@ -1433,16 +1433,40 @@ fn get_encoded_arr_batch_size<AD: Borrow<ArrayData>>(
.map(|arr| {
let arr = arr.borrow();
arr.get_slice_memory_size_with_alignment(Some(write_options.alignment))
.map(|size| {
.and_then(|mut size| {
let didnt_count_nulls = arr.nulls().is_none();
let will_write_nulls = has_validity_bitmap(arr.data_type(), write_options);

if will_write_nulls && didnt_count_nulls {
let null_len = bit_util::ceil(arr.len(), 8);
size + null_len + pad_to_alignment(write_options.alignment, null_len)
} else {
size
size += null_len + pad_to_alignment(write_options.alignment, null_len)
}

// TODO: This is ugly. We remove the child_data size in RunEndEncoded because
// it was calculated as the size existing in memory, but we care about the size
// when it's decoded and then encoded into a flatbuffer. As far as I know, this
// is the only data type whose size in memory differs from its size when encoded
// (since it has a different representation in memory), so it's not horrible,
// but it's definitely not ideal.
if let DataType::RunEndEncoded(_, _) = arr.data_type() {
size -= arr
.child_data()
.iter()
.map(|data| {
data.get_slice_memory_size_with_alignment(Some(
write_options.alignment,
))
})
.sum::<Result<usize, ArrowError>>()?;

size += unslice_run_array(arr)?
.child_data()
.iter()
.map(|data| get_encoded_arr_batch_size([data], write_options))
.sum::<Result<usize, ArrowError>>()?;
}

Ok(size)
})
})
.sum()
Expand Down Expand Up @@ -1837,7 +1861,7 @@ impl<'fbb> FlatBufferSizeTracker<'fbb> {
match array_data.data_type() {
DataType::Dictionary(_, _) => Ok(()),
// unslice the run encoded array.
DataType::RunEndEncoded(_, _) => write_arr(&unslice_run_array(array_data.clone())?),
DataType::RunEndEncoded(_, _) => write_arr(&unslice_run_array(array_data)?),
// recursively write out nested structures
_ => write_arr(array_data),
}
Expand Down Expand Up @@ -2945,10 +2969,9 @@ mod tests {

let arr_data = arr.to_data();

let write_options = IpcWriteOptions {
batch_compression_type: None,
..IpcWriteOptions::default()
};
let write_options = IpcWriteOptions::default()
.try_with_compression(None)
.unwrap();

let compute_size = get_encoded_arr_batch_size([&arr_data], &write_options).unwrap();
let num_rows = arr_data.len();
Expand Down Expand Up @@ -2992,5 +3015,19 @@ mod tests {

let list = FixedSizeListArray::new(list_field, 2, make_array(int_arr.to_data()), None);
encode_test(list);

let vals: Vec<Option<i32>> = vec![Some(1), None, Some(2), Some(3), Some(4), None, Some(5)];
let repeats: Vec<usize> = vec![3, 4, 1, 2];
let mut input_array: Vec<Option<i32>> = Vec::with_capacity(80);
for ix in 0_usize..32 {
let repeat: usize = repeats[ix % repeats.len()];
let val: Option<i32> = vals[ix % vals.len()];
input_array.resize(input_array.len() + repeat, val);
}
let mut builder =
PrimitiveRunBuilder::<Int16Type, Int32Type>::with_capacity(input_array.len());
builder.extend(input_array);
let run_array = builder.finish();
encode_test(run_array);
}
}

0 comments on commit 842ea66

Please sign in to comment.