Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion acceptance/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,10 @@ fn assert_columns_match(actual: &[Arc<dyn Array>], expected: &[Arc<dyn Array>])
}
}

pub async fn assert_scan_data(engine: Arc<dyn Engine>, test_case: &TestCaseInfo) -> TestResult<()> {
pub async fn assert_scan_metadata(
engine: Arc<dyn Engine>,
test_case: &TestCaseInfo,
) -> TestResult<()> {
let table_root = test_case.table_root()?;
let table = Table::new(table_root);
let snapshot = table.snapshot(engine.as_ref(), None)?;
Expand Down
2 changes: 1 addition & 1 deletion acceptance/tests/dat_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ fn reader_test(path: &Path) -> datatest_stable::Result<()> {
);

case.assert_metadata(engine.clone()).await.unwrap();
acceptance::data::assert_scan_data(engine.clone(), &case)
acceptance::data::assert_scan_metadata(engine.clone(), &case)
.await
.unwrap();
});
Expand Down
29 changes: 16 additions & 13 deletions ffi/examples/read-table/read_table.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,10 @@ void scan_row_callback(
context->partition_values = NULL;
}

// For each chunk of scan data (which may contain multiple files to scan), kernel will call this
// function (named do_visit_scan_data to avoid conflict with visit_scan_data exported by kernel)
void do_visit_scan_data(
// For each chunk of scan metadata (which may contain multiple files to scan), kernel will call this
// function (named do_visit_scan_metadata to avoid conflict with visit_scan_metadata exported by
// kernel)
void do_visit_scan_metadata(
void* engine_context,
ExclusiveEngineData* engine_data,
KernelBoolSlice selection_vec,
Expand All @@ -96,7 +97,7 @@ void do_visit_scan_data(
print_selection_vector(" ", &selection_vec);
// Ask kernel to iterate each individual file and call us back with extracted metadata
print_diag("Asking kernel to call us back for each scan row (file to read)\n");
visit_scan_data(engine_data, selection_vec, transforms, engine_context, scan_row_callback);
visit_scan_metadata(engine_data, selection_vec, transforms, engine_context, scan_row_callback);
free_bool_slice(selection_vec);
free_engine_data(engine_data);
}
Expand Down Expand Up @@ -291,26 +292,28 @@ int main(int argc, char* argv[])
#endif
};

ExternResultHandleSharedScanDataIterator data_iter_res = kernel_scan_data_init(engine, scan);
if (data_iter_res.tag != OkHandleSharedScanDataIterator) {
print_error("Failed to construct scan data iterator.", (Error*)data_iter_res.err);
ExternResultHandleSharedScanMetadataIterator data_iter_res =
scan_metadata_iter_init(engine, scan);
if (data_iter_res.tag != OkHandleSharedScanMetadataIterator) {
print_error("Failed to construct scan metadata iterator.", (Error*)data_iter_res.err);
free_error((Error*)data_iter_res.err);
return -1;
}

SharedScanDataIterator* data_iter = data_iter_res.ok;
SharedScanMetadataIterator* data_iter = data_iter_res.ok;

print_diag("\nIterating scan data\n");
print_diag("\nIterating scan metadata\n");

// iterate scan files
for (;;) {
ExternResultbool ok_res = kernel_scan_data_next(data_iter, &context, do_visit_scan_data);
ExternResultbool ok_res =
scan_metadata_next(data_iter, &context, do_visit_scan_metadata);
if (ok_res.tag != Okbool) {
print_error("Failed to iterate scan data.", (Error*)ok_res.err);
print_error("Failed to iterate scan metadata.", (Error*)ok_res.err);
free_error((Error*)ok_res.err);
return -1;
} else if (!ok_res.ok) {
print_diag("Scan data iterator done\n");
print_diag("Scan metadata iterator done\n");
break;
}
}
Expand All @@ -323,7 +326,7 @@ int main(int argc, char* argv[])
context.arrow_context = NULL;
#endif

free_kernel_scan_data(data_iter);
free_scan_metadata_iter(data_iter);
free_scan(scan);
free_schema(logical_schema);
free_schema(read_schema);
Expand Down
8 changes: 4 additions & 4 deletions ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -685,11 +685,11 @@ pub struct StringSliceIterator;

/// # Safety
///
/// The iterator must be valid (returned by [`kernel_scan_data_init`]) and not yet freed by
/// [`free_kernel_scan_data`]. The visitor function pointer must be non-null.
/// The iterator must be valid (returned by [`scan_metadata_iter_init`]) and not yet freed by
/// [`free_scan_metadata_iter`]. The visitor function pointer must be non-null.
///
/// [`kernel_scan_data_init`]: crate::scan::kernel_scan_data_init
/// [`free_kernel_scan_data`]: crate::scan::free_kernel_scan_data
/// [`scan_metadata_iter_init`]: crate::scan::scan_metadata_iter_init
/// [`free_scan_metadata_iter`]: crate::scan::free_scan_metadata_iter
#[no_mangle]
pub unsafe extern "C" fn string_slice_next(
data: Handle<StringSliceIterator>,
Expand Down
70 changes: 35 additions & 35 deletions ffi/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use delta_kernel::scan::state::{visit_scan_files, DvInfo, GlobalScanState};
use delta_kernel::scan::{Scan, ScanData};
use delta_kernel::scan::{Scan, ScanMetadata};
use delta_kernel::snapshot::Snapshot;
use delta_kernel::{DeltaResult, Error, Expression, ExpressionRef};
use delta_kernel_ffi_macros::handle_descriptor;
Expand All @@ -24,7 +24,7 @@ use crate::{
use super::handle::Handle;

// TODO: Why do we even need to expose a scan, when the only thing an engine can do with it is
// handit back to the kernel by calling `kernel_scan_data_init`? There isn't even an FFI method to
// handit back to the kernel by calling `scan_metadata_iter_init`? There isn't even an FFI method to
// drop it!
#[handle_descriptor(target=Scan, mutable=false, sized=true)]
pub struct SharedScan;
Expand Down Expand Up @@ -125,70 +125,70 @@ pub unsafe extern "C" fn free_global_scan_state(state: Handle<SharedGlobalScanSt
// means kernel made the decision of how to achieve thread safety. This may not be desirable if the
// engine is single-threaded, or has its own mutual exclusion mechanisms. Deadlock is even a
// conceivable risk, if this interacts poorly with engine's mutual exclusion mechanism.
pub struct KernelScanDataIterator {
pub struct ScanMetadataIterator {
// Mutex -> Allow the iterator to be accessed safely by multiple threads.
// Box -> Wrap its unsized content this struct is fixed-size with thin pointers.
// Item = DeltaResult<ScanData>
data: Mutex<Box<dyn Iterator<Item = DeltaResult<ScanData>> + Send>>,
// Item = DeltaResult<ScanMetadata>
data: Mutex<Box<dyn Iterator<Item = DeltaResult<ScanMetadata>> + Send>>,

// Also keep a reference to the external engine for its error allocator. The default Parquet and
// Json handlers don't hold any reference to the tokio reactor they rely on, so the iterator
// terminates early if the last engine goes out of scope.
engine: Arc<dyn ExternEngine>,
}

#[handle_descriptor(target=KernelScanDataIterator, mutable=false, sized=true)]
pub struct SharedScanDataIterator;
#[handle_descriptor(target=ScanMetadataIterator, mutable=false, sized=true)]
pub struct SharedScanMetadataIterator;

impl Drop for KernelScanDataIterator {
impl Drop for ScanMetadataIterator {
fn drop(&mut self) {
debug!("dropping KernelScanDataIterator");
debug!("dropping ScanMetadataIterator");
}
}

/// Get an iterator over the data needed to perform a scan. This will return a
/// [`KernelScanDataIterator`] which can be passed to [`kernel_scan_data_next`] to get the actual
/// data in the iterator.
/// [`ScanMetadataIterator`] which can be passed to [`scan_metadata_next`] to get the
/// actual data in the iterator.
///
/// # Safety
///
/// Engine is responsible for passing a valid [`SharedExternEngine`] and [`SharedScan`]
#[no_mangle]
pub unsafe extern "C" fn kernel_scan_data_init(
pub unsafe extern "C" fn scan_metadata_iter_init(
engine: Handle<SharedExternEngine>,
scan: Handle<SharedScan>,
) -> ExternResult<Handle<SharedScanDataIterator>> {
) -> ExternResult<Handle<SharedScanMetadataIterator>> {
let engine = unsafe { engine.clone_as_arc() };
let scan = unsafe { scan.as_ref() };
kernel_scan_data_init_impl(&engine, scan).into_extern_result(&engine.as_ref())
scan_metadata_iter_init_impl(&engine, scan).into_extern_result(&engine.as_ref())
}

fn kernel_scan_data_init_impl(
fn scan_metadata_iter_init_impl(
engine: &Arc<dyn ExternEngine>,
scan: &Scan,
) -> DeltaResult<Handle<SharedScanDataIterator>> {
let scan_data = scan.scan_data(engine.engine().as_ref())?;
let data = KernelScanDataIterator {
data: Mutex::new(Box::new(scan_data)),
) -> DeltaResult<Handle<SharedScanMetadataIterator>> {
let scan_metadata = scan.scan_metadata(engine.engine().as_ref())?;
let data = ScanMetadataIterator {
data: Mutex::new(Box::new(scan_metadata)),
engine: engine.clone(),
};
Ok(Arc::new(data).into())
}

/// Call the provided `engine_visitor` on the next scan data item. The visitor will be provided with
/// a selection vector and engine data. It is the responsibility of the _engine_ to free these when
/// it is finished by calling [`free_bool_slice`] and [`free_engine_data`] respectively.
/// Call the provided `engine_visitor` on the next scan metadata item. The visitor will be provided
/// with a selection vector and engine data. It is the responsibility of the _engine_ to free these
/// when it is finished by calling [`free_bool_slice`] and [`free_engine_data`] respectively.
///
/// # Safety
///
/// The iterator must be valid (returned by [kernel_scan_data_init]) and not yet freed by
/// [`free_kernel_scan_data`]. The visitor function pointer must be non-null.
/// The iterator must be valid (returned by [scan_metadata_iter_init]) and not yet freed by
/// [`free_scan_metadata_iter`]. The visitor function pointer must be non-null.
///
/// [`free_bool_slice`]: crate::free_bool_slice
/// [`free_engine_data`]: crate::free_engine_data
#[no_mangle]
pub unsafe extern "C" fn kernel_scan_data_next(
data: Handle<SharedScanDataIterator>,
pub unsafe extern "C" fn scan_metadata_next(
data: Handle<SharedScanMetadataIterator>,
engine_context: NullableCvoid,
engine_visitor: extern "C" fn(
engine_context: NullableCvoid,
Expand All @@ -198,11 +198,11 @@ pub unsafe extern "C" fn kernel_scan_data_next(
),
) -> ExternResult<bool> {
let data = unsafe { data.as_ref() };
kernel_scan_data_next_impl(data, engine_context, engine_visitor)
scan_metadata_next_impl(data, engine_context, engine_visitor)
.into_extern_result(&data.engine.as_ref())
}
fn kernel_scan_data_next_impl(
data: &KernelScanDataIterator,
fn scan_metadata_next_impl(
data: &ScanMetadataIterator,
engine_context: NullableCvoid,
engine_visitor: extern "C" fn(
engine_context: NullableCvoid,
Expand All @@ -228,11 +228,11 @@ fn kernel_scan_data_next_impl(
/// # Safety
///
/// Caller is responsible for (at most once) passing a valid pointer returned by a call to
/// [`kernel_scan_data_init`].
/// [`scan_metadata_iter_init`].
// we should probably be consistent with drop vs. free on engine side (probably the latter is more
// intuitive to non-rust code)
#[no_mangle]
pub unsafe extern "C" fn free_kernel_scan_data(data: Handle<SharedScanDataIterator>) {
pub unsafe extern "C" fn free_scan_metadata_iter(data: Handle<SharedScanMetadataIterator>) {
data.drop_handle();
}

Expand Down Expand Up @@ -297,14 +297,14 @@ pub unsafe extern "C" fn get_from_string_map(
.and_then(|v| allocate_fn(kernel_string_slice!(v)))
}

/// Transformation expressions that need to be applied to each row `i` in ScanData. You can use
/// Transformation expressions that need to be applied to each row `i` in ScanMetadata. You can use
/// [`get_transform_for_row`] to get the transform for a particular row. If that returns an
/// associated expression, it _must_ be applied to the data read from the file specified by the
/// row. The resultant schema for this expression is guaranteed to be `Scan.schema()`. If
/// `get_transform_for_row` returns `NULL` no expression need be applied and the data read from disk
/// is already in the correct logical state.
///
/// NB: If you are using `visit_scan_data` you don't need to worry about dealing with probing
/// NB: If you are using `visit_scan_metadata` you don't need to worry about dealing with probing
/// `CTransforms`. The callback will be invoked with the correct transform for you.
pub struct CTransforms {
transforms: Vec<Option<ExpressionRef>>,
Expand Down Expand Up @@ -420,13 +420,13 @@ struct ContextWrapper {
callback: CScanCallback,
}

/// Shim for ffi to call visit_scan_data. This will generally be called when iterating through scan
/// Shim for ffi to call visit_scan_metadata. This will generally be called when iterating through scan
/// data which provides the data handle and selection vector as each element in the iterator.
///
/// # Safety
/// engine is responsible for passing a valid [`ExclusiveEngineData`] and selection vector.
#[no_mangle]
pub unsafe extern "C" fn visit_scan_data(
pub unsafe extern "C" fn visit_scan_metadata(
data: Handle<ExclusiveEngineData>,
selection_vec: KernelBoolSlice,
transforms: &CTransforms,
Expand Down
8 changes: 4 additions & 4 deletions kernel/examples/inspect-table/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ enum Commands {
/// Show the table's schema
Schema,
/// Show the meta-data that would be used to scan the table
ScanData,
ScanMetadata,
/// Show each action from the log-segments
Actions {
/// Show the log in reverse order (default is log replay order -- newest first)
Expand Down Expand Up @@ -207,10 +207,10 @@ fn try_main() -> DeltaResult<()> {
Commands::Schema => {
println!("{:#?}", snapshot.schema());
}
Commands::ScanData => {
Commands::ScanMetadata => {
let scan = ScanBuilder::new(snapshot).build()?;
let scan_data = scan.scan_data(&engine)?;
for res in scan_data {
let scan_metadata = scan.scan_metadata(&engine)?;
for res in scan_metadata {
let (data, vector, transforms) = res?;
delta_kernel::scan::state::visit_scan_files(
data.as_ref(),
Expand Down
4 changes: 2 additions & 2 deletions kernel/examples/read-table-multi-threaded/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ Read Table Multi-Threaded

# About
This example shows a program that reads a table using multiple threads. This shows the use of the
`scan_data`, `global_scan_state`, and `visit_scan_files` methods, that can be used to partition work
`scan_metadata`, `global_scan_state`, and `visit_scan_files` methods, that can be used to partition work
to either multiple threads, or workers (in the case of a distributed engine).

You can run this from the same directory as this `README.md` by running `cargo run -- [args]`.
Expand Down Expand Up @@ -49,4 +49,4 @@ To select specific columns you need a `--` after the column list specification.

- Read `letter` and `data` columns from the `multi_partitioned` dat table:

`cargo run -- --columns letter,data -- ../../../acceptance/tests/dat/out/reader_tests/generated/multi_partitioned/delta/`
`cargo run -- --columns letter,data -- ../../../acceptance/tests/dat/out/reader_tests/generated/multi_partitioned/delta/`
6 changes: 3 additions & 3 deletions kernel/examples/read-table-multi-threaded/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use clap::{Parser, ValueEnum};
use url::Url;

/// An example program that reads a table using multiple threads. This shows the use of the
/// scan_data and global_scan_state methods on a Scan, that can be used to partition work to either
/// scan_metadata and global_scan_state methods on a Scan, that can be used to partition work to either
/// multiple threads, or workers (in the case of a distributed engine).
#[derive(Parser)]
#[command(author, version, about, long_about = None)]
Expand Down Expand Up @@ -179,7 +179,7 @@ fn try_main() -> DeltaResult<()> {
// [`delta_kernel::scan::scan_row_schema`]. Generally engines will not need to interact with
// this data directly, and can just call [`visit_scan_files`] to get pre-parsed data back from
// the kernel.
let scan_data = scan.scan_data(engine.as_ref())?;
let scan_metadata = scan.scan_metadata(engine.as_ref())?;

// get any global state associated with this scan
let global_state = Arc::new(scan.global_scan_state());
Expand Down Expand Up @@ -209,7 +209,7 @@ fn try_main() -> DeltaResult<()> {
// done sending
drop(record_batch_tx);

for res in scan_data {
for res in scan_metadata {
let (data, vector, transforms) = res?;
scan_file_tx = delta_kernel::scan::state::visit_scan_files(
data.as_ref(),
Expand Down
22 changes: 11 additions & 11 deletions kernel/src/log_replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
//! typically from newest to oldest.
//!
//! Log replay is currently implemented for table scans, which filter and apply transformations
//! to produce file actions which builds the view of the table state at a specific point in time.
//! to produce file actions which builds the view of the table state at a specific point in time.
//! Future extensions will support additional log replay processors beyond the current use case.
//! (e.g. checkpointing: filter actions to include only those needed to rebuild table state)
//!
Expand Down Expand Up @@ -46,7 +46,7 @@ impl FileActionKey {
/// significantly reducing memory usage for large Delta tables with extensive history.
///
/// TODO: Modify deduplication to track only file paths instead of (path, dv_unique_id).
/// More info here: https://github.com/delta-io/delta-kernel-rs/issues/701
/// More info here: https://github.com/delta-io/delta-kernel-rs/issues/701
pub(crate) struct FileActionDeduplicator<'seen> {
/// A set of (data file path, dv_unique_id) pairs that have been seen thus
/// far in the log for deduplication. This is a mutable reference to the set
Expand Down Expand Up @@ -208,23 +208,23 @@ impl<'seen> FileActionDeduplicator<'seen> {
/// processed by the processor, (if a filter is provided).
///
/// Implementations:
/// - `ScanLogReplayProcessor`: Used for table scans, this processor filters and selects deduplicated
/// - `ScanLogReplayProcessor`: Used for table scans, this processor filters and selects deduplicated
/// `Add` actions from log batches to reconstruct the view of the table at a specific point in time.
/// Note that scans do not expose `Remove` actions. Data skipping may be applied when a predicate is
/// Note that scans do not expose `Remove` actions. Data skipping may be applied when a predicate is
/// provided.
///
/// - `CheckpointLogReplayProcessor` (WIP): Will be responsible for processing log batches to construct
/// V1 spec checkpoint files. Unlike scans, checkpoint processing includes additional actions, such as
/// `Remove`, `Metadata`, and `Protocol`, required to fully reconstruct table state.
/// - `CheckpointLogReplayProcessor` (WIP): Will be responsible for processing log batches to construct
/// V1 spec checkpoint files. Unlike scans, checkpoint processing includes additional actions, such as
/// `Remove`, `Metadata`, and `Protocol`, required to fully reconstruct table state.
/// Data skipping is not applied during checkpoint processing.
///
/// The `Output` type represents the material result of log replay, and it must implement the
/// `HasSelectionVector` trait to allow filtering of irrelevant rows:
///
/// - For **scans**, the output type is `ScanData`, which contains the file actions (`Add` actions) that
/// need to be applied to build the table's view, accompanied by a **selection vector** that identifies
/// which rows should be included. A transform vector may also be included to handle schema changes,
/// such as renaming columns or modifying data types.
/// - For **scans**, the output type is `ScanMetadata`, which contains the file actions (`Add`
/// actions) that need to be applied to build the table's view, accompanied by a
/// **selection vector** that identifies which rows should be included. A transform vector may
/// also be included to handle schema changes, such as renaming columns or modifying data types.
///
/// - For **checkpoints**, the output includes the actions necessary to write to the checkpoint file (`Add`,
/// `Remove`, `Metadata`, `Protocol` actions), filtered by the **selection vector** to determine which
Expand Down
Loading
Loading