Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion acceptance/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,10 @@ fn assert_columns_match(actual: &[Arc<dyn Array>], expected: &[Arc<dyn Array>])
}
}

pub async fn assert_scan_data(engine: Arc<dyn Engine>, test_case: &TestCaseInfo) -> TestResult<()> {
pub async fn assert_scan_metadata(
engine: Arc<dyn Engine>,
test_case: &TestCaseInfo,
) -> TestResult<()> {
let table_root = test_case.table_root()?;
let table = Table::new(table_root);
let snapshot = table.snapshot(engine.as_ref(), None)?;
Expand Down
2 changes: 1 addition & 1 deletion acceptance/tests/dat_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ fn reader_test(path: &Path) -> datatest_stable::Result<()> {
);

case.assert_metadata(engine.clone()).await.unwrap();
acceptance::data::assert_scan_data(engine.clone(), &case)
acceptance::data::assert_scan_metadata(engine.clone(), &case)
.await
.unwrap();
});
Expand Down
14 changes: 8 additions & 6 deletions ffi/examples/read-table/read_table.c
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,9 @@ void scan_row_callback(
}

// For each chunk of scan data (which may contain multiple files to scan), kernel will call this
// function (named do_visit_scan_data to avoid conflict with visit_scan_data exported by kernel)
void do_visit_scan_data(
// function (named do_visit_scan_metadata to avoid conflict with visit_scan_metadata exported by
// kernel)
void do_visit_scan_metadata(
void* engine_context,
ExclusiveEngineData* engine_data,
KernelBoolSlice selection_vec,
Expand All @@ -96,7 +97,7 @@ void do_visit_scan_data(
print_selection_vector(" ", &selection_vec);
// Ask kernel to iterate each individual file and call us back with extracted metadata
print_diag("Asking kernel to call us back for each scan row (file to read)\n");
visit_scan_data(engine_data, selection_vec, transforms, engine_context, scan_row_callback);
visit_scan_metadata(engine_data, selection_vec, transforms, engine_context, scan_row_callback);
free_bool_slice(selection_vec);
free_engine_data(engine_data);
}
Expand Down Expand Up @@ -291,7 +292,7 @@ int main(int argc, char* argv[])
#endif
};

ExternResultHandleSharedScanDataIterator data_iter_res = kernel_scan_data_init(engine, scan);
ExternResultHandleSharedScanDataIterator data_iter_res = kernel_scan_metadata_init(engine, scan);
if (data_iter_res.tag != OkHandleSharedScanDataIterator) {
print_error("Failed to construct scan data iterator.", (Error*)data_iter_res.err);
free_error((Error*)data_iter_res.err);
Expand All @@ -304,7 +305,8 @@ int main(int argc, char* argv[])

// iterate scan files
for (;;) {
ExternResultbool ok_res = kernel_scan_data_next(data_iter, &context, do_visit_scan_data);
ExternResultbool ok_res =
kernel_scan_metadata_next(data_iter, &context, do_visit_scan_metadata);
if (ok_res.tag != Okbool) {
print_error("Failed to iterate scan data.", (Error*)ok_res.err);
free_error((Error*)ok_res.err);
Expand All @@ -323,7 +325,7 @@ int main(int argc, char* argv[])
context.arrow_context = NULL;
#endif

free_kernel_scan_data(data_iter);
free_scan_metadata(data_iter);
free_scan(scan);
free_schema(logical_schema);
free_schema(read_schema);
Expand Down
8 changes: 4 additions & 4 deletions ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -685,11 +685,11 @@ pub struct StringSliceIterator;

/// # Safety
///
/// The iterator must be valid (returned by [`kernel_scan_data_init`]) and not yet freed by
/// [`free_kernel_scan_data`]. The visitor function pointer must be non-null.
/// The iterator must be valid (returned by [`kernel_scan_metadata_init`]) and not yet freed by
/// [`free_scan_metadata`]. The visitor function pointer must be non-null.
///
/// [`kernel_scan_data_init`]: crate::scan::kernel_scan_data_init
/// [`free_kernel_scan_data`]: crate::scan::free_kernel_scan_data
/// [`kernel_scan_metadata_init`]: crate::scan::kernel_scan_metadata_init
/// [`free_scan_metadata`]: crate::scan::free_scan_metadata
#[no_mangle]
pub unsafe extern "C" fn string_slice_next(
data: Handle<StringSliceIterator>,
Expand Down
38 changes: 19 additions & 19 deletions ffi/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use delta_kernel::scan::state::{visit_scan_files, DvInfo, GlobalScanState};
use delta_kernel::scan::{Scan, ScanData};
use delta_kernel::scan::{Scan, ScanMetadata};
use delta_kernel::snapshot::Snapshot;
use delta_kernel::{DeltaResult, Error, Expression, ExpressionRef};
use delta_kernel_ffi_macros::handle_descriptor;
Expand All @@ -24,7 +24,7 @@ use crate::{
use super::handle::Handle;

// TODO: Why do we even need to expose a scan, when the only thing an engine can do with it is
// handit back to the kernel by calling `kernel_scan_data_init`? There isn't even an FFI method to
// handit back to the kernel by calling `kernel_scan_metadata_init`? There isn't even an FFI method to
// drop it!
#[handle_descriptor(target=Scan, mutable=false, sized=true)]
pub struct SharedScan;
Expand Down Expand Up @@ -129,7 +129,7 @@ pub struct KernelScanDataIterator {
// Mutex -> Allow the iterator to be accessed safely by multiple threads.
// Box -> Wrap its unsized content this struct is fixed-size with thin pointers.
// Item = DeltaResult<ScanData>
data: Mutex<Box<dyn Iterator<Item = DeltaResult<ScanData>> + Send>>,
data: Mutex<Box<dyn Iterator<Item = DeltaResult<ScanMetadata>> + Send>>,

// Also keep a reference to the external engine for its error allocator. The default Parquet and
// Json handlers don't hold any reference to the tokio reactor they rely on, so the iterator
Expand All @@ -147,29 +147,29 @@ impl Drop for KernelScanDataIterator {
}

/// Get an iterator over the data needed to perform a scan. This will return a
/// [`KernelScanDataIterator`] which can be passed to [`kernel_scan_data_next`] to get the actual
/// [`KernelScanDataIterator`] which can be passed to [`kernel_scan_metadata_next`] to get the actual
/// data in the iterator.
///
/// # Safety
///
/// Engine is responsible for passing a valid [`SharedExternEngine`] and [`SharedScan`]
#[no_mangle]
pub unsafe extern "C" fn kernel_scan_data_init(
pub unsafe extern "C" fn kernel_scan_metadata_init(
engine: Handle<SharedExternEngine>,
scan: Handle<SharedScan>,
) -> ExternResult<Handle<SharedScanDataIterator>> {
let engine = unsafe { engine.clone_as_arc() };
let scan = unsafe { scan.as_ref() };
kernel_scan_data_init_impl(&engine, scan).into_extern_result(&engine.as_ref())
kernel_scan_metadata_init_impl(&engine, scan).into_extern_result(&engine.as_ref())
}

fn kernel_scan_data_init_impl(
fn kernel_scan_metadata_init_impl(
engine: &Arc<dyn ExternEngine>,
scan: &Scan,
) -> DeltaResult<Handle<SharedScanDataIterator>> {
let scan_data = scan.scan_data(engine.engine().as_ref())?;
let scan_metadata = scan.scan_metadata(engine.engine().as_ref())?;
let data = KernelScanDataIterator {
data: Mutex::new(Box::new(scan_data)),
data: Mutex::new(Box::new(scan_metadata)),
engine: engine.clone(),
};
Ok(Arc::new(data).into())
Expand All @@ -181,13 +181,13 @@ fn kernel_scan_data_init_impl(
///
/// # Safety
///
/// The iterator must be valid (returned by [kernel_scan_data_init]) and not yet freed by
/// [`free_kernel_scan_data`]. The visitor function pointer must be non-null.
/// The iterator must be valid (returned by [kernel_scan_metadata_init]) and not yet freed by
/// [`free_scan_metadata`]. The visitor function pointer must be non-null.
///
/// [`free_bool_slice`]: crate::free_bool_slice
/// [`free_engine_data`]: crate::free_engine_data
#[no_mangle]
pub unsafe extern "C" fn kernel_scan_data_next(
pub unsafe extern "C" fn kernel_scan_metadata_next(
data: Handle<SharedScanDataIterator>,
engine_context: NullableCvoid,
engine_visitor: extern "C" fn(
Expand All @@ -198,10 +198,10 @@ pub unsafe extern "C" fn kernel_scan_data_next(
),
) -> ExternResult<bool> {
let data = unsafe { data.as_ref() };
kernel_scan_data_next_impl(data, engine_context, engine_visitor)
kernel_scan_metadata_next_impl(data, engine_context, engine_visitor)
.into_extern_result(&data.engine.as_ref())
}
fn kernel_scan_data_next_impl(
fn kernel_scan_metadata_next_impl(
data: &KernelScanDataIterator,
engine_context: NullableCvoid,
engine_visitor: extern "C" fn(
Expand All @@ -228,11 +228,11 @@ fn kernel_scan_data_next_impl(
/// # Safety
///
/// Caller is responsible for (at most once) passing a valid pointer returned by a call to
/// [`kernel_scan_data_init`].
/// [`kernel_scan_metadata_init`].
// we should probably be consistent with drop vs. free on engine side (probably the latter is more
// intuitive to non-rust code)
#[no_mangle]
pub unsafe extern "C" fn free_kernel_scan_data(data: Handle<SharedScanDataIterator>) {
pub unsafe extern "C" fn free_scan_metadata(data: Handle<SharedScanDataIterator>) {
data.drop_handle();
}

Expand Down Expand Up @@ -304,7 +304,7 @@ pub unsafe extern "C" fn get_from_string_map(
/// `get_transform_for_row` returns `NULL` no expression need be applied and the data read from disk
/// is already in the correct logical state.
///
/// NB: If you are using `visit_scan_data` you don't need to worry about dealing with probing
/// NB: If you are using `visit_scan_metadata` you don't need to worry about dealing with probing
/// `CTransforms`. The callback will be invoked with the correct transform for you.
pub struct CTransforms {
transforms: Vec<Option<ExpressionRef>>,
Expand Down Expand Up @@ -420,13 +420,13 @@ struct ContextWrapper {
callback: CScanCallback,
}

/// Shim for ffi to call visit_scan_data. This will generally be called when iterating through scan
/// Shim for ffi to call visit_scan_metadata. This will generally be called when iterating through scan
/// data which provides the data handle and selection vector as each element in the iterator.
///
/// # Safety
/// engine is responsible for passing a valid [`ExclusiveEngineData`] and selection vector.
#[no_mangle]
pub unsafe extern "C" fn visit_scan_data(
pub unsafe extern "C" fn visit_scan_metadata(
data: Handle<ExclusiveEngineData>,
selection_vec: KernelBoolSlice,
transforms: &CTransforms,
Expand Down
8 changes: 4 additions & 4 deletions kernel/examples/inspect-table/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ enum Commands {
/// Show the table's schema
Schema,
/// Show the meta-data that would be used to scan the table
ScanData,
ScanMetadata,
/// Show each action from the log-segments
Actions {
/// Show the log in reverse order (default is log replay order -- newest first)
Expand Down Expand Up @@ -207,10 +207,10 @@ fn try_main() -> DeltaResult<()> {
Commands::Schema => {
println!("{:#?}", snapshot.schema());
}
Commands::ScanData => {
Commands::ScanMetadata => {
let scan = ScanBuilder::new(snapshot).build()?;
let scan_data = scan.scan_data(&engine)?;
for res in scan_data {
let scan_metadata = scan.scan_metadata(&engine)?;
for res in scan_metadata {
let (data, vector, transforms) = res?;
delta_kernel::scan::state::visit_scan_files(
data.as_ref(),
Expand Down
4 changes: 2 additions & 2 deletions kernel/examples/read-table-multi-threaded/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ Read Table Multi-Threaded

# About
This example shows a program that reads a table using multiple threads. This shows the use of the
`scan_data`, `global_scan_state`, and `visit_scan_files` methods, that can be used to partition work
`scan_metadata`, `global_scan_state`, and `visit_scan_files` methods, that can be used to partition work
to either multiple threads, or workers (in the case of a distributed engine).

You can run this from the same directory as this `README.md` by running `cargo run -- [args]`.
Expand Down Expand Up @@ -49,4 +49,4 @@ To select specific columns you need a `--` after the column list specification.

- Read `letter` and `data` columns from the `multi_partitioned` dat table:

`cargo run -- --columns letter,data -- ../../../acceptance/tests/dat/out/reader_tests/generated/multi_partitioned/delta/`
`cargo run -- --columns letter,data -- ../../../acceptance/tests/dat/out/reader_tests/generated/multi_partitioned/delta/`
6 changes: 3 additions & 3 deletions kernel/examples/read-table-multi-threaded/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use clap::{Parser, ValueEnum};
use url::Url;

/// An example program that reads a table using multiple threads. This shows the use of the
/// scan_data and global_scan_state methods on a Scan, that can be used to partition work to either
/// scan_metadata and global_scan_state methods on a Scan, that can be used to partition work to either
/// multiple threads, or workers (in the case of a distributed engine).
#[derive(Parser)]
#[command(author, version, about, long_about = None)]
Expand Down Expand Up @@ -179,7 +179,7 @@ fn try_main() -> DeltaResult<()> {
// [`delta_kernel::scan::scan_row_schema`]. Generally engines will not need to interact with
// this data directly, and can just call [`visit_scan_files`] to get pre-parsed data back from
// the kernel.
let scan_data = scan.scan_data(engine.as_ref())?;
let scan_metadata = scan.scan_metadata(engine.as_ref())?;

// get any global state associated with this scan
let global_state = Arc::new(scan.global_scan_state());
Expand Down Expand Up @@ -209,7 +209,7 @@ fn try_main() -> DeltaResult<()> {
// done sending
drop(record_batch_tx);

for res in scan_data {
for res in scan_metadata {
let (data, vector, transforms) = res?;
scan_file_tx = delta_kernel::scan::state::visit_scan_files(
data.as_ref(),
Expand Down
6 changes: 3 additions & 3 deletions kernel/src/scan/log_replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::sync::{Arc, LazyLock};
use itertools::Itertools;

use super::data_skipping::DataSkippingFilter;
use super::{ScanData, Transform};
use super::{ScanMetadata, Transform};
use crate::actions::get_log_add_schema;
use crate::engine_data::{GetData, RowVisitor, TypedGetData as _};
use crate::expressions::{column_expr, column_name, ColumnName, Expression, ExpressionRef};
Expand Down Expand Up @@ -338,7 +338,7 @@ fn get_add_transform_expr() -> Expression {
}

impl LogReplayProcessor for ScanLogReplayProcessor {
type Output = ScanData;
type Output = ScanMetadata;

fn process_actions_batch(
&mut self,
Expand Down Expand Up @@ -385,7 +385,7 @@ pub(crate) fn scan_action_iter(
logical_schema: SchemaRef,
transform: Option<Arc<Transform>>,
physical_predicate: Option<(ExpressionRef, SchemaRef)>,
) -> impl Iterator<Item = DeltaResult<ScanData>> {
) -> impl Iterator<Item = DeltaResult<ScanMetadata>> {
ScanLogReplayProcessor::new(engine, physical_predicate, logical_schema, transform)
.process_actions_iter(action_iter)
}
Expand Down
Loading
Loading