Merged
5 changes: 4 additions & 1 deletion acceptance/src/data.rs
@@ -109,7 +109,10 @@ fn assert_columns_match(actual: &[Arc<dyn Array>], expected: &[Arc<dyn Array>])
}
}

pub async fn assert_scan_data(engine: Arc<dyn Engine>, test_case: &TestCaseInfo) -> TestResult<()> {
pub async fn assert_scan_metadata(
engine: Arc<dyn Engine>,
test_case: &TestCaseInfo,
) -> TestResult<()> {
let table_root = test_case.table_root()?;
let table = Table::new(table_root);
let snapshot = table.snapshot(engine.as_ref(), None)?;
2 changes: 1 addition & 1 deletion acceptance/tests/dat_reader.rs
@@ -37,7 +37,7 @@ fn reader_test(path: &Path) -> datatest_stable::Result<()> {
);

case.assert_metadata(engine.clone()).await.unwrap();
acceptance::data::assert_scan_data(engine.clone(), &case)
acceptance::data::assert_scan_metadata(engine.clone(), &case)
.await
.unwrap();
});
29 changes: 16 additions & 13 deletions ffi/examples/read-table/read_table.c
@@ -83,9 +83,10 @@ void scan_row_callback(
context->partition_values = NULL;
}

// For each chunk of scan data (which may contain multiple files to scan), kernel will call this
// function (named do_visit_scan_data to avoid conflict with visit_scan_data exported by kernel)
void do_visit_scan_data(
// For each chunk of scan metadata (which may contain multiple files to scan), kernel will call this
// function (named do_visit_scan_metadata to avoid conflict with visit_scan_metadata exported by
// kernel)
void do_visit_scan_metadata(
void* engine_context,
ExclusiveEngineData* engine_data,
KernelBoolSlice selection_vec,
@@ -96,7 +97,7 @@ void do_visit_scan_data(
print_selection_vector(" ", &selection_vec);
// Ask kernel to iterate each individual file and call us back with extracted metadata
print_diag("Asking kernel to call us back for each scan row (file to read)\n");
visit_scan_data(engine_data, selection_vec, transforms, engine_context, scan_row_callback);
visit_scan_metadata(engine_data, selection_vec, transforms, engine_context, scan_row_callback);
free_bool_slice(selection_vec);
free_engine_data(engine_data);
}
@@ -291,26 +292,28 @@ int main(int argc, char* argv[])
#endif
};

ExternResultHandleSharedScanDataIterator data_iter_res = kernel_scan_data_init(engine, scan);
if (data_iter_res.tag != OkHandleSharedScanDataIterator) {
print_error("Failed to construct scan data iterator.", (Error*)data_iter_res.err);
ExternResultHandleSharedScanMetadataIterator data_iter_res =
kernel_scan_metadata_init(engine, scan);
if (data_iter_res.tag != OkHandleSharedScanMetadataIterator) {
print_error("Failed to construct scan metadata iterator.", (Error*)data_iter_res.err);
free_error((Error*)data_iter_res.err);
return -1;
}

SharedScanDataIterator* data_iter = data_iter_res.ok;
SharedScanMetadataIterator* data_iter = data_iter_res.ok;

print_diag("\nIterating scan data\n");
print_diag("\nIterating scan metadata\n");

// iterate scan files
for (;;) {
ExternResultbool ok_res = kernel_scan_data_next(data_iter, &context, do_visit_scan_data);
ExternResultbool ok_res =
kernel_scan_metadata_next(data_iter, &context, do_visit_scan_metadata);
if (ok_res.tag != Okbool) {
print_error("Failed to iterate scan data.", (Error*)ok_res.err);
print_error("Failed to iterate scan metadata.", (Error*)ok_res.err);
free_error((Error*)ok_res.err);
return -1;
} else if (!ok_res.ok) {
print_diag("Scan data iterator done\n");
print_diag("Scan metadata iterator done\n");
break;
}
}
@@ -323,7 +326,7 @@ int main(int argc, char* argv[])
context.arrow_context = NULL;
#endif

free_kernel_scan_data(data_iter);
free_scan_metadata(data_iter);
free_scan(scan);
free_schema(logical_schema);
free_schema(read_schema);
8 changes: 4 additions & 4 deletions ffi/src/lib.rs
@@ -685,11 +685,11 @@ pub struct StringSliceIterator;

/// # Safety
///
/// The iterator must be valid (returned by [`kernel_scan_data_init`]) and not yet freed by
/// [`free_kernel_scan_data`]. The visitor function pointer must be non-null.
/// The iterator must be valid (returned by [`kernel_scan_metadata_init`]) and not yet freed by
/// [`free_scan_metadata`]. The visitor function pointer must be non-null.
///
/// [`kernel_scan_data_init`]: crate::scan::kernel_scan_data_init
/// [`free_kernel_scan_data`]: crate::scan::free_kernel_scan_data
/// [`kernel_scan_metadata_init`]: crate::scan::kernel_scan_metadata_init
/// [`free_scan_metadata`]: crate::scan::free_scan_metadata
#[no_mangle]
pub unsafe extern "C" fn string_slice_next(
data: Handle<StringSliceIterator>,
70 changes: 35 additions & 35 deletions ffi/src/scan.rs
@@ -4,7 +4,7 @@ use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use delta_kernel::scan::state::{visit_scan_files, DvInfo, GlobalScanState};
use delta_kernel::scan::{Scan, ScanData};
use delta_kernel::scan::{Scan, ScanMetadata};
use delta_kernel::snapshot::Snapshot;
use delta_kernel::{DeltaResult, Error, Expression, ExpressionRef};
use delta_kernel_ffi_macros::handle_descriptor;
@@ -24,7 +24,7 @@ use crate::{
use super::handle::Handle;

// TODO: Why do we even need to expose a scan, when the only thing an engine can do with it is
// hand it back to the kernel by calling `kernel_scan_data_init`? There isn't even an FFI method to
// hand it back to the kernel by calling `kernel_scan_metadata_init`? There isn't even an FFI method to
// drop it!
#[handle_descriptor(target=Scan, mutable=false, sized=true)]
pub struct SharedScan;
@@ -125,70 +125,70 @@ pub unsafe extern "C" fn free_global_scan_state(state: Handle<SharedGlobalScanSt
// means kernel made the decision of how to achieve thread safety. This may not be desirable if the
// engine is single-threaded, or has its own mutual exclusion mechanisms. Deadlock is even a
// conceivable risk, if this interacts poorly with engine's mutual exclusion mechanism.
pub struct KernelScanDataIterator {
pub struct KernelScanMetadataIterator {
// Mutex -> Allow the iterator to be accessed safely by multiple threads.
// Box -> Wrap its unsized content so this struct is fixed-size with thin pointers.
// Item = DeltaResult<ScanData>
data: Mutex<Box<dyn Iterator<Item = DeltaResult<ScanData>> + Send>>,
// Item = DeltaResult<ScanMetadata>
data: Mutex<Box<dyn Iterator<Item = DeltaResult<ScanMetadata>> + Send>>,

// Also keep a reference to the external engine for its error allocator. The default Parquet and
// Json handlers don't hold any reference to the tokio reactor they rely on, so the iterator
// terminates early if the last engine goes out of scope.
engine: Arc<dyn ExternEngine>,
}

#[handle_descriptor(target=KernelScanDataIterator, mutable=false, sized=true)]
pub struct SharedScanDataIterator;
#[handle_descriptor(target=KernelScanMetadataIterator, mutable=false, sized=true)]
pub struct SharedScanMetadataIterator;

impl Drop for KernelScanDataIterator {
impl Drop for KernelScanMetadataIterator {
fn drop(&mut self) {
debug!("dropping KernelScanDataIterator");
debug!("dropping KernelScanMetadataIterator");
}
}

/// Get an iterator over the data needed to perform a scan. This will return a
/// [`KernelScanDataIterator`] which can be passed to [`kernel_scan_data_next`] to get the actual
/// data in the iterator.
/// [`KernelScanMetadataIterator`] which can be passed to [`kernel_scan_metadata_next`] to get the
/// actual data in the iterator.
///
/// # Safety
///
/// Engine is responsible for passing a valid [`SharedExternEngine`] and [`SharedScan`]
#[no_mangle]
pub unsafe extern "C" fn kernel_scan_data_init(
pub unsafe extern "C" fn kernel_scan_metadata_init(
engine: Handle<SharedExternEngine>,
scan: Handle<SharedScan>,
) -> ExternResult<Handle<SharedScanDataIterator>> {
) -> ExternResult<Handle<SharedScanMetadataIterator>> {
let engine = unsafe { engine.clone_as_arc() };
let scan = unsafe { scan.as_ref() };
kernel_scan_data_init_impl(&engine, scan).into_extern_result(&engine.as_ref())
kernel_scan_metadata_init_impl(&engine, scan).into_extern_result(&engine.as_ref())
}

fn kernel_scan_data_init_impl(
fn kernel_scan_metadata_init_impl(
engine: &Arc<dyn ExternEngine>,
scan: &Scan,
) -> DeltaResult<Handle<SharedScanDataIterator>> {
let scan_data = scan.scan_data(engine.engine().as_ref())?;
let data = KernelScanDataIterator {
data: Mutex::new(Box::new(scan_data)),
) -> DeltaResult<Handle<SharedScanMetadataIterator>> {
let scan_metadata = scan.scan_metadata(engine.engine().as_ref())?;
let data = KernelScanMetadataIterator {
data: Mutex::new(Box::new(scan_metadata)),
engine: engine.clone(),
};
Ok(Arc::new(data).into())
}

/// Call the provided `engine_visitor` on the next scan data item. The visitor will be provided with
/// a selection vector and engine data. It is the responsibility of the _engine_ to free these when
/// it is finished by calling [`free_bool_slice`] and [`free_engine_data`] respectively.
/// Call the provided `engine_visitor` on the next scan metadata item. The visitor will be provided
/// with a selection vector and engine data. It is the responsibility of the _engine_ to free these
/// when it is finished by calling [`free_bool_slice`] and [`free_engine_data`] respectively.
///
/// # Safety
///
/// The iterator must be valid (returned by [kernel_scan_data_init]) and not yet freed by
/// [`free_kernel_scan_data`]. The visitor function pointer must be non-null.
/// The iterator must be valid (returned by [kernel_scan_metadata_init]) and not yet freed by
/// [`free_scan_metadata`]. The visitor function pointer must be non-null.
///
/// [`free_bool_slice`]: crate::free_bool_slice
/// [`free_engine_data`]: crate::free_engine_data
#[no_mangle]
pub unsafe extern "C" fn kernel_scan_data_next(
data: Handle<SharedScanDataIterator>,
pub unsafe extern "C" fn kernel_scan_metadata_next(
data: Handle<SharedScanMetadataIterator>,
engine_context: NullableCvoid,
engine_visitor: extern "C" fn(
engine_context: NullableCvoid,
@@ -198,11 +198,11 @@ pub unsafe extern "C" fn kernel_scan_data_next(
),
) -> ExternResult<bool> {
let data = unsafe { data.as_ref() };
kernel_scan_data_next_impl(data, engine_context, engine_visitor)
kernel_scan_metadata_next_impl(data, engine_context, engine_visitor)
.into_extern_result(&data.engine.as_ref())
}
fn kernel_scan_data_next_impl(
data: &KernelScanDataIterator,
fn kernel_scan_metadata_next_impl(
data: &KernelScanMetadataIterator,
engine_context: NullableCvoid,
engine_visitor: extern "C" fn(
engine_context: NullableCvoid,
@@ -228,11 +228,11 @@ fn kernel_scan_data_next_impl(
/// # Safety
///
/// Caller is responsible for (at most once) passing a valid pointer returned by a call to
/// [`kernel_scan_data_init`].
/// [`kernel_scan_metadata_init`].
// we should probably be consistent with drop vs. free on engine side (probably the latter is more
// intuitive to non-rust code)
#[no_mangle]
pub unsafe extern "C" fn free_kernel_scan_data(data: Handle<SharedScanDataIterator>) {
pub unsafe extern "C" fn free_scan_metadata(data: Handle<SharedScanMetadataIterator>) {
data.drop_handle();
}

@@ -297,14 +297,14 @@ pub unsafe extern "C" fn get_from_string_map(
.and_then(|v| allocate_fn(kernel_string_slice!(v)))
}

/// Transformation expressions that need to be applied to each row `i` in ScanData. You can use
/// Transformation expressions that need to be applied to each row `i` in ScanMetadata. You can use
/// [`get_transform_for_row`] to get the transform for a particular row. If that returns an
/// associated expression, it _must_ be applied to the data read from the file specified by the
/// row. The resultant schema for this expression is guaranteed to be `Scan.schema()`. If
/// `get_transform_for_row` returns `NULL` no expression need be applied and the data read from disk
/// is already in the correct logical state.
///
/// NB: If you are using `visit_scan_data` you don't need to worry about dealing with probing
/// NB: If you are using `visit_scan_metadata` you don't need to worry about dealing with probing
/// `CTransforms`. The callback will be invoked with the correct transform for you.
pub struct CTransforms {
transforms: Vec<Option<ExpressionRef>>,
@@ -420,13 +420,13 @@ struct ContextWrapper {
callback: CScanCallback,
}

/// Shim for ffi to call visit_scan_data. This will generally be called when iterating through scan
/// Shim for ffi to call visit_scan_metadata. This will generally be called when iterating through scan
/// data which provides the data handle and selection vector as each element in the iterator.
///
/// # Safety
/// engine is responsible for passing a valid [`ExclusiveEngineData`] and selection vector.
#[no_mangle]
pub unsafe extern "C" fn visit_scan_data(
pub unsafe extern "C" fn visit_scan_metadata(
data: Handle<ExclusiveEngineData>,
selection_vec: KernelBoolSlice,
transforms: &CTransforms,
8 changes: 4 additions & 4 deletions kernel/examples/inspect-table/src/main.rs
@@ -41,7 +41,7 @@ enum Commands {
/// Show the table's schema
Schema,
/// Show the meta-data that would be used to scan the table
ScanData,
ScanMetadata,
/// Show each action from the log-segments
Actions {
/// Show the log in reverse order (default is log replay order -- newest first)
@@ -207,10 +207,10 @@ fn try_main() -> DeltaResult<()> {
Commands::Schema => {
println!("{:#?}", snapshot.schema());
}
Commands::ScanData => {
Commands::ScanMetadata => {
let scan = ScanBuilder::new(snapshot).build()?;
let scan_data = scan.scan_data(&engine)?;
for res in scan_data {
let scan_metadata = scan.scan_metadata(&engine)?;
for res in scan_metadata {
let (data, vector, transforms) = res?;
delta_kernel::scan::state::visit_scan_files(
data.as_ref(),
4 changes: 2 additions & 2 deletions kernel/examples/read-table-multi-threaded/README.md
@@ -3,7 +3,7 @@ Read Table Multi-Threaded

# About
This example shows a program that reads a table using multiple threads. This shows the use of the
`scan_data`, `global_scan_state`, and `visit_scan_files` methods, that can be used to partition work
`scan_metadata`, `global_scan_state`, and `visit_scan_files` methods, that can be used to partition work
to either multiple threads, or workers (in the case of a distributed engine).

You can run this from the same directory as this `README.md` by running `cargo run -- [args]`.
@@ -49,4 +49,4 @@ To select specific columns you need a `--` after the column list specification.

- Read `letter` and `data` columns from the `multi_partitioned` dat table:

`cargo run -- --columns letter,data -- ../../../acceptance/tests/dat/out/reader_tests/generated/multi_partitioned/delta/`
`cargo run -- --columns letter,data -- ../../../acceptance/tests/dat/out/reader_tests/generated/multi_partitioned/delta/`
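
For context on the renamed API this README refers to, here is a minimal single-threaded sketch of the `scan_metadata` → `visit_scan_files` flow. It mirrors the call sites visible in this PR; the choice of `SyncEngine` and the exact callback signature are assumptions and may differ slightly from the kernel version in this change.

```rust
// Sketch only: single-threaded version of the flow this README describes.
// The callback shape below is an assumption based on the call sites in this PR;
// check `delta_kernel::scan::state::ScanCallback` for the exact signature.
use std::collections::HashMap;

use delta_kernel::engine::sync::SyncEngine;
use delta_kernel::scan::state::{visit_scan_files, DvInfo, Stats};
use delta_kernel::scan::ScanBuilder;
use delta_kernel::{DeltaResult, ExpressionRef, Table};
use url::Url;

// Assumed callback shape: the kernel invokes this once per selected file row.
fn collect_file(
    paths: &mut Vec<String>,
    path: &str,
    _size: i64,
    _stats: Option<Stats>,
    _dv_info: DvInfo,
    _transform: Option<ExpressionRef>,
    _partition_values: HashMap<String, String>,
) {
    paths.push(path.to_string());
}

fn list_scan_files(table_root: Url) -> DeltaResult<Vec<String>> {
    let engine = SyncEngine::new();
    let table = Table::new(table_root);
    let snapshot = table.snapshot(&engine, None)?;
    let scan = ScanBuilder::new(snapshot).build()?;

    let mut paths = Vec::new();
    // Each item is one chunk of scan metadata: engine data, a selection vector,
    // and per-row transform expressions.
    for res in scan.scan_metadata(&engine)? {
        let (data, selection_vector, transforms) = res?;
        paths = visit_scan_files(
            data.as_ref(),
            &selection_vector,
            &transforms,
            paths,
            collect_file,
        )?;
    }
    Ok(paths)
}
```

The multi-threaded example in this directory follows the same flow, but instead of collecting paths into a `Vec` it sends each file callback result over a channel to worker threads.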
6 changes: 3 additions & 3 deletions kernel/examples/read-table-multi-threaded/src/main.rs
@@ -20,7 +20,7 @@ use clap::{Parser, ValueEnum};
use url::Url;

/// An example program that reads a table using multiple threads. This shows the use of the
/// scan_data and global_scan_state methods on a Scan, that can be used to partition work to either
/// scan_metadata and global_scan_state methods on a Scan, that can be used to partition work to either
/// multiple threads, or workers (in the case of a distributed engine).
#[derive(Parser)]
#[command(author, version, about, long_about = None)]
@@ -179,7 +179,7 @@ fn try_main() -> DeltaResult<()> {
// [`delta_kernel::scan::scan_row_schema`]. Generally engines will not need to interact with
// this data directly, and can just call [`visit_scan_files`] to get pre-parsed data back from
// the kernel.
let scan_data = scan.scan_data(engine.as_ref())?;
let scan_metadata = scan.scan_metadata(engine.as_ref())?;

// get any global state associated with this scan
let global_state = Arc::new(scan.global_scan_state());
@@ -209,7 +209,7 @@ fn try_main() -> DeltaResult<()> {
// done sending
drop(record_batch_tx);

for res in scan_data {
for res in scan_metadata {
let (data, vector, transforms) = res?;
scan_file_tx = delta_kernel::scan::state::visit_scan_files(
data.as_ref(),
22 changes: 11 additions & 11 deletions kernel/src/log_replay.rs
@@ -6,7 +6,7 @@
//! typically from newest to oldest.
//!
//! Log replay is currently implemented for table scans, which filter and apply transformations
//! to produce file actions which builds the view of the table state at a specific point in time.
//! to produce file actions which builds the view of the table state at a specific point in time.
//! Future extensions will support additional log replay processors beyond the current use case.
//! (e.g. checkpointing: filter actions to include only those needed to rebuild table state)
//!
@@ -46,7 +46,7 @@ impl FileActionKey {
/// significantly reducing memory usage for large Delta tables with extensive history.
///
/// TODO: Modify deduplication to track only file paths instead of (path, dv_unique_id).
/// More info here: https://github.com/delta-io/delta-kernel-rs/issues/701
/// More info here: https://github.com/delta-io/delta-kernel-rs/issues/701
pub(crate) struct FileActionDeduplicator<'seen> {
/// A set of (data file path, dv_unique_id) pairs that have been seen thus
/// far in the log for deduplication. This is a mutable reference to the set
@@ -208,23 +208,23 @@ impl<'seen> FileActionDeduplicator<'seen> {
/// processed by the processor, (if a filter is provided).
///
/// Implementations:
/// - `ScanLogReplayProcessor`: Used for table scans, this processor filters and selects deduplicated
/// - `ScanLogReplayProcessor`: Used for table scans, this processor filters and selects deduplicated
/// `Add` actions from log batches to reconstruct the view of the table at a specific point in time.
/// Note that scans do not expose `Remove` actions. Data skipping may be applied when a predicate is
/// Note that scans do not expose `Remove` actions. Data skipping may be applied when a predicate is
/// provided.
///
/// - `CheckpointLogReplayProcessor` (WIP): Will be responsible for processing log batches to construct
/// V1 spec checkpoint files. Unlike scans, checkpoint processing includes additional actions, such as
/// `Remove`, `Metadata`, and `Protocol`, required to fully reconstruct table state.
/// - `CheckpointLogReplayProcessor` (WIP): Will be responsible for processing log batches to construct
/// V1 spec checkpoint files. Unlike scans, checkpoint processing includes additional actions, such as
/// `Remove`, `Metadata`, and `Protocol`, required to fully reconstruct table state.
/// Data skipping is not applied during checkpoint processing.
///
/// The `Output` type represents the material result of log replay, and it must implement the
/// `HasSelectionVector` trait to allow filtering of irrelevant rows:
///
/// - For **scans**, the output type is `ScanData`, which contains the file actions (`Add` actions) that
/// need to be applied to build the table's view, accompanied by a **selection vector** that identifies
/// which rows should be included. A transform vector may also be included to handle schema changes,
/// such as renaming columns or modifying data types.
/// - For **scans**, the output type is `ScanMetadata`, which contains the file actions (`Add`
/// actions) that need to be applied to build the table's view, accompanied by a
/// **selection vector** that identifies which rows should be included. A transform vector may
/// also be included to handle schema changes, such as renaming columns or modifying data types.
///
/// - For **checkpoints**, the output includes the actions necessary to write to the checkpoint file (`Add`,
/// `Remove`, `Metadata`, `Protocol` actions), filtered by the **selection vector** to determine which
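
To make the `Output`/`HasSelectionVector` relationship described in the log-replay doc comment above concrete, here is a small, self-contained sketch. These are simplified stand-ins, not the kernel's actual definitions; the `has_selected_rows` method name and the processor shape are assumptions for illustration only.

```rust
// Illustrative only: simplified stand-ins for the kernel's log-replay types.
trait HasSelectionVector {
    // Assumed method name; the real trait may differ.
    fn has_selected_rows(&self) -> bool;
}

// Simplified stand-in for `ScanMetadata`: a payload (the file-action batch) plus
// the selection vector marking which rows survived log replay.
struct BatchOutput<T> {
    payload: T,
    selection_vector: Vec<bool>,
}

impl<T> HasSelectionVector for BatchOutput<T> {
    fn has_selected_rows(&self) -> bool {
        self.selection_vector.iter().any(|&selected| selected)
    }
}

// A processor maps raw log batches to outputs; batches whose selection vector is
// all-false are filtered out so downstream consumers never see empty results.
fn replay<B, O>(
    batches: impl IntoIterator<Item = B>,
    mut process: impl FnMut(B) -> O,
) -> Vec<O>
where
    O: HasSelectionVector,
{
    batches
        .into_iter()
        .map(|batch| process(batch))
        .filter(HasSelectionVector::has_selected_rows)
        .collect()
}

fn main() {
    // Two fake batches: only the first selects any rows, so only it is kept.
    let outputs = replay(
        vec![vec![true, false], vec![false, false]],
        |selection_vector| BatchOutput { payload: (), selection_vector },
    );
    assert_eq!(outputs.len(), 1);
}
```

In the kernel itself the payload is engine data describing `Add` actions (for scans) or checkpoint actions, and this kind of filtering is what keeps all-false batches from reaching iterator consumers, as the doc comment above describes.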