Skip to content

Commit b38bc5d

Browse files
authored
refactor!: update ScanMetadata to struct with new FilteredEngineData type (#768)
## What changes are proposed in this pull request? 1. Updated `ScanMetadata` from a typed tuple to a struct. ScanMetadata is now a struct with fields: - filtered_data: A `FilteredEngineData` instance. - transforms: A vector of transformations to be applied to the data read from the files 2. Introduction of `FilteredEngineData` type: Couples `EngineData` with a selection vector indicating which rows to process. This type is returned from the `scan_metadata` API and the incoming `checkpoint` API 3. Updates `visit_scan_files` parameters to accept `ScanMetadata` to avoid de-structuring. 4. Corresponding FFI changes for `visit_scan_files` to accept `ScanMetadata` param All current tests pass.
1 parent 10bdee7 commit b38bc5d

File tree

9 files changed

+167
-132
lines changed

9 files changed

+167
-132
lines changed

ffi/examples/read-table/read_table.c

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -86,20 +86,25 @@ void scan_row_callback(
8686
// For each chunk of scan metadata (which may contain multiple files to scan), kernel will call this
8787
// function (named do_visit_scan_metadata to avoid conflict with visit_scan_metadata exported by
8888
// kernel)
89-
void do_visit_scan_metadata(
90-
void* engine_context,
91-
ExclusiveEngineData* engine_data,
92-
KernelBoolSlice selection_vec,
93-
const CTransforms* transforms)
94-
{
89+
void do_visit_scan_metadata(void* engine_context, HandleSharedScanMetadata scan_metadata) {
9590
print_diag("\nScan iterator found some data to read\n Of this data, here is "
9691
"a selection vector\n");
97-
print_selection_vector(" ", &selection_vec);
92+
struct EngineContext* context = engine_context;
93+
94+
ExternResultKernelBoolSlice selection_vector_res =
95+
selection_vector_from_scan_metadata(scan_metadata, context->engine);
96+
if (selection_vector_res.tag != OkKernelBoolSlice) {
97+
printf("Could not get selection vector from kernel\n");
98+
exit(-1);
99+
}
100+
KernelBoolSlice selection_vector = selection_vector_res.ok;
101+
print_selection_vector(" ", &selection_vector);
102+
98103
// Ask kernel to iterate each individual file and call us back with extracted metadata
99104
print_diag("Asking kernel to call us back for each scan row (file to read)\n");
100-
visit_scan_metadata(engine_data, selection_vec, transforms, engine_context, scan_row_callback);
101-
free_bool_slice(selection_vec);
102-
free_engine_data(engine_data);
105+
visit_scan_metadata(scan_metadata, engine_context, scan_row_callback);
106+
free_bool_slice(selection_vector);
107+
free_scan_metadata(scan_metadata);
103108
}
104109

105110
// Called for each element of the partition StringSliceIterator. We just turn the slice into a

ffi/src/scan.rs

Lines changed: 52 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
use std::collections::HashMap;
44
use std::sync::{Arc, Mutex};
55

6-
use delta_kernel::scan::state::{visit_scan_files, DvInfo, GlobalScanState};
6+
use delta_kernel::scan::state::{DvInfo, GlobalScanState};
77
use delta_kernel::scan::{Scan, ScanMetadata};
88
use delta_kernel::snapshot::Snapshot;
99
use delta_kernel::{DeltaResult, Error, Expression, ExpressionRef};
@@ -16,9 +16,9 @@ use crate::expressions::engine::{
1616
};
1717
use crate::expressions::SharedExpression;
1818
use crate::{
19-
kernel_string_slice, AllocateStringFn, ExclusiveEngineData, ExternEngine, ExternResult,
20-
IntoExternResult, KernelBoolSlice, KernelRowIndexArray, KernelStringSlice, NullableCvoid,
21-
SharedExternEngine, SharedSchema, SharedSnapshot, TryFromStringSlice,
19+
kernel_string_slice, AllocateStringFn, ExternEngine, ExternResult, IntoExternResult,
20+
KernelBoolSlice, KernelRowIndexArray, KernelStringSlice, NullableCvoid, SharedExternEngine,
21+
SharedSchema, SharedSnapshot, TryFromStringSlice,
2222
};
2323

2424
use super::handle::Handle;
@@ -29,6 +29,38 @@ use super::handle::Handle;
2929
#[handle_descriptor(target=Scan, mutable=false, sized=true)]
3030
pub struct SharedScan;
3131

32+
#[handle_descriptor(target=ScanMetadata, mutable=false, sized=true)]
33+
pub struct SharedScanMetadata;
34+
35+
/// Drop a `SharedScanMetadata`.
36+
///
37+
/// # Safety
38+
///
39+
/// Caller is responsible for passing a valid scan metadata handle.
40+
#[no_mangle]
41+
pub unsafe extern "C" fn free_scan_metadata(scan_metadata: Handle<SharedScanMetadata>) {
42+
scan_metadata.drop_handle();
43+
}
44+
45+
/// Get a selection vector out of a [`SharedScanMetadata`] struct
46+
///
47+
/// # Safety
48+
/// Engine is responsible for providing valid pointers for each argument
49+
#[no_mangle]
50+
pub unsafe extern "C" fn selection_vector_from_scan_metadata(
51+
scan_metadata: Handle<SharedScanMetadata>,
52+
engine: Handle<SharedExternEngine>,
53+
) -> ExternResult<KernelBoolSlice> {
54+
let scan_metadata = unsafe { scan_metadata.as_ref() };
55+
selection_vector_from_scan_metadata_impl(scan_metadata).into_extern_result(&engine.as_ref())
56+
}
57+
58+
fn selection_vector_from_scan_metadata_impl(
59+
scan_metadata: &ScanMetadata,
60+
) -> DeltaResult<KernelBoolSlice> {
61+
Ok(scan_metadata.scan_files.selection_vector.clone().into())
62+
}
63+
3264
/// Drops a scan.
3365
///
3466
/// # Safety
@@ -175,9 +207,10 @@ fn scan_metadata_iter_init_impl(
175207
Ok(Arc::new(data).into())
176208
}
177209

178-
/// Call the provided `engine_visitor` on the next scan metadata item. The visitor will be provided
179-
/// with a selection vector and engine data. It is the responsibility of the _engine_ to free these
180-
/// when it is finished by calling [`free_bool_slice`] and [`free_engine_data`] respectively.
210+
/// Call the provided `engine_visitor` on the next scan metadata item. The visitor will be provided with
211+
/// a [`SharedScanMetadata`], which contains the actual scan files and the associated selection vector. It is the
212+
/// responsibility of the _engine_ to free the associated resources after use by calling
213+
/// [`free_scan_metadata`].
181214
///
182215
/// # Safety
183216
///
@@ -192,9 +225,7 @@ pub unsafe extern "C" fn scan_metadata_next(
192225
engine_context: NullableCvoid,
193226
engine_visitor: extern "C" fn(
194227
engine_context: NullableCvoid,
195-
engine_data: Handle<ExclusiveEngineData>,
196-
selection_vector: KernelBoolSlice,
197-
transforms: &CTransforms,
228+
scan_metadata: Handle<SharedScanMetadata>,
198229
),
199230
) -> ExternResult<bool> {
200231
let data = unsafe { data.as_ref() };
@@ -206,19 +237,15 @@ fn scan_metadata_next_impl(
206237
engine_context: NullableCvoid,
207238
engine_visitor: extern "C" fn(
208239
engine_context: NullableCvoid,
209-
engine_data: Handle<ExclusiveEngineData>,
210-
selection_vector: KernelBoolSlice,
211-
transforms: &CTransforms,
240+
scan_metadata: Handle<SharedScanMetadata>,
212241
),
213242
) -> DeltaResult<bool> {
214243
let mut data = data
215244
.data
216245
.lock()
217246
.map_err(|_| Error::generic("poisoned mutex"))?;
218-
if let Some((data, sel_vec, transforms)) = data.next().transpose()? {
219-
let bool_slice = KernelBoolSlice::from(sel_vec);
220-
let transform_map = CTransforms { transforms };
221-
(engine_visitor)(engine_context, data.into(), bool_slice, &transform_map);
247+
if let Some(scan_metadata) = data.next().transpose()? {
248+
(engine_visitor)(engine_context, Arc::new(scan_metadata).into());
222249
Ok(true)
223250
} else {
224251
Ok(false)
@@ -421,31 +448,24 @@ struct ContextWrapper {
421448
}
422449

423450
/// Shim for ffi to call visit_scan_metadata. This will generally be called when iterating through scan
424-
/// data which provides the data handle and selection vector as each element in the iterator.
451+
/// data which provides the [`SharedScanMetadata`] as each element in the iterator.
425452
///
426453
/// # Safety
427-
/// engine is responsible for passing a valid [`ExclusiveEngineData`] and selection vector.
454+
/// engine is responsible for passing a valid [`SharedScanMetadata`].
428455
#[no_mangle]
429456
pub unsafe extern "C" fn visit_scan_metadata(
430-
data: Handle<ExclusiveEngineData>,
431-
selection_vec: KernelBoolSlice,
432-
transforms: &CTransforms,
457+
scan_metadata: Handle<SharedScanMetadata>,
433458
engine_context: NullableCvoid,
434459
callback: CScanCallback,
435460
) {
436-
let selection_vec = unsafe { selection_vec.as_ref() };
437-
let data = unsafe { data.as_ref() };
461+
let scan_metadata = unsafe { scan_metadata.as_ref() };
438462
let context_wrapper = ContextWrapper {
439463
engine_context,
440464
callback,
441465
};
466+
442467
// TODO: return ExternResult to caller instead of panicking?
443-
visit_scan_files(
444-
data,
445-
selection_vec,
446-
&transforms.transforms,
447-
context_wrapper,
448-
rust_callback,
449-
)
450-
.unwrap();
468+
scan_metadata
469+
.visit_scan_files(context_wrapper, rust_callback)
470+
.unwrap();
451471
}

kernel/examples/inspect-table/src/main.rs

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -209,16 +209,10 @@ fn try_main() -> DeltaResult<()> {
209209
}
210210
Commands::ScanMetadata => {
211211
let scan = ScanBuilder::new(snapshot).build()?;
212-
let scan_metadata = scan.scan_metadata(&engine)?;
213-
for res in scan_metadata {
214-
let (data, vector, transforms) = res?;
215-
delta_kernel::scan::state::visit_scan_files(
216-
data.as_ref(),
217-
&vector,
218-
&transforms,
219-
(),
220-
print_scan_file,
221-
)?;
212+
let scan_metadata_iter = scan.scan_metadata(&engine)?;
213+
for res in scan_metadata_iter {
214+
let scan_metadata = res?;
215+
scan_metadata.visit_scan_files((), print_scan_file)?;
222216
}
223217
}
224218
Commands::Actions { oldest_first } => {

kernel/examples/read-table-multi-threaded/src/main.rs

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -210,14 +210,8 @@ fn try_main() -> DeltaResult<()> {
210210
drop(record_batch_tx);
211211

212212
for res in scan_metadata {
213-
let (data, vector, transforms) = res?;
214-
scan_file_tx = delta_kernel::scan::state::visit_scan_files(
215-
data.as_ref(),
216-
&vector,
217-
&transforms,
218-
scan_file_tx,
219-
send_scan_file,
220-
)?;
213+
let scan_metadata = res?;
214+
scan_file_tx = scan_metadata.visit_scan_files(scan_file_tx, send_scan_file)?;
221215
}
222216

223217
// have sent all scan files, drop this so threads will exit when there's no more work

kernel/src/engine_data.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,19 @@ use tracing::debug;
77

88
use std::collections::HashMap;
99

10+
/// Engine data paired with a selection vector indicating which rows are logically selected.
11+
///
12+
/// A value of `true` in the selection vector means the corresponding row is selected (i.e., not deleted),
13+
/// while `false` means the row is logically deleted and should be ignored.
14+
///
15+
/// Interpreting unselected (`false`) rows will result in incorrect/undefined behavior.
16+
pub struct FilteredEngineData {
17+
// The underlying engine data
18+
pub data: Box<dyn EngineData>,
19+
// The selection vector where `true` marks rows to include in results
20+
pub selection_vector: Vec<bool>,
21+
}
22+
1023
/// a trait that an engine exposes to give access to a list
1124
pub trait EngineList {
1225
/// Return the length of the list at the specified row_index in the raw data

kernel/src/scan/log_replay.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,7 @@ impl LogReplayProcessor for ScanLogReplayProcessor {
364364

365365
// TODO: Teach expression eval to respect the selection vector we just computed so carefully!
366366
let result = self.add_transform.evaluate(actions_batch)?;
367-
Ok((
367+
Ok(ScanMetadata::new(
368368
result,
369369
visitor.selection_vector,
370370
visitor.row_transform_exprs,
@@ -470,8 +470,11 @@ mod tests {
470470
None,
471471
);
472472
for res in iter {
473-
let (_batch, _sel, transforms) = res.unwrap();
474-
assert!(transforms.is_empty(), "Should have no transforms");
473+
let scan_metadata = res.unwrap();
474+
assert!(
475+
scan_metadata.scan_file_transforms.is_empty(),
476+
"Should have no transforms"
477+
);
475478
}
476479
}
477480

@@ -516,7 +519,8 @@ mod tests {
516519
}
517520

518521
for res in iter {
519-
let (_batch, _sel, transforms) = res.unwrap();
522+
let scan_metadata = res.unwrap();
523+
let transforms = scan_metadata.scan_file_transforms;
520524
// in this case we have a metadata action first and protocol 3rd, so we expect 4 items,
521525
// the first and 3rd being a `None`
522526
assert_eq!(transforms.len(), 4, "Should have 4 transforms");

0 commit comments

Comments
 (0)