Skip to content

Commit

Permalink
factor out and make a new transform_to_logical
Browse files Browse the repository at this point in the history
  • Loading branch information
nicklan committed Jan 10, 2025
1 parent 83fc8eb commit ec0d671
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 43 deletions.
25 changes: 10 additions & 15 deletions kernel/examples/read-table-multi-threaded/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::DefaultEngine;
use delta_kernel::engine::sync::SyncEngine;
use delta_kernel::scan::state::{DvInfo, GlobalScanState, Stats};
use delta_kernel::scan::state::{transform_to_logical, DvInfo, GlobalScanState, Stats};
use delta_kernel::schema::Schema;
use delta_kernel::{DeltaResult, Engine, EngineData, ExpressionRef, FileMeta, Table};

Expand Down Expand Up @@ -293,20 +293,15 @@ fn do_work(
for read_result in read_results {
let read_result = read_result.unwrap();
let len = read_result.len();
// to transform the physical data into the correct logical form
let logical = if let Some(ref transform) = scan_file.transform {
engine
.get_expression_handler()
.get_evaluator(
scan_state.physical_schema.clone(),
transform.as_ref().clone(), // TODO: Maybe eval should take a ref
scan_state.logical_schema.clone().into(),
)
.evaluate(read_result.as_ref())
.unwrap()
} else {
read_result
};
// transform the physical data into the correct logical form
let logical = transform_to_logical(
engine,
read_result,
&scan_state.physical_schema,
&scan_state.logical_schema,
&scan_file.transform,
)
.unwrap();

let record_batch = to_arrow(logical).unwrap();

Expand Down
21 changes: 8 additions & 13 deletions kernel/src/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -559,19 +559,14 @@ impl Scan {
let global_state = global_state.clone();
Ok(read_result_iter.map(move |read_result| -> DeltaResult<_> {
let read_result = read_result?;
// to transform the physical data into the correct logical form
let logical = if let Some(ref transform) = scan_file.transform {
engine
.get_expression_handler()
.get_evaluator(
global_state.physical_schema.clone(),
transform.as_ref().clone(), // TODO: Maybe eval should take a ref
global_state.logical_schema.clone().into(),
)
.evaluate(read_result.as_ref())
} else {
Ok(read_result)
};
// transform the physical data into the correct logical form
let logical = state::transform_to_logical(
engine.as_ref(),
read_result,
&global_state.physical_schema,
&global_state.logical_schema,
&scan_file.transform,
);
let len = logical.as_ref().map_or(0, |res| res.len());
// need to split the dv_mask. what's left in dv_mask covers this result, and rest
// will cover the following results. we `take()` out of `selection_vector` to avoid
Expand Down
24 changes: 24 additions & 0 deletions kernel/src/scan/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::sync::LazyLock;

use crate::actions::deletion_vector::deletion_treemap_to_bools;
use crate::scan::get_transform_for_row;
use crate::schema::Schema;
use crate::utils::require;
use crate::ExpressionRef;
use crate::{
Expand Down Expand Up @@ -100,6 +101,29 @@ impl DvInfo {
}
}

/// Converts a batch of physical data into its logical representation.
///
/// When `transform` is `Some`, an evaluator is requested from the engine's expression handler
/// (input schema: `physical_schema`, output type: `logical_schema` converted via `into()`) and
/// run over `physical_data`; whatever the evaluator returns is passed back to the caller.
///
/// When `transform` is `None`, `physical_data` is handed back as-is.
pub fn transform_to_logical(
    engine: &dyn Engine,
    physical_data: Box<dyn EngineData>,
    physical_schema: &SchemaRef,
    logical_schema: &Schema,
    transform: &Option<ExpressionRef>,
) -> DeltaResult<Box<dyn EngineData>> {
    // No transform for this file: the physical data already is the logical data.
    let Some(expr) = transform else {
        return Ok(physical_data);
    };

    engine
        .get_expression_handler()
        .get_evaluator(
            physical_schema.clone(),
            // The evaluator takes the expression by value, hence the clone.
            // TODO: Maybe eval should take a ref
            expr.as_ref().clone(),
            logical_schema.clone().into(),
        )
        .evaluate(physical_data.as_ref())
}

pub type ScanCallback<T> = fn(
context: &mut T,
path: &str,
Expand Down
24 changes: 9 additions & 15 deletions kernel/tests/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::DefaultEngine;
use delta_kernel::expressions::{column_expr, BinaryOperator, Expression, ExpressionRef};
use delta_kernel::scan::state::{visit_scan_files, DvInfo, Stats};
use delta_kernel::scan::state::{transform_to_logical, visit_scan_files, DvInfo, Stats};
use delta_kernel::scan::Scan;
use delta_kernel::schema::{DataType, Schema};
use delta_kernel::{Engine, FileMeta, Table};
Expand Down Expand Up @@ -405,20 +405,14 @@ fn read_with_scan_data(
let read_result = read_result.unwrap();
let len = read_result.len();
// transform the physical data into the correct logical form
let logical = if let Some(ref transform) = scan_file.transform {
engine
.get_expression_handler()
.get_evaluator(
global_state.physical_schema.clone(),
transform.as_ref().clone(), // TODO: Maybe eval should take a ref
global_state.logical_schema.clone().into(),
)
.evaluate(read_result.as_ref())
.unwrap()
} else {
read_result
};

let logical = transform_to_logical(
engine,
read_result,
&global_state.physical_schema,
&global_state.logical_schema,
&scan_file.transform,
)
.unwrap();
let record_batch = to_arrow(logical).unwrap();
let rest = split_vector(selection_vector.as_mut(), len, Some(true));
let batch = if let Some(mask) = selection_vector.clone() {
Expand Down

0 comments on commit ec0d671

Please sign in to comment.