Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions ffi/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,12 @@ fn kernel_scan_data_next_impl(
.data
.lock()
.map_err(|_| Error::generic("poisoned mutex"))?;
if let Some((data, sel_vec, transforms)) = data.next().transpose()? {
if let Some(scan_data) = data.next().transpose()? {
let (data, sel_vec) = scan_data.filtered_data;
let bool_slice = KernelBoolSlice::from(sel_vec);
let transform_map = CTransforms { transforms };
let transform_map = CTransforms {
transforms: scan_data.transforms,
};
(engine_visitor)(engine_context, data.into(), bool_slice, &transform_map);
Ok(true)
} else {
Expand Down
7 changes: 4 additions & 3 deletions kernel/examples/inspect-table/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,11 +211,12 @@ fn try_main() -> DeltaResult<()> {
let scan = ScanBuilder::new(snapshot).build()?;
let scan_data = scan.scan_data(&engine)?;
for res in scan_data {
let (data, vector, transforms) = res?;
let scan_data = res?;
let (data, sel_vec) = scan_data.filtered_data;
delta_kernel::scan::state::visit_scan_files(
data.as_ref(),
&vector,
&transforms,
&sel_vec,
&scan_data.transforms,
(),
print_scan_file,
)?;
Expand Down
7 changes: 4 additions & 3 deletions kernel/examples/read-table-multi-threaded/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,12 @@ fn try_main() -> DeltaResult<()> {
drop(record_batch_tx);

for res in scan_data {
let (data, vector, transforms) = res?;
let scan_data = res?;
let (data, sel_vec) = scan_data.filtered_data;
scan_file_tx = delta_kernel::scan::state::visit_scan_files(
data.as_ref(),
&vector,
&transforms,
&sel_vec,
&scan_data.transforms,
scan_file_tx,
send_scan_file,
)?;
Expand Down
6 changes: 6 additions & 0 deletions kernel/src/engine_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ use tracing::debug;

use std::collections::HashMap;

/// Engine data paired with a selection vector that indicates which rows to process.
///
/// The tuple elements are:
/// - `Box<dyn EngineData>`: the underlying data
/// - `Vec<bool>`: the selection vector, where `true` marks rows to include in results
pub type FilteredEngineData = (Box<dyn EngineData>, Vec<bool>);

/// a trait that an engine exposes to give access to a list
pub trait EngineList {
/// Return the length of the list at the specified row_index in the raw data
Expand Down
19 changes: 14 additions & 5 deletions kernel/src/scan/log_replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,10 @@ impl LogReplayScanner {
// TODO: Teach expression eval to respect the selection vector we just computed so carefully!
let selection_vector = visitor.selection_vector;
let result = add_transform.evaluate(actions)?;
Ok((result, selection_vector, visitor.row_transform_exprs))
Ok(ScanData {
filtered_data: (result, selection_vector),
transforms: visitor.row_transform_exprs,
})
}
}

Expand Down Expand Up @@ -382,7 +385,12 @@ pub(crate) fn scan_action_iter(
is_log_batch,
)
})
.filter(|res| res.as_ref().map_or(true, |(_, sv, _)| sv.contains(&true)))
.filter(|res| {
res.as_ref().map_or(true, |scan_data| {
let (_, sel_vec) = &scan_data.filtered_data;
sel_vec.contains(&true)
})
})
}

#[cfg(test)]
Expand Down Expand Up @@ -464,8 +472,8 @@ mod tests {
None,
);
for res in iter {
let (_batch, _sel, transforms) = res.unwrap();
assert!(transforms.is_empty(), "Should have no transforms");
let scan_data = res.unwrap();
assert!(scan_data.transforms.is_empty(), "Should have no transforms");
}
}

Expand Down Expand Up @@ -510,7 +518,8 @@ mod tests {
}

for res in iter {
let (_batch, _sel, transforms) = res.unwrap();
let scan_data = res.unwrap();
let transforms = scan_data.transforms;
// in this case we have a metadata action first and protocol 3rd, so we expect 4 items,
// the first and 3rd being a `None`
assert_eq!(transforms.len(), 4, "Should have 4 transforms");
Expand Down
39 changes: 24 additions & 15 deletions kernel/src/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::actions::deletion_vector::{
deletion_treemap_to_bools, split_vector, DeletionVectorDescriptor,
};
use crate::actions::{get_log_schema, ADD_NAME, REMOVE_NAME, SIDECAR_NAME};
use crate::engine_data::FilteredEngineData;
use crate::expressions::{ColumnName, Expression, ExpressionRef, ExpressionTransform, Scalar};
use crate::predicates::{DefaultPredicateEvaluator, EmptyColumnResolver};
use crate::scan::state::{DvInfo, Stats};
Expand Down Expand Up @@ -320,10 +321,15 @@ pub(crate) enum TransformExpr {
Partition(usize),
}

// TODO(nick): Make this a struct in a follow-on PR
// (data, deletion_vec, transforms)
pub type ScanData = (Box<dyn EngineData>, Vec<bool>, Vec<Option<ExpressionRef>>);
/// Result of a data scan operation containing filtered data and associated transformations.
pub struct ScanData {
/// Engine data with its selection vector indicating relevant rows (a [`FilteredEngineData`]
/// tuple; `true` in the selection vector marks a row to include in results)
pub filtered_data: FilteredEngineData,

/// Row-level transformations where each expression must be applied to its corresponding row in
/// the `filtered_data`. If an expression is `None`, no transformation is needed for that row.
pub transforms: Vec<Option<ExpressionRef>>,
}
/// The result of building a scan over a table. This can be used to get the actual data from
/// scanning the table.
pub struct Scan {
Expand Down Expand Up @@ -493,15 +499,16 @@ impl Scan {
let table_root = self.snapshot.table_root().clone();
let physical_predicate = self.physical_predicate();

let scan_data = self.scan_data(engine.as_ref())?;
let scan_files_iter = scan_data
let scan_data_iter = self.scan_data(engine.as_ref())?;
let scan_files_iter = scan_data_iter
.map(|res| {
let (data, vec, transforms) = res?;
let scan_data = res?;
let (data, sel_vec) = scan_data.filtered_data;
let scan_files = vec![];
state::visit_scan_files(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we have a visit_scan_data_files or similar that just takes the ScanData? Then we don't have to do this decomposition all over the place

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about updating visit_scan_files to just take ScanData?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed a little offline: I'm kind of partial to a `ScanData.visit(callback, context)`?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implemented @zachschuermann's approach

data.as_ref(),
&vec,
&transforms,
&sel_vec,
&scan_data.transforms,
scan_files,
scan_data_callback,
)
Expand Down Expand Up @@ -792,12 +799,13 @@ pub(crate) mod test_utils {
);
let mut batch_count = 0;
for res in iter {
let (batch, sel, transforms) = res.unwrap();
assert_eq!(sel, expected_sel_vec);
let scan_data = res.unwrap();
let (batch, sel_vec) = scan_data.filtered_data;
assert_eq!(sel_vec, expected_sel_vec);
crate::scan::state::visit_scan_files(
batch.as_ref(),
&sel,
&transforms,
&sel_vec,
&scan_data.transforms,
context.clone(),
validate_callback,
)
Expand Down Expand Up @@ -1005,11 +1013,12 @@ mod tests {
}
let mut files = vec![];
for data in scan_data {
let (data, vec, transforms) = data?;
let scan_data = data?;
let (data, sel_vec) = scan_data.filtered_data;
files = state::visit_scan_files(
data.as_ref(),
&vec,
&transforms,
&sel_vec,
&scan_data.transforms,
files,
scan_data_callback,
)?;
Expand Down
4 changes: 3 additions & 1 deletion kernel/src/scan/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,12 @@ pub type ScanCallback<T> = fn(
/// ```ignore
/// let mut context = [my context];
/// for res in scan_data { // scan data from scan.scan_data()
/// let (data, vector) = res?;
/// let scan_data = res?;
/// let (data, selection_vector) = scan_data.filtered_data;
/// context = delta_kernel::scan::state::visit_scan_files(
/// data.as_ref(),
/// &selection_vector,
/// &scan_data.transforms,
/// context,
/// my_callback,
/// )?;
Expand Down
7 changes: 4 additions & 3 deletions kernel/tests/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,11 +361,12 @@ fn read_with_scan_data(
let scan_data = scan.scan_data(engine)?;
let mut scan_files = vec![];
for data in scan_data {
let (data, vec, transforms) = data?;
let scan_data = data?;
let (data, sel_vec) = scan_data.filtered_data;
scan_files = visit_scan_files(
data.as_ref(),
&vec,
&transforms,
&sel_vec,
&scan_data.transforms,
scan_files,
scan_data_callback,
)?;
Expand Down
Loading