diff --git a/acceptance/src/data.rs b/acceptance/src/data.rs index b045634b5c..9685f29c37 100644 --- a/acceptance/src/data.rs +++ b/acceptance/src/data.rs @@ -109,7 +109,10 @@ fn assert_columns_match(actual: &[Arc], expected: &[Arc]) } } -pub async fn assert_scan_data(engine: Arc, test_case: &TestCaseInfo) -> TestResult<()> { +pub async fn assert_scan_metadata( + engine: Arc, + test_case: &TestCaseInfo, +) -> TestResult<()> { let table_root = test_case.table_root()?; let table = Table::new(table_root); let snapshot = table.snapshot(engine.as_ref(), None)?; diff --git a/acceptance/tests/dat_reader.rs b/acceptance/tests/dat_reader.rs index 622f038a9d..6ba0e6d350 100644 --- a/acceptance/tests/dat_reader.rs +++ b/acceptance/tests/dat_reader.rs @@ -37,7 +37,7 @@ fn reader_test(path: &Path) -> datatest_stable::Result<()> { ); case.assert_metadata(engine.clone()).await.unwrap(); - acceptance::data::assert_scan_data(engine.clone(), &case) + acceptance::data::assert_scan_metadata(engine.clone(), &case) .await .unwrap(); }); diff --git a/ffi/examples/read-table/read_table.c b/ffi/examples/read-table/read_table.c index d24be5a6bd..7ff14d5295 100644 --- a/ffi/examples/read-table/read_table.c +++ b/ffi/examples/read-table/read_table.c @@ -83,9 +83,10 @@ void scan_row_callback( context->partition_values = NULL; } -// For each chunk of scan data (which may contain multiple files to scan), kernel will call this -// function (named do_visit_scan_data to avoid conflict with visit_scan_data exported by kernel) -void do_visit_scan_data( +// For each chunk of scan metadata (which may contain multiple files to scan), kernel will call this +// function (named do_visit_scan_metadata to avoid conflict with visit_scan_metadata exported by +// kernel) +void do_visit_scan_metadata( void* engine_context, ExclusiveEngineData* engine_data, KernelBoolSlice selection_vec, @@ -96,7 +97,7 @@ void do_visit_scan_data( print_selection_vector(" ", &selection_vec); // Ask kernel to iterate each individual file and call us back with extracted metadata print_diag("Asking kernel to call us back for each scan row (file to read)\n"); - visit_scan_data(engine_data, selection_vec, transforms, engine_context, scan_row_callback); + visit_scan_metadata(engine_data, selection_vec, transforms, engine_context, scan_row_callback); free_bool_slice(selection_vec); free_engine_data(engine_data); } @@ -291,26 +292,28 @@ int main(int argc, char* argv[]) #endif }; - ExternResultHandleSharedScanDataIterator data_iter_res = kernel_scan_data_init(engine, scan); - if (data_iter_res.tag != OkHandleSharedScanDataIterator) { - print_error("Failed to construct scan data iterator.", (Error*)data_iter_res.err); + ExternResultHandleSharedScanMetadataIterator data_iter_res = + scan_metadata_iter_init(engine, scan); + if (data_iter_res.tag != OkHandleSharedScanMetadataIterator) { + print_error("Failed to construct scan metadata iterator.", (Error*)data_iter_res.err); free_error((Error*)data_iter_res.err); return -1; } - SharedScanDataIterator* data_iter = data_iter_res.ok; + SharedScanMetadataIterator* data_iter = data_iter_res.ok; - print_diag("\nIterating scan data\n"); + print_diag("\nIterating scan metadata\n"); // iterate scan files for (;;) { - ExternResultbool ok_res = kernel_scan_data_next(data_iter, &context, do_visit_scan_data); + ExternResultbool ok_res = + scan_metadata_next(data_iter, &context, do_visit_scan_metadata); if (ok_res.tag != Okbool) { - print_error("Failed to iterate scan data.", (Error*)ok_res.err); + print_error("Failed to iterate scan metadata.", (Error*)ok_res.err); free_error((Error*)ok_res.err); return -1; } else if (!ok_res.ok) { - print_diag("Scan data iterator done\n"); + print_diag("Scan metadata iterator done\n"); break; } } @@ -323,7 +326,7 @@ int main(int argc, char* argv[]) context.arrow_context = NULL; #endif - free_kernel_scan_data(data_iter); + free_scan_metadata_iter(data_iter); free_scan(scan); free_schema(logical_schema); free_schema(read_schema); diff --git a/ffi/src/lib.rs b/ffi/src/lib.rs index febc014c86..e245531589 100644 --- a/ffi/src/lib.rs +++ b/ffi/src/lib.rs @@ -685,11 +685,11 @@ pub struct StringSliceIterator; /// # Safety /// -/// The iterator must be valid (returned by [`kernel_scan_data_init`]) and not yet freed by -/// [`free_kernel_scan_data`]. The visitor function pointer must be non-null. +/// The iterator must be valid (returned by [`scan_metadata_iter_init`]) and not yet freed by +/// [`free_scan_metadata_iter`]. The visitor function pointer must be non-null. /// -/// [`kernel_scan_data_init`]: crate::scan::kernel_scan_data_init -/// [`free_kernel_scan_data`]: crate::scan::free_kernel_scan_data +/// [`scan_metadata_iter_init`]: crate::scan::scan_metadata_iter_init +/// [`free_scan_metadata_iter`]: crate::scan::free_scan_metadata_iter #[no_mangle] pub unsafe extern "C" fn string_slice_next( data: Handle, diff --git a/ffi/src/scan.rs b/ffi/src/scan.rs index 0d9c6382cb..edade51ee8 100644 --- a/ffi/src/scan.rs +++ b/ffi/src/scan.rs @@ -4,7 +4,7 @@ use std::collections::HashMap; use std::sync::{Arc, Mutex}; use delta_kernel::scan::state::{visit_scan_files, DvInfo, GlobalScanState}; -use delta_kernel::scan::{Scan, ScanData}; +use delta_kernel::scan::{Scan, ScanMetadata}; use delta_kernel::snapshot::Snapshot; use delta_kernel::{DeltaResult, Error, Expression, ExpressionRef}; use delta_kernel_ffi_macros::handle_descriptor; @@ -24,7 +24,7 @@ use crate::{ use super::handle::Handle; // TODO: Why do we even need to expose a scan, when the only thing an engine can do with it is -// handit back to the kernel by calling `kernel_scan_data_init`? There isn't even an FFI method to +// handit back to the kernel by calling `scan_metadata_iter_init`? There isn't even an FFI method to // drop it! #[handle_descriptor(target=Scan, mutable=false, sized=true)] pub struct SharedScan; @@ -125,11 +125,11 @@ pub unsafe extern "C" fn free_global_scan_state(state: Handle Allow the iterator to be accessed safely by multiple threads. // Box -> Wrap its unsized content this struct is fixed-size with thin pointers. - // Item = DeltaResult - data: Mutex> + Send>>, + // Item = DeltaResult + data: Mutex> + Send>>, // Also keep a reference to the external engine for its error allocator. The default Parquet and // Json handlers don't hold any reference to the tokio reactor they rely on, so the iterator @@ -137,58 +137,58 @@ pub struct KernelScanDataIterator { engine: Arc, } -#[handle_descriptor(target=KernelScanDataIterator, mutable=false, sized=true)] -pub struct SharedScanDataIterator; +#[handle_descriptor(target=ScanMetadataIterator, mutable=false, sized=true)] +pub struct SharedScanMetadataIterator; -impl Drop for KernelScanDataIterator { +impl Drop for ScanMetadataIterator { fn drop(&mut self) { - debug!("dropping KernelScanDataIterator"); + debug!("dropping ScanMetadataIterator"); } } /// Get an iterator over the data needed to perform a scan. This will return a -/// [`KernelScanDataIterator`] which can be passed to [`kernel_scan_data_next`] to get the actual -/// data in the iterator. +/// [`ScanMetadataIterator`] which can be passed to [`scan_metadata_next`] to get the +/// actual data in the iterator. /// /// # Safety /// /// Engine is responsible for passing a valid [`SharedExternEngine`] and [`SharedScan`] #[no_mangle] -pub unsafe extern "C" fn kernel_scan_data_init( +pub unsafe extern "C" fn scan_metadata_iter_init( engine: Handle, scan: Handle, -) -> ExternResult> { +) -> ExternResult> { let engine = unsafe { engine.clone_as_arc() }; let scan = unsafe { scan.as_ref() }; - kernel_scan_data_init_impl(&engine, scan).into_extern_result(&engine.as_ref()) + scan_metadata_iter_init_impl(&engine, scan).into_extern_result(&engine.as_ref()) } -fn kernel_scan_data_init_impl( +fn scan_metadata_iter_init_impl( engine: &Arc, scan: &Scan, -) -> DeltaResult> { - let scan_data = scan.scan_data(engine.engine().as_ref())?; - let data = KernelScanDataIterator { - data: Mutex::new(Box::new(scan_data)), +) -> DeltaResult> { + let scan_metadata = scan.scan_metadata(engine.engine().as_ref())?; + let data = ScanMetadataIterator { + data: Mutex::new(Box::new(scan_metadata)), engine: engine.clone(), }; Ok(Arc::new(data).into()) } -/// Call the provided `engine_visitor` on the next scan data item. The visitor will be provided with -/// a selection vector and engine data. It is the responsibility of the _engine_ to free these when -/// it is finished by calling [`free_bool_slice`] and [`free_engine_data`] respectively. +/// Call the provided `engine_visitor` on the next scan metadata item. The visitor will be provided +/// with a selection vector and engine data. It is the responsibility of the _engine_ to free these +/// when it is finished by calling [`free_bool_slice`] and [`free_engine_data`] respectively. /// /// # Safety /// -/// The iterator must be valid (returned by [kernel_scan_data_init]) and not yet freed by -/// [`free_kernel_scan_data`]. The visitor function pointer must be non-null. +/// The iterator must be valid (returned by [scan_metadata_iter_init]) and not yet freed by +/// [`free_scan_metadata_iter`]. The visitor function pointer must be non-null. /// /// [`free_bool_slice`]: crate::free_bool_slice /// [`free_engine_data`]: crate::free_engine_data #[no_mangle] -pub unsafe extern "C" fn kernel_scan_data_next( - data: Handle, +pub unsafe extern "C" fn scan_metadata_next( + data: Handle, engine_context: NullableCvoid, engine_visitor: extern "C" fn( engine_context: NullableCvoid, @@ -198,11 +198,11 @@ pub unsafe extern "C" fn kernel_scan_data_next( ), ) -> ExternResult { let data = unsafe { data.as_ref() }; - kernel_scan_data_next_impl(data, engine_context, engine_visitor) + scan_metadata_next_impl(data, engine_context, engine_visitor) .into_extern_result(&data.engine.as_ref()) } -fn kernel_scan_data_next_impl( - data: &KernelScanDataIterator, +fn scan_metadata_next_impl( + data: &ScanMetadataIterator, engine_context: NullableCvoid, engine_visitor: extern "C" fn( engine_context: NullableCvoid, @@ -228,11 +228,11 @@ fn kernel_scan_data_next_impl( /// # Safety /// /// Caller is responsible for (at most once) passing a valid pointer returned by a call to -/// [`kernel_scan_data_init`]. +/// [`scan_metadata_iter_init`]. // we should probably be consistent with drop vs. free on engine side (probably the latter is more // intuitive to non-rust code) #[no_mangle] -pub unsafe extern "C" fn free_kernel_scan_data(data: Handle) { +pub unsafe extern "C" fn free_scan_metadata_iter(data: Handle) { data.drop_handle(); } @@ -297,14 +297,14 @@ pub unsafe extern "C" fn get_from_string_map( .and_then(|v| allocate_fn(kernel_string_slice!(v))) } -/// Transformation expressions that need to be applied to each row `i` in ScanData. You can use +/// Transformation expressions that need to be applied to each row `i` in ScanMetadata. You can use /// [`get_transform_for_row`] to get the transform for a particular row. If that returns an /// associated expression, it _must_ be applied to the data read from the file specified by the /// row. The resultant schema for this expression is guaranteed to be `Scan.schema()`. If /// `get_transform_for_row` returns `NULL` no expression need be applied and the data read from disk /// is already in the correct logical state. /// -/// NB: If you are using `visit_scan_data` you don't need to worry about dealing with probing +/// NB: If you are using `visit_scan_metadata` you don't need to worry about dealing with probing /// `CTransforms`. The callback will be invoked with the correct transform for you. pub struct CTransforms { transforms: Vec>, @@ -420,13 +420,13 @@ struct ContextWrapper { callback: CScanCallback, } -/// Shim for ffi to call visit_scan_data. This will generally be called when iterating through scan +/// Shim for ffi to call visit_scan_metadata. This will generally be called when iterating through scan /// data which provides the data handle and selection vector as each element in the iterator. /// /// # Safety /// engine is responsible for passing a valid [`ExclusiveEngineData`] and selection vector. #[no_mangle] -pub unsafe extern "C" fn visit_scan_data( +pub unsafe extern "C" fn visit_scan_metadata( data: Handle, selection_vec: KernelBoolSlice, transforms: &CTransforms, diff --git a/kernel/examples/inspect-table/src/main.rs b/kernel/examples/inspect-table/src/main.rs index 6cde89d55e..ae8213c4b6 100644 --- a/kernel/examples/inspect-table/src/main.rs +++ b/kernel/examples/inspect-table/src/main.rs @@ -41,7 +41,7 @@ enum Commands { /// Show the table's schema Schema, /// Show the meta-data that would be used to scan the table - ScanData, + ScanMetadata, /// Show each action from the log-segments Actions { /// Show the log in reverse order (default is log replay order -- newest first) @@ -207,10 +207,10 @@ fn try_main() -> DeltaResult<()> { Commands::Schema => { println!("{:#?}", snapshot.schema()); } - Commands::ScanData => { + Commands::ScanMetadata => { let scan = ScanBuilder::new(snapshot).build()?; - let scan_data = scan.scan_data(&engine)?; - for res in scan_data { + let scan_metadata = scan.scan_metadata(&engine)?; + for res in scan_metadata { let (data, vector, transforms) = res?; delta_kernel::scan::state::visit_scan_files( data.as_ref(), diff --git a/kernel/examples/read-table-multi-threaded/README.md b/kernel/examples/read-table-multi-threaded/README.md index 5c4cdebfb8..8cb45ecdb4 100644 --- a/kernel/examples/read-table-multi-threaded/README.md +++ b/kernel/examples/read-table-multi-threaded/README.md @@ -3,7 +3,7 @@ Read Table Multi-Threaded # About This example shows a program that reads a table using multiple threads. This shows the use of the -`scan_data`, `global_scan_state`, and `visit_scan_files` methods, that can be used to partition work +`scan_metadata`, `global_scan_state`, and `visit_scan_files` methods, that can be used to partition work to either multiple threads, or workers (in the case of a distributed engine). You can run this from the same directory as this `README.md` by running `cargo run -- [args]`. @@ -49,4 +49,4 @@ To select specific columns you need a `--` after the column list specification. - Read `letter` and `data` columns from the `multi_partitioned` dat table: -`cargo run -- --columns letter,data -- ../../../acceptance/tests/dat/out/reader_tests/generated/multi_partitioned/delta/` +`cargo run -- --columns letter,data -- ../../../acceptance/tests/dat/out/reader_tests/generated/multi_partitioned/delta/` \ No newline at end of file diff --git a/kernel/examples/read-table-multi-threaded/src/main.rs b/kernel/examples/read-table-multi-threaded/src/main.rs index e3aca631ab..5ac3a50a6a 100644 --- a/kernel/examples/read-table-multi-threaded/src/main.rs +++ b/kernel/examples/read-table-multi-threaded/src/main.rs @@ -20,7 +20,7 @@ use clap::{Parser, ValueEnum}; use url::Url; /// An example program that reads a table using multiple threads. This shows the use of the -/// scan_data and global_scan_state methods on a Scan, that can be used to partition work to either +/// scan_metadata and global_scan_state methods on a Scan, that can be used to partition work to either /// multiple threads, or workers (in the case of a distributed engine). #[derive(Parser)] #[command(author, version, about, long_about = None)] @@ -179,7 +179,7 @@ fn try_main() -> DeltaResult<()> { // [`delta_kernel::scan::scan_row_schema`]. Generally engines will not need to interact with // this data directly, and can just call [`visit_scan_files`] to get pre-parsed data back from // the kernel. - let scan_data = scan.scan_data(engine.as_ref())?; + let scan_metadata = scan.scan_metadata(engine.as_ref())?; // get any global state associated with this scan let global_state = Arc::new(scan.global_scan_state()); @@ -209,7 +209,7 @@ fn try_main() -> DeltaResult<()> { // done sending drop(record_batch_tx); - for res in scan_data { + for res in scan_metadata { let (data, vector, transforms) = res?; scan_file_tx = delta_kernel::scan::state::visit_scan_files( data.as_ref(), diff --git a/kernel/src/log_replay.rs b/kernel/src/log_replay.rs index abb451c01c..c9a58492f9 100644 --- a/kernel/src/log_replay.rs +++ b/kernel/src/log_replay.rs @@ -6,7 +6,7 @@ //! typically from newest to oldest. //! //! Log replay is currently implemented for table scans, which filter and apply transformations -//! to produce file actions which builds the view of the table state at a specific point in time. +//! to produce file actions which builds the view of the table state at a specific point in time. //! Future extensions will support additional log replay processors beyond the current use case. //! (e.g. checkpointing: filter actions to include only those needed to rebuild table state) //! @@ -46,7 +46,7 @@ impl FileActionKey { /// significantly reducing memory usage for large Delta tables with extensive history. /// /// TODO: Modify deduplication to track only file paths instead of (path, dv_unique_id). -/// More info here: https://github.com/delta-io/delta-kernel-rs/issues/701 +/// More info here: https://github.com/delta-io/delta-kernel-rs/issues/701 pub(crate) struct FileActionDeduplicator<'seen> { /// A set of (data file path, dv_unique_id) pairs that have been seen thus /// far in the log for deduplication. This is a mutable reference to the set @@ -208,23 +208,23 @@ impl<'seen> FileActionDeduplicator<'seen> { /// processed by the processor, (if a filter is provided). /// /// Implementations: -/// - `ScanLogReplayProcessor`: Used for table scans, this processor filters and selects deduplicated +/// - `ScanLogReplayProcessor`: Used for table scans, this processor filters and selects deduplicated /// `Add` actions from log batches to reconstruct the view of the table at a specific point in time. -/// Note that scans do not expose `Remove` actions. Data skipping may be applied when a predicate is +/// Note that scans do not expose `Remove` actions. Data skipping may be applied when a predicate is /// provided. /// -/// - `CheckpointLogReplayProcessor` (WIP): Will be responsible for processing log batches to construct -/// V1 spec checkpoint files. Unlike scans, checkpoint processing includes additional actions, such as -/// `Remove`, `Metadata`, and `Protocol`, required to fully reconstruct table state. +/// - `CheckpointLogReplayProcessor` (WIP): Will be responsible for processing log batches to construct +/// V1 spec checkpoint files. Unlike scans, checkpoint processing includes additional actions, such as +/// `Remove`, `Metadata`, and `Protocol`, required to fully reconstruct table state. /// Data skipping is not applied during checkpoint processing. /// /// The `Output` type represents the material result of log replay, and it must implement the /// `HasSelectionVector` trait to allow filtering of irrelevant rows: /// -/// - For **scans**, the output type is `ScanData`, which contains the file actions (`Add` actions) that -/// need to be applied to build the table's view, accompanied by a **selection vector** that identifies -/// which rows should be included. A transform vector may also be included to handle schema changes, -/// such as renaming columns or modifying data types. +/// - For **scans**, the output type is `ScanMetadata`, which contains the file actions (`Add` +/// actions) that need to be applied to build the table's view, accompanied by a +/// **selection vector** that identifies which rows should be included. A transform vector may +/// also be included to handle schema changes, such as renaming columns or modifying data types. /// /// - For **checkpoints**, the output includes the actions necessary to write to the checkpoint file (`Add`, /// `Remove`, `Metadata`, `Protocol` actions), filtered by the **selection vector** to determine which diff --git a/kernel/src/scan/log_replay.rs b/kernel/src/scan/log_replay.rs index f5d60a0978..9b45fc3c6b 100644 --- a/kernel/src/scan/log_replay.rs +++ b/kernel/src/scan/log_replay.rs @@ -5,7 +5,7 @@ use std::sync::{Arc, LazyLock}; use itertools::Itertools; use super::data_skipping::DataSkippingFilter; -use super::{ScanData, Transform}; +use super::{ScanMetadata, Transform}; use crate::actions::get_log_add_schema; use crate::engine_data::{GetData, RowVisitor, TypedGetData as _}; use crate::expressions::{column_expr, column_name, ColumnName, Expression, ExpressionRef}; @@ -28,15 +28,16 @@ use crate::{DeltaResult, Engine, EngineData, Error, ExpressionEvaluator}; /// - Action Deduplication: Leverages the [`FileActionDeduplicator`] to ensure that for each unique file /// (identified by its path and deletion vector unique ID), only the latest valid Add action is processed. /// - Transformation: Applies a built-in transformation (`add_transform`) to convert selected Add actions -/// into [`ScanData`], the intermediate format passed to the engine. +/// into [`ScanMetadata`], the intermediate format passed to the engine. /// - Row Transform Passthrough: Any user-provided row-level transformation expressions (e.g. those derived /// from projection or filters) are preserved and passed through to the engine, which applies them as part /// of its scan execution logic. /// -/// As an implementation of [`LogReplayProcessor`], [`ScanLogReplayProcessor`] provides the `process_actions_batch` -/// method, which applies these steps to each batch of log actions and produces a [`ScanData`] result. This result -/// includes the transformed batch, a selection vector indicating which rows are valid, and any -/// row-level transformation expressions that need to be applied to the selected rows. +/// As an implementation of [`LogReplayProcessor`], [`ScanLogReplayProcessor`] provides the +/// `process_actions_batch` method, which applies these steps to each batch of log actions and +/// produces a [`ScanMetadata`] result. This result includes the transformed batch, a selection +/// vector indicating which rows are valid, and any row-level transformation expressions that need +/// to be applied to the selected rows. struct ScanLogReplayProcessor { partition_filter: Option, data_skipping_filter: Option, @@ -338,7 +339,7 @@ fn get_add_transform_expr() -> Expression { } impl LogReplayProcessor for ScanLogReplayProcessor { - type Output = ScanData; + type Output = ScanMetadata; fn process_actions_batch( &mut self, @@ -385,7 +386,7 @@ pub(crate) fn scan_action_iter( logical_schema: SchemaRef, transform: Option>, physical_predicate: Option<(ExpressionRef, SchemaRef)>, -) -> impl Iterator> { +) -> impl Iterator> { ScanLogReplayProcessor::new(engine, physical_predicate, logical_schema, transform) .process_actions_iter(action_iter) } diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs index d87cdc0564..2f776b21f5 100644 --- a/kernel/src/scan/mod.rs +++ b/kernel/src/scan/mod.rs @@ -323,9 +323,9 @@ pub(crate) enum TransformExpr { // TODO(nick): Make this a struct in a follow-on PR // (data, deletion_vec, transforms) -pub type ScanData = (Box, Vec, Vec>); +pub type ScanMetadata = (Box, Vec, Vec>); -impl HasSelectionVector for ScanData { +impl HasSelectionVector for ScanMetadata { fn has_selected_rows(&self) -> bool { self.1.contains(&true) } @@ -402,10 +402,10 @@ impl Scan { /// the item at index `i` in this `Vec` is `None`, or if the `Vec` contains fewer than `i` /// elements, no expression need be applied and the data read from disk is already in the /// correct logical state. - pub fn scan_data( + pub fn scan_metadata( &self, engine: &dyn Engine, - ) -> DeltaResult>> { + ) -> DeltaResult>> { // Compute the static part of the transformation. This is `None` if no transformation is // needed (currently just means no partition cols AND no column mapping but will be extended // for other transforms as we support them) @@ -419,7 +419,7 @@ impl Scan { }; let it = scan_action_iter( engine, - self.replay_for_scan_data(engine)?, + self.replay_for_scan_metadata(engine)?, self.logical_schema.clone(), static_transform, physical_predicate, @@ -428,7 +428,7 @@ impl Scan { } // Factored out to facilitate testing - fn replay_for_scan_data( + fn replay_for_scan_metadata( &self, engine: &dyn Engine, ) -> DeltaResult, bool)>> + Send> { @@ -456,14 +456,14 @@ impl Scan { } } - /// Perform an "all in one" scan. This will use the provided `engine` to read and - /// process all the data for the query. Each [`ScanResult`] in the resultant iterator encapsulates - /// the raw data and an optional boolean vector built from the deletion vector if it was - /// present. See the documentation for [`ScanResult`] for more details. Generally - /// connectors/engines will want to use [`Scan::scan_data`] so they can have more control over - /// the execution of the scan. - // This calls [`Scan::scan_data`] to get an iterator of `ScanData` actions for the scan, and then uses the - // `engine`'s [`crate::ParquetHandler`] to read the actual table data. + /// Perform an "all in one" scan. This will use the provided `engine` to read and process all + /// the data for the query. Each [`ScanResult`] in the resultant iterator encapsulates the raw + /// data and an optional boolean vector built from the deletion vector if it was present. See + /// the documentation for [`ScanResult`] for more details. Generally connectors/engines will + /// want to use [`Scan::scan_metadata`] so they can have more control over the execution of the + /// scan. + // This calls [`Scan::scan_metadata`] to get an iterator of `ScanMetadata` actions for the scan, + // and then uses the `engine`'s [`crate::ParquetHandler`] to read the actual table data. pub fn execute( &self, engine: Arc, @@ -474,7 +474,7 @@ impl Scan { dv_info: DvInfo, transform: Option, } - fn scan_data_callback( + fn scan_metadata_callback( batches: &mut Vec, path: &str, size: i64, @@ -500,8 +500,8 @@ impl Scan { let table_root = self.snapshot.table_root().clone(); let physical_predicate = self.physical_predicate(); - let scan_data = self.scan_data(engine.as_ref())?; - let scan_files_iter = scan_data + let scan_metadata = self.scan_metadata(engine.as_ref())?; + let scan_files_iter = scan_metadata .map(|res| { let (data, vec, transforms) = res?; let scan_files = vec![]; @@ -510,7 +510,7 @@ impl Scan { &vec, &transforms, scan_files, - scan_data_callback, + scan_metadata_callback, ) }) // Iterator>> to Iterator> @@ -575,7 +575,7 @@ impl Scan { } } -/// Get the schema that scan rows (from [`Scan::scan_data`]) will be returned with. +/// Get the schema that scan rows (from [`Scan::scan_metadata`]) will be returned with. /// /// It is: /// ```ignored @@ -988,8 +988,8 @@ mod tests { } fn get_files_for_scan(scan: Scan, engine: &dyn Engine) -> DeltaResult> { - let scan_data = scan.scan_data(engine)?; - fn scan_data_callback( + let scan_metadata = scan.scan_metadata(engine)?; + fn scan_metadata_callback( paths: &mut Vec, path: &str, _size: i64, @@ -1002,21 +1002,21 @@ mod tests { assert!(dv_info.deletion_vector.is_none()); } let mut files = vec![]; - for data in scan_data { + for data in scan_metadata { let (data, vec, transforms) = data?; files = state::visit_scan_files( data.as_ref(), &vec, &transforms, files, - scan_data_callback, + scan_metadata_callback, )?; } Ok(files) } #[test] - fn test_scan_data_paths() { + fn test_scan_metadata_paths() { let path = std::fs::canonicalize(PathBuf::from("./tests/data/table-without-dv-small/")).unwrap(); let url = url::Url::from_directory_path(path).unwrap(); @@ -1034,7 +1034,7 @@ mod tests { } #[test_log::test] - fn test_scan_data() { + fn test_scan_metadata() { let path = std::fs::canonicalize(PathBuf::from("./tests/data/table-without-dv-small/")).unwrap(); let url = url::Url::from_directory_path(path).unwrap(); @@ -1095,7 +1095,7 @@ mod tests { } #[test] - fn test_replay_for_scan_data() { + fn test_replay_for_scan_metadata() { let path = std::fs::canonicalize(PathBuf::from("./tests/data/parquet_row_group_skipping/")); let url = url::Url::from_directory_path(path.unwrap()).unwrap(); let engine = SyncEngine::new(); @@ -1104,7 +1104,7 @@ mod tests { let snapshot = table.snapshot(&engine, None).unwrap(); let scan = snapshot.into_scan_builder().build().unwrap(); let data: Vec<_> = scan - .replay_for_scan_data(&engine) + .replay_for_scan_metadata(&engine) .unwrap() .try_collect() .unwrap(); diff --git a/kernel/src/scan/state.rs b/kernel/src/scan/state.rs index 4d77d4b772..8489ed759d 100644 --- a/kernel/src/scan/state.rs +++ b/kernel/src/scan/state.rs @@ -152,7 +152,7 @@ pub type ScanCallback = fn( /// ## Example /// ```ignore /// let mut context = [my context]; -/// for res in scan_data { // scan data from scan.scan_data() +/// for res in scan_metadata { // scan metadata from scan.scan_metadata() /// let (data, vector) = res?; /// context = delta_kernel::scan::state::visit_scan_files( /// data.as_ref(), @@ -281,7 +281,7 @@ mod tests { } #[test] - fn test_simple_visit_scan_data() { + fn test_simple_visit_scan_metadata() { let context = TestContext { id: 2 }; run_with_validate_callback( vec![add_batch_simple(get_log_schema().clone())], diff --git a/kernel/src/table_changes/log_replay.rs b/kernel/src/table_changes/log_replay.rs index d241e66d58..20fc11c6e1 100644 --- a/kernel/src/table_changes/log_replay.rs +++ b/kernel/src/table_changes/log_replay.rs @@ -27,21 +27,21 @@ use itertools::Itertools; #[cfg(test)] mod tests; -/// Scan data for a Change Data Feed query. This holds metadata that is needed to read data rows. -pub(crate) struct TableChangesScanData { +/// Scan metadata for a Change Data Feed query. This holds metadata that's needed to read data rows. +pub(crate) struct TableChangesScanMetadata { /// Engine data with the schema defined in [`scan_row_schema`] /// /// Note: The schema of the engine data will be updated in the future to include columns /// used by Change Data Feed. - pub(crate) scan_data: Box, - /// The selection vector used to filter the `scan_data`. + pub(crate) scan_metadata: Box, + /// The selection vector used to filter the `scan_metadata`. pub(crate) selection_vector: Vec, /// A map from a remove action's path to its deletion vector pub(crate) remove_dvs: Arc>, } -/// Given an iterator of [`ParsedLogPath`] returns an iterator of [`TableChangesScanData`]. -/// Each row that is selected in the returned `TableChangesScanData.scan_data` (according +/// Given an iterator of [`ParsedLogPath`] returns an iterator of [`TableChangesScanMetadata`]. +/// Each row that is selected in the returned `TableChangesScanMetadata.scan_metadata` (according /// to the `selection_vector` field) _must_ be processed to complete the scan. Non-selected /// rows _must_ be ignored. /// @@ -52,7 +52,7 @@ pub(crate) fn table_changes_action_iter( commit_files: impl IntoIterator, table_schema: SchemaRef, physical_predicate: Option<(ExpressionRef, SchemaRef)>, -) -> DeltaResult>> { +) -> DeltaResult>> { let filter = DataSkippingFilter::new(engine.as_ref(), physical_predicate).map(Arc::new); let result = commit_files .into_iter() @@ -65,8 +65,9 @@ pub(crate) fn table_changes_action_iter( Ok(result) } -/// Processes a single commit file from the log to generate an iterator of [`TableChangesScanData`]. -/// The scanner operates in two phases that _must_ be performed in the following order: +/// Processes a single commit file from the log to generate an iterator of +/// [`TableChangesScanMetadata`]. The scanner operates in two phases that _must_ be performed in the +/// following order: /// 1. Prepare phase [`LogReplayScanner::try_new`]: This iterates over every action in the commit. /// In this phase, we do the following: /// - Determine if there exist any `cdc` actions. We determine this in the first phase because @@ -100,7 +101,7 @@ pub(crate) fn table_changes_action_iter( /// See https://github.com/delta-io/delta-kernel-rs/issues/559 /// /// 2. Scan file generation phase [`LogReplayScanner::into_scan_batches`]: This iterates over every -/// action in the commit, and generates [`TableChangesScanData`]. It does so by transforming the +/// action in the commit, and generates [`TableChangesScanMetadata`]. It does so by transforming the /// actions using [`add_transform_expr`], and generating selection vectors with the following rules: /// - If a `cdc` action was found in the prepare phase, only `cdc` actions are selected /// - Otherwise, select `add` and `remove` actions. Note that only `remove` actions that do not @@ -125,7 +126,7 @@ struct LogReplayScanner { // generated by in-commit timestamps, that timestamp will be used instead. // // Note: This will be used once an expression is introduced to transform the engine data in - // [`TableChangesScanData`] + // [`TableChangesScanMetadata`] timestamp: i64, } @@ -208,14 +209,14 @@ impl LogReplayScanner { remove_dvs, }) } - /// Generates an iterator of [`TableChangesScanData`] by iterating over each action of the + /// Generates an iterator of [`TableChangesScanMetadata`] by iterating over each action of the /// commit, generating a selection vector, and transforming the engine data. This performs /// phase 2 of [`LogReplayScanner`]. fn into_scan_batches( self, engine: Arc, filter: Option>, - ) -> DeltaResult>> { + ) -> DeltaResult>> { let Self { has_cdc_action, remove_dvs, @@ -254,9 +255,9 @@ impl LogReplayScanner { let mut visitor = FileActionSelectionVisitor::new(&remove_dvs, selection_vector, has_cdc_action); visitor.visit_rows_of(actions.as_ref())?; - let scan_data = evaluator.evaluate(actions.as_ref())?; - Ok(TableChangesScanData { - scan_data, + let scan_metadata = evaluator.evaluate(actions.as_ref())?; + Ok(TableChangesScanMetadata { + scan_metadata, selection_vector: visitor.selection_vector, remove_dvs: remove_dvs.clone(), }) diff --git a/kernel/src/table_changes/log_replay/tests.rs b/kernel/src/table_changes/log_replay/tests.rs index bbb7c1e0b8..babdde5166 100644 --- a/kernel/src/table_changes/log_replay/tests.rs +++ b/kernel/src/table_changes/log_replay/tests.rs @@ -1,5 +1,5 @@ use super::table_changes_action_iter; -use super::TableChangesScanData; +use super::TableChangesScanMetadata; use crate::actions::deletion_vector::DeletionVectorDescriptor; use crate::actions::{Add, Cdc, Metadata, Protocol, Remove}; use crate::engine::sync::SyncEngine; @@ -45,8 +45,8 @@ fn get_segment( Ok(log_segment.ascending_commit_files) } -fn result_to_sv(iter: impl Iterator>) -> Vec { - iter.map_ok(|scan_data| scan_data.selection_vector.into_iter()) +fn result_to_sv(iter: impl Iterator>) -> Vec { + iter.map_ok(|scan_metadata| scan_metadata.selection_vector.into_iter()) .flatten_ok() .try_collect() .unwrap() @@ -294,10 +294,10 @@ async fn add_remove() { let sv = table_changes_action_iter(engine, commits, get_schema().into(), None) .unwrap() - .flat_map(|scan_data| { - let scan_data = scan_data.unwrap(); - assert_eq!(scan_data.remove_dvs, HashMap::new().into()); - scan_data.selection_vector + .flat_map(|scan_metadata| { + let scan_metadata = scan_metadata.unwrap(); + assert_eq!(scan_metadata.remove_dvs, HashMap::new().into()); + scan_metadata.selection_vector }) .collect_vec(); @@ -344,10 +344,10 @@ async fn filter_data_change() { let sv = table_changes_action_iter(engine, commits, get_schema().into(), None) .unwrap() - .flat_map(|scan_data| { - let scan_data = scan_data.unwrap(); - assert_eq!(scan_data.remove_dvs, HashMap::new().into()); - scan_data.selection_vector + .flat_map(|scan_metadata| { + let scan_metadata = scan_metadata.unwrap(); + assert_eq!(scan_metadata.remove_dvs, HashMap::new().into()); + scan_metadata.selection_vector }) .collect_vec(); @@ -390,10 +390,10 @@ async fn cdc_selection() { let sv = table_changes_action_iter(engine, commits, get_schema().into(), None) .unwrap() - .flat_map(|scan_data| { - let scan_data = scan_data.unwrap(); - assert_eq!(scan_data.remove_dvs, HashMap::new().into()); - scan_data.selection_vector + .flat_map(|scan_metadata| { + let scan_metadata = scan_metadata.unwrap(); + assert_eq!(scan_metadata.remove_dvs, HashMap::new().into()); + scan_metadata.selection_vector }) .collect_vec(); @@ -456,10 +456,10 @@ async fn dv() { .into(); let sv = table_changes_action_iter(engine, commits, get_schema().into(), None) .unwrap() - .flat_map(|scan_data| { - let scan_data = scan_data.unwrap(); - assert_eq!(scan_data.remove_dvs, expected_remove_dvs); - scan_data.selection_vector + .flat_map(|scan_metadata| { + let scan_metadata = scan_metadata.unwrap(); + assert_eq!(scan_metadata.remove_dvs, expected_remove_dvs); + scan_metadata.selection_vector }) .collect_vec(); @@ -533,9 +533,9 @@ async fn data_skipping_filter() { let sv = table_changes_action_iter(engine, commits, logical_schema.into(), predicate) .unwrap() - .flat_map(|scan_data| { - let scan_data = scan_data.unwrap(); - scan_data.selection_vector + .flat_map(|scan_metadata| { + let scan_metadata = scan_metadata.unwrap(); + scan_metadata.selection_vector }) .collect_vec(); diff --git a/kernel/src/table_changes/scan.rs b/kernel/src/table_changes/scan.rs index 971f7e058d..b9bed794d5 100644 --- a/kernel/src/table_changes/scan.rs +++ b/kernel/src/table_changes/scan.rs @@ -12,10 +12,10 @@ use crate::scan::{ColumnType, PhysicalPredicate, ScanResult}; use crate::schema::{SchemaRef, StructType}; use crate::{DeltaResult, Engine, ExpressionRef, FileMeta}; -use super::log_replay::{table_changes_action_iter, TableChangesScanData}; +use super::log_replay::{table_changes_action_iter, TableChangesScanMetadata}; use super::physical_to_logical::{physical_to_logical_expr, scan_file_physical_schema}; use super::resolve_dvs::{resolve_scan_file_dv, ResolvedCdfScanFile}; -use super::scan_file::scan_data_to_scan_file; +use super::scan_file::scan_metadata_to_scan_file; use super::{TableChanges, CDF_FIELDS}; /// The result of building a [`TableChanges`] scan over a table. This can be used to get the change @@ -177,15 +177,16 @@ impl TableChangesScanBuilder { } impl TableChangesScan { - /// Returns an iterator of [`TableChangesScanData`] necessary to read CDF. Each row + /// Returns an iterator of [`TableChangesScanMetadata`] necessary to read CDF. Each row /// represents an action in the delta log. These rows are filtered to yield only the actions - /// necessary to read CDF. Additionally, [`TableChangesScanData`] holds metadata on the - /// deletion vectors present in the commit. The engine data in each scan data is guaranteed - /// to belong to the same commit. Several [`TableChangesScanData`] may belong to the same commit. - fn scan_data( + /// necessary to read CDF. Additionally, [`TableChangesScanMetadata`] holds metadata on the + /// deletion vectors present in the commit. The engine data in each scan metadata is guaranteed + /// to belong to the same commit. Several [`TableChangesScanMetadata`] may belong to the same + /// commit. + fn scan_metadata( &self, engine: Arc, - ) -> DeltaResult>> { + ) -> DeltaResult>> { let commits = self .table_changes .log_segment @@ -238,8 +239,8 @@ impl TableChangesScan { &self, engine: Arc, ) -> DeltaResult>> { - let scan_data = self.scan_data(engine.clone())?; - let scan_files = scan_data_to_scan_file(scan_data); + let scan_metadata = self.scan_metadata(engine.clone())?; + let scan_files = scan_metadata_to_scan_file(scan_metadata); let global_scan_state = self.global_scan_state(); let table_root = self.table_changes.table_root().clone(); diff --git a/kernel/src/table_changes/scan_file.rs b/kernel/src/table_changes/scan_file.rs index 9556041cb3..0b74068567 100644 --- a/kernel/src/table_changes/scan_file.rs +++ b/kernel/src/table_changes/scan_file.rs @@ -6,7 +6,7 @@ use itertools::Itertools; use std::collections::HashMap; use std::sync::{Arc, LazyLock}; -use super::log_replay::TableChangesScanData; +use super::log_replay::TableChangesScanMetadata; use crate::actions::visitors::visit_deletion_vector_at; use crate::engine_data::{GetData, TypedGetData}; use crate::expressions::{column_expr, Expression}; @@ -47,17 +47,17 @@ pub(crate) struct CdfScanFile { pub(crate) type CdfScanCallback = fn(context: &mut T, scan_file: CdfScanFile); -/// Transforms an iterator of [`TableChangesScanData`] into an iterator of +/// Transforms an iterator of [`TableChangesScanMetadata`] into an iterator of /// [`CdfScanFile`] by visiting the engine data. -pub(crate) fn scan_data_to_scan_file( - scan_data: impl Iterator>, +pub(crate) fn scan_metadata_to_scan_file( + scan_metadata: impl Iterator>, ) -> impl Iterator> { - scan_data - .map(|scan_data| -> DeltaResult<_> { - let scan_data = scan_data?; + scan_metadata + .map(|scan_metadata| -> DeltaResult<_> { + let scan_metadata = scan_metadata?; let callback: CdfScanCallback> = |context, scan_file| context.push(scan_file); - Ok(visit_cdf_scan_files(&scan_data, vec![], callback)?.into_iter()) + Ok(visit_cdf_scan_files(&scan_metadata, vec![], callback)?.into_iter()) }) // Iterator-Result-Iterator .flatten_ok() // Iterator-Result } @@ -78,7 +78,7 @@ pub(crate) fn scan_data_to_scan_file( /// ## Example /// ```ignore /// let mut context = [my context]; -/// for res in scan_data { // scan data table_changes_scan.scan_data() +/// for res in scan_metadata { // scan metadata table_changes_scan.scan_metadata() /// let (data, vector, remove_dv) = res?; /// context = delta_kernel::table_changes::scan_file::visit_cdf_scan_files( /// data.as_ref(), @@ -89,18 +89,18 @@ pub(crate) fn scan_data_to_scan_file( /// } /// ``` pub(crate) fn visit_cdf_scan_files( - scan_data: &TableChangesScanData, + scan_metadata: &TableChangesScanMetadata, context: T, callback: CdfScanCallback, ) -> DeltaResult { let mut visitor = CdfScanFileVisitor { callback, context, - selection_vector: &scan_data.selection_vector, - remove_dvs: scan_data.remove_dvs.as_ref(), + selection_vector: &scan_metadata.selection_vector, + remove_dvs: scan_metadata.remove_dvs.as_ref(), }; - visitor.visit_rows_of(scan_data.scan_data.as_ref())?; + visitor.visit_rows_of(scan_metadata.scan_metadata.as_ref())?; Ok(visitor.context) } @@ -172,7 +172,7 @@ impl RowVisitor for CdfScanFileVisitor<'_, T> { } } -/// Get the schema that scan rows (from [`TableChanges::scan_data`]) will be returned with. +/// Get the schema that scan rows (from [`TableChanges::scan_metadata`]) will be returned with. pub(crate) fn cdf_scan_row_schema() -> SchemaRef { static CDF_SCAN_ROW_SCHEMA: LazyLock> = LazyLock::new(|| { let deletion_vector = StructType::new([ @@ -213,7 +213,7 @@ pub(crate) fn cdf_scan_row_schema() -> SchemaRef { } /// Expression to convert an action with `log_schema` into one with -/// [`cdf_scan_row_schema`]. This is the expression used to create [`TableChangesScanData`]. +/// [`cdf_scan_row_schema`]. This is the expression used to create [`TableChangesScanMetadata`]. pub(crate) fn cdf_scan_row_expression(commit_timestamp: i64, commit_number: i64) -> Expression { Expression::struct_from([ Expression::struct_from([ @@ -242,7 +242,7 @@ mod tests { use itertools::Itertools; - use super::{scan_data_to_scan_file, CdfScanFile, CdfScanFileType}; + use super::{scan_metadata_to_scan_file, CdfScanFile, CdfScanFileType}; use crate::actions::deletion_vector::DeletionVectorDescriptor; use crate::actions::{Add, Cdc, Remove}; use crate::engine::sync::SyncEngine; @@ -333,14 +333,16 @@ mod tests { StructField::nullable("id", DataType::INTEGER), StructField::nullable("value", DataType::STRING), ]); - let scan_data = table_changes_action_iter( + let scan_metadata = table_changes_action_iter( Arc::new(engine), log_segment.ascending_commit_files.clone(), table_schema.into(), None, ) .unwrap(); - let scan_files: Vec<_> = scan_data_to_scan_file(scan_data).try_collect().unwrap(); + let scan_files: Vec<_> = scan_metadata_to_scan_file(scan_metadata) + .try_collect() + .unwrap(); // Generate the expected [`CdfScanFile`] let timestamps = log_segment diff --git a/kernel/tests/golden_tables.rs b/kernel/tests/golden_tables.rs index 0210b4467c..241279906f 100644 --- a/kernel/tests/golden_tables.rs +++ b/kernel/tests/golden_tables.rs @@ -273,8 +273,8 @@ async fn canonicalized_paths_test( .into_scan_builder() .build() .expect("build the scan"); - let mut scan_data = scan.scan_data(&engine).expect("scan data"); - assert!(scan_data.next().is_none()); + let mut scan_metadata = scan.scan_metadata(&engine).expect("scan metadata"); + assert!(scan_metadata.next().is_none()); Ok(()) } @@ -289,9 +289,12 @@ async fn checkpoint_test( .into_scan_builder() .build() .expect("build the scan"); - let scan_data: Vec<_> = scan.scan_data(&engine).expect("scan data").collect(); + let scan_metadata: Vec<_> = scan + .scan_metadata(&engine) + .expect("scan metadata") + .collect(); assert_eq!(version, 14); - assert!(scan_data.len() == 1); + assert!(scan_metadata.len() == 1); Ok(()) } diff --git a/kernel/tests/read.rs b/kernel/tests/read.rs index 4084d6ced4..522ee0fe40 100644 --- a/kernel/tests/read.rs +++ b/kernel/tests/read.rs @@ -333,7 +333,7 @@ struct ScanFile { transform: Option, } -fn scan_data_callback( +fn scan_metadata_callback( batches: &mut Vec, path: &str, size: i64, @@ -350,7 +350,7 @@ fn scan_data_callback( }); } -fn read_with_scan_data( +fn read_with_scan_metadata( location: &Url, engine: &dyn Engine, scan: &Scan, @@ -358,16 +358,16 @@ fn read_with_scan_data( ) -> Result<(), Box> { let global_state = scan.global_scan_state(); let result_schema: ArrowSchemaRef = Arc::new(scan.schema().as_ref().try_into()?); - let scan_data = scan.scan_data(engine)?; + let scan_metadata = scan.scan_metadata(engine)?; let mut scan_files = vec![]; - for data in scan_data { + for data in scan_metadata { let (data, vec, transforms) = data?; scan_files = visit_scan_files( data.as_ref(), &vec, &transforms, scan_files, - scan_data_callback, + scan_metadata_callback, )?; } @@ -462,7 +462,7 @@ fn read_table_data( .build()?; sort_lines!(expected); - read_with_scan_data(table.location(), engine.as_ref(), &scan, &expected)?; + read_with_scan_metadata(table.location(), engine.as_ref(), &scan, &expected)?; read_with_execute(engine, &scan, &expected)?; } Ok(())