Skip to content

Commit da461c8

Browse files
authored
Revert "Improve coalesce and concat performance for views (#7614)" (#7623)
This reverts commit 7739a83. # Which issue does this PR close? # Rationale for this change I found this errors in DataFusion (see apache/datafusion#16249 (comment)), so let's revert it and find the error. # What changes are included in this PR? # Are there any user-facing changes?
1 parent 7739a83 commit da461c8

File tree

3 files changed

+55
-138
lines changed

3 files changed

+55
-138
lines changed

arrow-array/src/builder/generic_bytes_view_builder.rs

Lines changed: 20 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use std::any::Any;
1919
use std::marker::PhantomData;
2020
use std::sync::Arc;
2121

22-
use arrow_buffer::{Buffer, NullBufferBuilder, ScalarBuffer};
22+
use arrow_buffer::{Buffer, BufferBuilder, NullBufferBuilder, ScalarBuffer};
2323
use arrow_data::ByteView;
2424
use arrow_schema::ArrowError;
2525
use hashbrown::hash_table::Entry;
@@ -28,7 +28,7 @@ use hashbrown::HashTable;
2828
use crate::builder::ArrayBuilder;
2929
use crate::types::bytes::ByteArrayNativeType;
3030
use crate::types::{BinaryViewType, ByteViewType, StringViewType};
31-
use crate::{Array, ArrayRef, GenericByteViewArray};
31+
use crate::{ArrayRef, GenericByteViewArray};
3232

3333
const STARTING_BLOCK_SIZE: u32 = 8 * 1024; // 8KiB
3434
const MAX_BLOCK_SIZE: u32 = 2 * 1024 * 1024; // 2MiB
@@ -79,7 +79,7 @@ impl BlockSizeGrowthStrategy {
7979
/// using [`GenericByteViewBuilder::append_block`] and then views into this block appended
8080
/// using [`GenericByteViewBuilder::try_append_view`]
8181
pub struct GenericByteViewBuilder<T: ByteViewType + ?Sized> {
82-
views_buffer: Vec<u128>,
82+
views_builder: BufferBuilder<u128>,
8383
null_buffer_builder: NullBufferBuilder,
8484
completed: Vec<Buffer>,
8585
in_progress: Vec<u8>,
@@ -99,7 +99,7 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
9999
/// Creates a new [`GenericByteViewBuilder`] with space for `capacity` string values.
100100
pub fn with_capacity(capacity: usize) -> Self {
101101
Self {
102-
views_buffer: Vec::with_capacity(capacity),
102+
views_builder: BufferBuilder::new(capacity),
103103
null_buffer_builder: NullBufferBuilder::new(capacity),
104104
completed: vec![],
105105
in_progress: vec![],
@@ -148,7 +148,7 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
148148
pub fn with_deduplicate_strings(self) -> Self {
149149
Self {
150150
string_tracker: Some((
151-
HashTable::with_capacity(self.views_buffer.capacity()),
151+
HashTable::with_capacity(self.views_builder.capacity()),
152152
Default::default(),
153153
)),
154154
..self
@@ -201,43 +201,10 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
201201
let b = b.get_unchecked(start..end);
202202

203203
let view = make_view(b, block, offset);
204-
self.views_buffer.push(view);
204+
self.views_builder.append(view);
205205
self.null_buffer_builder.append_non_null();
206206
}
207207

208-
/// Appends an array to the builder.
209-
/// This will flush any in-progress block and append the data buffers
210-
/// and add the (adapted) views.
211-
pub fn append_array(&mut self, array: &GenericByteViewArray<T>) {
212-
self.flush_in_progress();
213-
// keep original views if this array is the first to be added or if there are no data buffers (all inline views)
214-
let keep_views = self.completed.is_empty() || array.data_buffers().is_empty();
215-
216-
self.completed.extend(array.data_buffers().iter().cloned());
217-
218-
if keep_views {
219-
self.views_buffer.extend_from_slice(array.views());
220-
} else {
221-
let starting_buffer = self.completed.len() as u32;
222-
223-
self.views_buffer.extend(array.views().iter().map(|v| {
224-
let mut byte_view = ByteView::from(*v);
225-
if byte_view.length > 12 {
226-
// Small views (<=12 bytes) are inlined, so only need to update large views
227-
byte_view.buffer_index += starting_buffer;
228-
};
229-
230-
byte_view.as_u128()
231-
}));
232-
}
233-
234-
if let Some(null_buffer) = array.nulls() {
235-
self.null_buffer_builder.append_buffer(null_buffer);
236-
} else {
237-
self.null_buffer_builder.append_n_non_nulls(array.len());
238-
}
239-
}
240-
241208
/// Try to append a view of the given `block`, `offset` and `length`
242209
///
243210
/// See [`Self::append_block`]
@@ -288,7 +255,7 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
288255
/// Useful if we want to know what value has been inserted to the builder
289256
/// The index has to be smaller than `self.len()`, otherwise it will panic
290257
pub fn get_value(&self, index: usize) -> &[u8] {
291-
let view = self.views_buffer.as_slice().get(index).unwrap();
258+
let view = self.views_builder.as_slice().get(index).unwrap();
292259
let len = *view as u32;
293260
if len <= 12 {
294261
// # Safety
@@ -320,7 +287,7 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
320287
let mut view_buffer = [0; 16];
321288
view_buffer[0..4].copy_from_slice(&length.to_le_bytes());
322289
view_buffer[4..4 + v.len()].copy_from_slice(v);
323-
self.views_buffer.push(u128::from_le_bytes(view_buffer));
290+
self.views_builder.append(u128::from_le_bytes(view_buffer));
324291
self.null_buffer_builder.append_non_null();
325292
return;
326293
}
@@ -344,15 +311,16 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
344311
Entry::Occupied(occupied) => {
345312
// If the string already exists, we will directly use the view
346313
let idx = occupied.get();
347-
self.views_buffer.push(self.views_buffer[*idx]);
314+
self.views_builder
315+
.append(self.views_builder.as_slice()[*idx]);
348316
self.null_buffer_builder.append_non_null();
349317
self.string_tracker = Some((ht, hasher));
350318
return;
351319
}
352320
Entry::Vacant(vacant) => {
353321
// o.w. we insert the (string hash -> view index)
354322
// the idx is current length of views_builder, as we are inserting a new view
355-
vacant.insert(self.views_buffer.len());
323+
vacant.insert(self.views_builder.len());
356324
}
357325
}
358326
self.string_tracker = Some((ht, hasher));
@@ -373,7 +341,7 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
373341
buffer_index: self.completed.len() as u32,
374342
offset,
375343
};
376-
self.views_buffer.push(view.into());
344+
self.views_builder.append(view.into());
377345
self.null_buffer_builder.append_non_null();
378346
}
379347

@@ -390,20 +358,21 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
390358
#[inline]
391359
pub fn append_null(&mut self) {
392360
self.null_buffer_builder.append_null();
393-
self.views_buffer.push(0);
361+
self.views_builder.append(0);
394362
}
395363

396364
/// Builds the [`GenericByteViewArray`] and reset this builder
397365
pub fn finish(&mut self) -> GenericByteViewArray<T> {
398366
self.flush_in_progress();
399367
let completed = std::mem::take(&mut self.completed);
368+
let len = self.views_builder.len();
369+
let views = ScalarBuffer::new(self.views_builder.finish(), 0, len);
400370
let nulls = self.null_buffer_builder.finish();
401371
if let Some((ref mut ht, _)) = self.string_tracker.as_mut() {
402372
ht.clear();
403373
}
404-
let views = std::mem::take(&mut self.views_buffer);
405374
// SAFETY: valid by construction
406-
unsafe { GenericByteViewArray::new_unchecked(views.into(), completed, nulls) }
375+
unsafe { GenericByteViewArray::new_unchecked(views, completed, nulls) }
407376
}
408377

409378
/// Builds the [`GenericByteViewArray`] without resetting the builder
@@ -412,8 +381,8 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
412381
if !self.in_progress.is_empty() {
413382
completed.push(Buffer::from_slice_ref(&self.in_progress));
414383
}
415-
let len = self.views_buffer.len();
416-
let views = Buffer::from_slice_ref(self.views_buffer.as_slice());
384+
let len = self.views_builder.len();
385+
let views = Buffer::from_slice_ref(self.views_builder.as_slice());
417386
let views = ScalarBuffer::new(views, 0, len);
418387
let nulls = self.null_buffer_builder.finish_cloned();
419388
// SAFETY: valid by construction
@@ -427,7 +396,7 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
427396

428397
/// Return the allocated size of this builder in bytes, useful for memory accounting.
429398
pub fn allocated_size(&self) -> usize {
430-
let views = self.views_buffer.capacity() * std::mem::size_of::<u128>();
399+
let views = self.views_builder.capacity() * std::mem::size_of::<u128>();
431400
let null = self.null_buffer_builder.allocated_size();
432401
let buffer_size = self.completed.iter().map(|b| b.capacity()).sum::<usize>();
433402
let in_progress = self.in_progress.capacity();
@@ -449,7 +418,7 @@ impl<T: ByteViewType + ?Sized> std::fmt::Debug for GenericByteViewBuilder<T> {
449418
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
450419
write!(f, "{}ViewBuilder", T::PREFIX)?;
451420
f.debug_struct("")
452-
.field("views_buffer", &self.views_buffer)
421+
.field("views_builder", &self.views_builder)
453422
.field("in_progress", &self.in_progress)
454423
.field("completed", &self.completed)
455424
.field("null_buffer_builder", &self.null_buffer_builder)

arrow-select/src/coalesce.rs

Lines changed: 34 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@
2121
//! [`filter`]: crate::filter::filter
2222
//! [`take`]: crate::take::take
2323
use crate::concat::concat_batches;
24-
use arrow_array::StringViewArray;
25-
use arrow_array::{cast::AsArray, Array, ArrayRef, RecordBatch};
26-
use arrow_data::ByteView;
24+
use arrow_array::{
25+
builder::StringViewBuilder, cast::AsArray, Array, ArrayRef, RecordBatch, RecordBatchOptions,
26+
};
2727
use arrow_schema::{ArrowError, SchemaRef};
2828
use std::collections::VecDeque;
2929
use std::sync::Arc;
@@ -164,7 +164,7 @@ impl BatchCoalescer {
164164
return Ok(());
165165
}
166166

167-
let mut batch = gc_string_view_batch(batch);
167+
let mut batch = gc_string_view_batch(&batch);
168168

169169
// If pushing this batch would exceed the target batch size,
170170
// finish the current batch and start a new one
@@ -242,19 +242,15 @@ impl BatchCoalescer {
242242
/// However, after a while (e.g., after `FilterExec` or `HashJoinExec`) the
243243
/// `StringViewArray` may only refer to a small portion of the buffer,
244244
/// significantly increasing memory usage.
245-
fn gc_string_view_batch(batch: RecordBatch) -> RecordBatch {
246-
let (schema, columns, num_rows) = batch.into_parts();
247-
let new_columns: Vec<ArrayRef> = columns
248-
.into_iter()
245+
fn gc_string_view_batch(batch: &RecordBatch) -> RecordBatch {
246+
let new_columns: Vec<ArrayRef> = batch
247+
.columns()
248+
.iter()
249249
.map(|c| {
250250
// Try to re-create the `StringViewArray` to prevent holding the underlying buffer too long.
251251
let Some(s) = c.as_string_view_opt() else {
252-
return c;
252+
return Arc::clone(c);
253253
};
254-
if s.data_buffers().is_empty() {
255-
// If there are no data buffers, we can just return the array as is
256-
return c;
257-
}
258254
let ideal_buffer_size: usize = s
259255
.views()
260256
.iter()
@@ -268,73 +264,42 @@ fn gc_string_view_batch(batch: RecordBatch) -> RecordBatch {
268264
})
269265
.sum();
270266
let actual_buffer_size = s.get_buffer_memory_size();
271-
let buffers = s.data_buffers();
272267

273268
// Re-creating the array copies data and can be time consuming.
274269
// We only do it if the array is sparse
275270
if actual_buffer_size > (ideal_buffer_size * 2) {
276-
if ideal_buffer_size == 0 {
277-
// If the ideal buffer size is 0, all views are inlined
278-
// so just reuse the views
279-
return Arc::new(unsafe {
280-
StringViewArray::new_unchecked(
281-
s.views().clone(),
282-
vec![],
283-
s.nulls().cloned(),
284-
)
285-
});
286-
}
287271
// We set the block size to `ideal_buffer_size` so that the new StringViewArray only has one buffer, which accelerate later concat_batches.
288272
// See https://github.com/apache/arrow-rs/issues/6094 for more details.
289-
let mut buffer: Vec<u8> = Vec::with_capacity(ideal_buffer_size);
290-
291-
let views: Vec<u128> = s
292-
.views()
293-
.as_ref()
294-
.iter()
295-
.cloned()
296-
.map(|v| {
297-
let mut b: ByteView = ByteView::from(v);
298-
299-
if b.length > 12 {
300-
let offset = buffer.len() as u32;
301-
buffer.extend_from_slice(
302-
buffers[b.buffer_index as usize]
303-
.get(b.offset as usize..b.offset as usize + b.length as usize)
304-
.expect("Invalid buffer slice"),
305-
);
306-
b.offset = offset;
307-
b.buffer_index = 0; // Set buffer index to 0, as we only have one buffer
308-
}
309-
310-
b.into()
311-
})
312-
.collect();
313-
314-
let buffers = if buffer.is_empty() {
315-
vec![]
316-
} else {
317-
vec![buffer.into()]
318-
};
319-
320-
let gc_string = unsafe {
321-
StringViewArray::new_unchecked(views.into(), buffers, s.nulls().cloned())
322-
};
273+
let mut builder = StringViewBuilder::with_capacity(s.len());
274+
if ideal_buffer_size > 0 {
275+
builder = builder.with_fixed_block_size(ideal_buffer_size as u32);
276+
}
277+
278+
for v in s.iter() {
279+
builder.append_option(v);
280+
}
281+
282+
let gc_string = builder.finish();
283+
284+
debug_assert!(gc_string.data_buffers().len() <= 1); // buffer count can be 0 if the `ideal_buffer_size` is 0
323285

324286
Arc::new(gc_string)
325287
} else {
326-
c
288+
Arc::clone(c)
327289
}
328290
})
329291
.collect();
330-
unsafe { RecordBatch::new_unchecked(schema, new_columns, num_rows) }
292+
let mut options = RecordBatchOptions::new();
293+
options = options.with_row_count(Some(batch.num_rows()));
294+
RecordBatch::try_new_with_options(batch.schema(), new_columns, &options)
295+
.expect("Failed to re-create the gc'ed record batch")
331296
}
332297

333298
#[cfg(test)]
334299
mod tests {
335300
use super::*;
336-
use arrow_array::builder::{ArrayBuilder, StringViewBuilder};
337-
use arrow_array::{RecordBatchOptions, StringViewArray, UInt32Array};
301+
use arrow_array::builder::ArrayBuilder;
302+
use arrow_array::{StringViewArray, UInt32Array};
338303
use arrow_schema::{DataType, Field, Schema};
339304
use std::ops::Range;
340305

@@ -553,11 +518,9 @@ mod tests {
553518
fn test_gc_string_view_test_batch_empty() {
554519
let schema = Schema::empty();
555520
let batch = RecordBatch::new_empty(schema.into());
556-
let cols = batch.num_columns();
557-
let num_rows = batch.num_rows();
558-
let output_batch = gc_string_view_batch(batch);
559-
assert_eq!(cols, output_batch.num_columns());
560-
assert_eq!(num_rows, output_batch.num_rows());
521+
let output_batch = gc_string_view_batch(&batch);
522+
assert_eq!(batch.num_columns(), output_batch.num_columns());
523+
assert_eq!(batch.num_rows(), output_batch.num_rows());
561524
}
562525

563526
#[test]
@@ -605,11 +568,9 @@ mod tests {
605568
/// and ensures the number of rows are the same
606569
fn do_gc(array: StringViewArray) -> StringViewArray {
607570
let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(array) as ArrayRef)]).unwrap();
608-
let rows = batch.num_rows();
609-
let schema = batch.schema();
610-
let gc_batch = gc_string_view_batch(batch);
611-
assert_eq!(rows, gc_batch.num_rows());
612-
assert_eq!(schema, gc_batch.schema());
571+
let gc_batch = gc_string_view_batch(&batch);
572+
assert_eq!(batch.num_rows(), gc_batch.num_rows());
573+
assert_eq!(batch.schema(), gc_batch.schema());
613574
gc_batch
614575
.column(0)
615576
.as_any()

arrow-select/src/concat.rs

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,7 @@
3131
//! ```
3232
3333
use crate::dictionary::{merge_dictionary_values, should_merge_dictionary_values};
34-
use arrow_array::builder::{
35-
BooleanBuilder, GenericByteBuilder, GenericByteViewBuilder, PrimitiveBuilder,
36-
};
34+
use arrow_array::builder::{BooleanBuilder, GenericByteBuilder, PrimitiveBuilder};
3735
use arrow_array::cast::AsArray;
3836
use arrow_array::types::*;
3937
use arrow_array::*;
@@ -86,15 +84,6 @@ fn fixed_size_list_capacity(arrays: &[&dyn Array], data_type: &DataType) -> Capa
8684
}
8785
}
8886

89-
fn concat_byte_view<B: ByteViewType>(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError> {
90-
let mut builder =
91-
GenericByteViewBuilder::<B>::with_capacity(arrays.iter().map(|a| a.len()).sum());
92-
for &array in arrays.iter() {
93-
builder.append_array(array.as_byte_view());
94-
}
95-
Ok(Arc::new(builder.finish()))
96-
}
97-
9887
fn concat_dictionaries<K: ArrowDictionaryKeyType>(
9988
arrays: &[&dyn Array],
10089
) -> Result<ArrayRef, ArrowError> {
@@ -436,8 +425,6 @@ pub fn concat(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError> {
436425
_ => unreachable!("Unsupported run end index type: {r:?}"),
437426
}
438427
}
439-
DataType::Utf8View => concat_byte_view::<StringViewType>(arrays),
440-
DataType::BinaryView => concat_byte_view::<BinaryViewType>(arrays),
441428
_ => {
442429
let capacity = get_capacity(arrays, d);
443430
concat_fallback(arrays, capacity)

0 commit comments

Comments
 (0)