Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 15 additions & 10 deletions ffi/examples/read-table/read_table.c
Original file line number Diff line number Diff line change
Expand Up @@ -85,20 +85,25 @@ void scan_row_callback(

// For each chunk of scan data (which may contain multiple files to scan), kernel will call this
// function (named do_visit_scan_data to avoid conflict with visit_scan_data exported by kernel)
void do_visit_scan_data(
void* engine_context,
ExclusiveEngineData* engine_data,
KernelBoolSlice selection_vec,
const CTransforms* transforms)
{
void do_visit_scan_data(void* engine_context, HandleCScanData scan_data) {
print_diag("\nScan iterator found some data to read\n Of this data, here is "
"a selection vector\n");
print_selection_vector(" ", &selection_vec);
struct EngineContext* context = engine_context;

ExternResultKernelBoolSlice selection_vector_res =
selection_vector_from_scan_data(scan_data, context->engine);
if (selection_vector_res.tag != OkKernelBoolSlice) {
printf("Could not get selection vector from kernel\n");
exit(-1);
}
KernelBoolSlice selection_vector = selection_vector_res.ok;
print_selection_vector(" ", &selection_vector);

// Ask kernel to iterate each individual file and call us back with extracted metadata
print_diag("Asking kernel to call us back for each scan row (file to read)\n");
visit_scan_data(engine_data, selection_vec, transforms, engine_context, scan_row_callback);
free_bool_slice(selection_vec);
free_engine_data(engine_data);
visit_scan_data(scan_data, engine_context, scan_row_callback);
free_bool_slice(selection_vector);
free_scan_data(scan_data);
}

// Called for each element of the partition StringSliceIterator. We just turn the slice into a
Expand Down
85 changes: 48 additions & 37 deletions ffi/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use delta_kernel::scan::state::{visit_scan_files, DvInfo, GlobalScanState};
use delta_kernel::scan::state::{DvInfo, GlobalScanState};
use delta_kernel::scan::{Scan, ScanData};
use delta_kernel::snapshot::Snapshot;
use delta_kernel::{DeltaResult, Error, Expression, ExpressionRef};
Expand All @@ -16,9 +16,9 @@ use crate::expressions::engine::{
};
use crate::expressions::SharedExpression;
use crate::{
kernel_string_slice, AllocateStringFn, ExclusiveEngineData, ExternEngine, ExternResult,
IntoExternResult, KernelBoolSlice, KernelRowIndexArray, KernelStringSlice, NullableCvoid,
SharedExternEngine, SharedSchema, SharedSnapshot, TryFromStringSlice,
kernel_string_slice, AllocateStringFn, ExternEngine, ExternResult, IntoExternResult,
KernelBoolSlice, KernelRowIndexArray, KernelStringSlice, NullableCvoid, SharedExternEngine,
SharedSchema, SharedSnapshot, TryFromStringSlice,
};

use super::handle::Handle;
Expand All @@ -29,6 +29,36 @@ use super::handle::Handle;
#[handle_descriptor(target=Scan, mutable=false, sized=true)]
pub struct SharedScan;

#[handle_descriptor(target=ScanData, mutable=false, sized=true)]
pub struct CScanData;

/// Drop an `CScanData`.
///
/// # Safety
///
/// Caller is responsible for passing a valid scan data handle.
#[no_mangle]
pub unsafe extern "C" fn free_scan_data(scan_data: Handle<CScanData>) {
scan_data.drop_handle();
}

/// Get a selection vector out of a [`CScanData`] struct
///
/// # Safety
/// Engine is responsible for providing valid pointers for each argument
#[no_mangle]
pub unsafe extern "C" fn selection_vector_from_scan_data(
scan_data: Handle<CScanData>,
engine: Handle<SharedExternEngine>,
) -> ExternResult<KernelBoolSlice> {
let scan_data = unsafe { scan_data.as_ref() };
selection_vector_from_scan_data_impl(scan_data).into_extern_result(&engine.as_ref())
}

fn selection_vector_from_scan_data_impl(scan_data: &ScanData) -> DeltaResult<KernelBoolSlice> {
Ok(scan_data.selection_vector().clone().into())
}

/// Drops a scan.
///
/// # Safety
Expand Down Expand Up @@ -176,8 +206,8 @@ fn kernel_scan_data_init_impl(
}

/// Call the provided `engine_visitor` on the next scan data item. The visitor will be provided with
/// a selection vector and engine data. It is the responsibility of the _engine_ to free these when
/// it is finished by calling [`free_bool_slice`] and [`free_engine_data`] respectively.
/// a [`CScanData`]. It is the responsibility of the _engine_ to free these when it is finished
/// by calling [`free_bool_slice`] and [`free_engine_data`] respectively.
///
/// # Safety
///
Expand All @@ -190,12 +220,7 @@ fn kernel_scan_data_init_impl(
pub unsafe extern "C" fn kernel_scan_data_next(
data: Handle<SharedScanDataIterator>,
engine_context: NullableCvoid,
engine_visitor: extern "C" fn(
engine_context: NullableCvoid,
engine_data: Handle<ExclusiveEngineData>,
selection_vector: KernelBoolSlice,
transforms: &CTransforms,
),
engine_visitor: extern "C" fn(engine_context: NullableCvoid, scan_data: Handle<CScanData>),
) -> ExternResult<bool> {
let data = unsafe { data.as_ref() };
kernel_scan_data_next_impl(data, engine_context, engine_visitor)
Expand All @@ -204,21 +229,14 @@ pub unsafe extern "C" fn kernel_scan_data_next(
fn kernel_scan_data_next_impl(
data: &KernelScanDataIterator,
engine_context: NullableCvoid,
engine_visitor: extern "C" fn(
engine_context: NullableCvoid,
engine_data: Handle<ExclusiveEngineData>,
selection_vector: KernelBoolSlice,
transforms: &CTransforms,
),
engine_visitor: extern "C" fn(engine_context: NullableCvoid, scan_data: Handle<CScanData>),
) -> DeltaResult<bool> {
let mut data = data
.data
.lock()
.map_err(|_| Error::generic("poisoned mutex"))?;
if let Some((data, sel_vec, transforms)) = data.next().transpose()? {
let bool_slice = KernelBoolSlice::from(sel_vec);
let transform_map = CTransforms { transforms };
(engine_visitor)(engine_context, data.into(), bool_slice, &transform_map);
if let Some(scan_data) = data.next().transpose()? {
(engine_visitor)(engine_context, Arc::new(scan_data).into());
Ok(true)
} else {
Ok(false)
Expand Down Expand Up @@ -421,31 +439,24 @@ struct ContextWrapper {
}

/// Shim for ffi to call visit_scan_data. This will generally be called when iterating through scan
/// data which provides the data handle and selection vector as each element in the iterator.
/// data which provides the [`CScanData`] as each element in the iterator.
///
/// # Safety
/// engine is responsible for passing a valid [`ExclusiveEngineData`] and selection vector.
/// engine is responsible for passing a valid [`CScanData`].
#[no_mangle]
pub unsafe extern "C" fn visit_scan_data(
data: Handle<ExclusiveEngineData>,
selection_vec: KernelBoolSlice,
transforms: &CTransforms,
scan_data: Handle<CScanData>,
engine_context: NullableCvoid,
callback: CScanCallback,
) {
let selection_vec = unsafe { selection_vec.as_ref() };
let data = unsafe { data.as_ref() };
let scan_data = unsafe { scan_data.as_ref() };
let context_wrapper = ContextWrapper {
engine_context,
callback,
};

// TODO: return ExternResult to caller instead of panicking?
visit_scan_files(
data,
selection_vec,
&transforms.transforms,
context_wrapper,
rust_callback,
)
.unwrap();
scan_data
.visit_scan_files(context_wrapper, rust_callback)
.unwrap();
}
10 changes: 2 additions & 8 deletions kernel/examples/inspect-table/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,14 +211,8 @@ fn try_main() -> DeltaResult<()> {
let scan = ScanBuilder::new(snapshot).build()?;
let scan_data = scan.scan_data(&engine)?;
for res in scan_data {
let (data, vector, transforms) = res?;
delta_kernel::scan::state::visit_scan_files(
data.as_ref(),
&vector,
&transforms,
(),
print_scan_file,
)?;
let scan_data = res?;
scan_data.visit_scan_files((), print_scan_file)?;
}
}
Commands::Actions { oldest_first } => {
Expand Down
10 changes: 2 additions & 8 deletions kernel/examples/read-table-multi-threaded/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,14 +210,8 @@ fn try_main() -> DeltaResult<()> {
drop(record_batch_tx);

for res in scan_data {
let (data, vector, transforms) = res?;
scan_file_tx = delta_kernel::scan::state::visit_scan_files(
data.as_ref(),
&vector,
&transforms,
scan_file_tx,
send_scan_file,
)?;
let scan_data = res?;
scan_file_tx = scan_data.visit_scan_files(scan_file_tx, send_scan_file)?;
}

// have sent all scan files, drop this so threads will exit when there's no more work
Expand Down
13 changes: 13 additions & 0 deletions kernel/src/engine_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,19 @@ use tracing::debug;

use std::collections::HashMap;

/// Engine data paired with a selection vector indicating which rows are logically selected.
///
/// A value of `true` in the selection vector means the corresponding row is selected (i.e., not deleted),
/// while `false` means the row is logically deleted and should be ignored.
///
/// Interpreting unselected (`false`) rows will result in incorrect/undefined behavior.
pub struct FilteredEngineData {
// The underlying engine data
pub data: Box<dyn EngineData>,
// The selection vector where `true` marks rows to include in results
pub selection_vector: Vec<bool>,
}

/// a trait that an engine exposes to give access to a list
pub trait EngineList {
/// Return the length of the list at the specified row_index in the raw data
Expand Down
12 changes: 8 additions & 4 deletions kernel/src/scan/log_replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ impl LogReplayProcessor for ScanLogReplayProcessor {

// TODO: Teach expression eval to respect the selection vector we just computed so carefully!
let result = self.add_transform.evaluate(actions_batch)?;
Ok((
Ok(ScanData::new(
result,
visitor.selection_vector,
visitor.row_transform_exprs,
Expand Down Expand Up @@ -469,8 +469,11 @@ mod tests {
None,
);
for res in iter {
let (_batch, _sel, transforms) = res.unwrap();
assert!(transforms.is_empty(), "Should have no transforms");
let scan_data = res.unwrap();
assert!(
scan_data.scan_file_transforms.is_empty(),
"Should have no transforms"
);
}
}

Expand Down Expand Up @@ -515,7 +518,8 @@ mod tests {
}

for res in iter {
let (_batch, _sel, transforms) = res.unwrap();
let scan_data = res.unwrap();
let transforms = scan_data.scan_file_transforms;
// in this case we have a metadata action first and protocol 3rd, so we expect 4 items,
// the first and 3rd being a `None`
assert_eq!(transforms.len(), 4, "Should have 4 transforms");
Expand Down
75 changes: 43 additions & 32 deletions kernel/src/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::actions::deletion_vector::{
deletion_treemap_to_bools, split_vector, DeletionVectorDescriptor,
};
use crate::actions::{get_log_schema, ADD_NAME, REMOVE_NAME, SIDECAR_NAME};
use crate::engine_data::FilteredEngineData;
use crate::expressions::{ColumnName, Expression, ExpressionRef, ExpressionTransform, Scalar};
use crate::log_replay::HasSelectionVector;
use crate::predicates::{DefaultPredicateEvaluator, EmptyColumnResolver};
Expand Down Expand Up @@ -321,13 +322,40 @@ pub(crate) enum TransformExpr {
Partition(usize),
}

// TODO(nick): Make this a struct in a follow-on PR
// (data, deletion_vec, transforms)
pub type ScanData = (Box<dyn EngineData>, Vec<bool>, Vec<Option<ExpressionRef>>);
/// Result of a data scan operation containing filtered data and associated transformations.
pub struct ScanData {
/// Filtered engine data with one row per file to scan (and only selected rows should be scanned)
pub scan_files: FilteredEngineData,

/// Row-level transformations where each expression must be applied to its corresponding row in
/// the `scan_files`. If an expression is `None`, no transformation is needed for that row.
pub scan_file_transforms: Vec<Option<ExpressionRef>>,
}

impl ScanData {
fn new(
data: Box<dyn EngineData>,
selection_vector: Vec<bool>,
scan_file_transforms: Vec<Option<ExpressionRef>>,
) -> Self {
Self {
scan_files: FilteredEngineData {
data,
selection_vector,
},
scan_file_transforms,
}
}

// Get a reference to the selection vector
pub fn selection_vector(&self) -> &Vec<bool> {
&self.scan_files.selection_vector
}
}

impl HasSelectionVector for ScanData {
fn has_selected_rows(&self) -> bool {
self.1.contains(&true)
self.scan_files.selection_vector.contains(&true)
}
}

Expand Down Expand Up @@ -500,18 +528,12 @@ impl Scan {
let table_root = self.snapshot.table_root().clone();
let physical_predicate = self.physical_predicate();

let scan_data = self.scan_data(engine.as_ref())?;
let scan_files_iter = scan_data
let scan_data_iter = self.scan_data(engine.as_ref())?;
let scan_files_iter = scan_data_iter
.map(|res| {
let (data, vec, transforms) = res?;
let scan_data = res?;
let scan_files = vec![];
state::visit_scan_files(
data.as_ref(),
&vec,
&transforms,
scan_files,
scan_data_callback,
)
scan_data.visit_scan_files(scan_files, scan_data_callback)
})
// Iterator<DeltaResult<Vec<ScanFile>>> to Iterator<DeltaResult<ScanFile>>
.flatten_ok();
Expand Down Expand Up @@ -790,16 +812,11 @@ pub(crate) mod test_utils {
);
let mut batch_count = 0;
for res in iter {
let (batch, sel, transforms) = res.unwrap();
assert_eq!(sel, expected_sel_vec);
crate::scan::state::visit_scan_files(
batch.as_ref(),
&sel,
&transforms,
context.clone(),
validate_callback,
)
.unwrap();
let scan_data = res.unwrap();
assert_eq!(scan_data.selection_vector(), &expected_sel_vec);
scan_data
.visit_scan_files(context.clone(), validate_callback)
.unwrap();
batch_count += 1;
}
assert_eq!(batch_count, 1);
Expand Down Expand Up @@ -1003,14 +1020,8 @@ mod tests {
}
let mut files = vec![];
for data in scan_data {
let (data, vec, transforms) = data?;
files = state::visit_scan_files(
data.as_ref(),
&vec,
&transforms,
files,
scan_data_callback,
)?;
let scan_data = data?;
files = scan_data.visit_scan_files(files, scan_data_callback)?;
}
Ok(files)
}
Expand Down
Loading
Loading