Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 16 additions & 8 deletions kernel/src/engine/arrow_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,9 @@ where
}

fn materialize(&self, row_index: usize) -> Vec<String> {
let mut result = vec![];
for i in 0..EngineList::len(self, row_index) {
let len = EngineList::len(self, row_index);
let mut result = Vec::with_capacity(len);
for i in 0..len {
result.push(self.get(row_index, i));
}
result
Expand All @@ -159,10 +160,10 @@ impl EngineMap for MapArray {
}

fn materialize(&self, row_index: usize) -> HashMap<String, String> {
let mut ret = HashMap::new();
let map_val = self.value(row_index);
let keys = map_val.column(0).as_string::<i32>();
let values = map_val.column(1).as_string::<i32>();
let mut ret = HashMap::with_capacity(keys.len());
for (key, value) in keys.iter().zip(values.iter()) {
if let (Some(key), Some(value)) = (key, value) {
ret.insert(key.into(), value.into());
Expand Down Expand Up @@ -221,15 +222,16 @@ impl EngineData for ArrowEngineData {
// Collect the names of all leaf columns we want to extract, along with their parents, to
// guide our depth-first extraction. If the list contains any non-leaf, duplicate, or
// missing column references, the extracted column list will be too short (error out below).
let mut mask = HashSet::new();
let mask_capacity: usize = leaf_columns.iter().map(|c| c.len()).sum();
let mut mask = HashSet::with_capacity(mask_capacity);
for column in leaf_columns {
for i in 0..column.len() {
mask.insert(&column[..i + 1]);
}
}
debug!("Column mask for selected columns {leaf_columns:?} is {mask:#?}");

let mut getters = vec![];
let mut getters = Vec::with_capacity(leaf_columns.len());
Self::extract_columns(&mut vec![], &mut getters, leaf_types, &mask, &self.data)?;
if getters.len() != leaf_columns.len() {
return Err(Error::MissingColumn(format!(
Expand All @@ -248,16 +250,22 @@ impl EngineData for ArrowEngineData {
) -> DeltaResult<Box<dyn EngineData>> {
// Combine existing and new schema fields
let schema: ArrowSchema = schema.as_ref().try_into_arrow()?;
let mut combined_fields = self.data.schema().fields().to_vec();
combined_fields.extend_from_slice(schema.fields());
let existing_schema = self.data.schema();
let existing_fields = existing_schema.fields();
let new_fields = schema.fields();
let mut combined_fields = Vec::with_capacity(existing_fields.len() + new_fields.len());
combined_fields.extend_from_slice(existing_fields);
combined_fields.extend_from_slice(new_fields);
let combined_schema = Arc::new(ArrowSchema::new(combined_fields));

// Combine existing and new columns
let new_columns: Vec<ArrayRef> = columns
.into_iter()
.map(|array_data| array_data.to_arrow())
.try_collect()?;
let mut combined_columns = self.data.columns().to_vec();
let existing_columns = self.data.columns();
let mut combined_columns = Vec::with_capacity(existing_columns.len() + new_columns.len());
combined_columns.extend_from_slice(existing_columns);
combined_columns.extend(new_columns);

// Create a new ArrowEngineData with the combined schema and columns
Expand Down
2 changes: 1 addition & 1 deletion kernel/src/engine/arrow_expression/evaluate_expression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ fn evaluate_transform_expression(
let mut used_field_transforms = 0;

// Collect output columns directly to avoid creating intermediate Expr::Column instances.
let mut output_cols = Vec::new();
let mut output_cols = Vec::with_capacity(output_schema.num_fields());

// Helper lambda to get the next output field type
let mut output_schema_iter = output_schema.fields();
Expand Down
4 changes: 2 additions & 2 deletions kernel/src/engine/arrow_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ pub(crate) struct RowIndexBuilder {

impl RowIndexBuilder {
pub(crate) fn new(row_groups: &[RowGroupMetaData]) -> Self {
let mut row_group_row_index_ranges = vec![];
let mut row_group_row_index_ranges = Vec::with_capacity(row_groups.len());
let mut offset = 0;
for row_group in row_groups {
let num_rows = row_group.num_rows();
Expand Down Expand Up @@ -136,7 +136,7 @@ impl RowIndexBuilder {
pub(crate) fn build(self) -> DeltaResult<FlattenedRangeIterator<i64>> {
let starting_offsets = match self.row_group_ordinals {
Some(ordinals) => {
let mut seen_ordinals = HashSet::new();
let mut seen_ordinals = HashSet::with_capacity(ordinals.len());
ordinals
.iter()
.map(|&i| {
Expand Down
18 changes: 8 additions & 10 deletions kernel/src/engine/parquet_row_group_skipping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,14 @@ impl<T> ParquetRowGroupSkipping for ArrowReaderBuilder<T> {
predicate: &Predicate,
row_indexes: Option<&mut RowIndexBuilder>,
) -> Self {
let ordinals: Vec<_> = self
.metadata()
.row_groups()
.iter()
.enumerate()
.filter_map(|(ordinal, row_group)| {
// If the group survives the filter, return Some(ordinal) so filter_map keeps it.
RowGroupFilter::apply(row_group, predicate).then_some(ordinal)
})
.collect();
let row_groups = self.metadata().row_groups();
// Collect ordinals of row groups that survive the filter
let mut ordinals = Vec::with_capacity(row_groups.len());
for (ordinal, row_group) in row_groups.iter().enumerate() {
if RowGroupFilter::apply(row_group, predicate) {
ordinals.push(ordinal);
}
}
debug!("with_row_group_filter({predicate:#?}) = {ordinals:?})");
if let Some(row_indexes) = row_indexes {
row_indexes.select_row_groups(&ordinals);
Expand Down
4 changes: 3 additions & 1 deletion kernel/src/listed_log_files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,9 @@ fn list_log_files(
.map(|f| f.version)
.max();

let files: Vec<ParsedLogPath> = listed_files.into_iter().chain(filtered_log_tail).collect();
let mut files = Vec::with_capacity(listed_files.len() + filtered_log_tail.len());
files.extend(listed_files);
files.extend(filtered_log_tail);

Ok(ListLogFilesResult {
files,
Expand Down
2 changes: 1 addition & 1 deletion kernel/src/scan/state_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ impl StateInfo {
let partition_columns = table_configuration.metadata().partition_columns();
let column_mapping_mode = table_configuration.column_mapping_mode();
let mut read_fields = Vec::with_capacity(logical_schema.num_fields());
let mut transform_spec = Vec::new();
let mut transform_spec = Vec::with_capacity(logical_schema.num_fields());
let mut last_physical_field: Option<String> = None;

let metadata_info = validate_metadata_columns(&logical_schema, table_configuration)?;
Expand Down
11 changes: 7 additions & 4 deletions kernel/src/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ impl Transaction {
// step 2 to fail early on duplicate transaction appIds
// TODO(zach): we currently do this in two passes - can we do it in one and still keep refs
// in the HashSet?
let mut app_ids = HashSet::new();
let mut app_ids = HashSet::with_capacity(self.set_transactions.len());
if let Some(dup) = self
.set_transactions
.iter()
Expand Down Expand Up @@ -649,7 +649,9 @@ impl Transaction {
}
/// Validate that user domains don't conflict with system domains or each other.
fn validate_user_domain_operations(&self) -> DeltaResult<()> {
let mut seen_domains = HashSet::new();
let mut seen_domains = HashSet::with_capacity(
self.domain_metadata_additions.len() + self.domain_removals.len(),
);

// Validate domain additions
for dm in &self.domain_metadata_additions {
Expand Down Expand Up @@ -1402,10 +1404,11 @@ impl<'a> DvMatchVisitor<'a> {
/// Creates a new DvMatchVisitor that will match file paths against the provided DV updates map.
#[cfg_attr(not(feature = "internal-api"), allow(dead_code))]
fn new(dv_updates: &'a HashMap<String, DeletionVectorDescriptor>) -> Self {
let capacity_hint = dv_updates.len();
Self {
dv_updates,
new_dv_entries: Vec::new(),
matched_file_indexes: Vec::new(),
new_dv_entries: Vec::with_capacity(capacity_hint),
matched_file_indexes: Vec::with_capacity(capacity_hint),
}
}
}
Expand Down
Loading