Skip to content

Commit f4c6a29

Browse files
test fixes part1: use sync engine in kernel, otherwise just use default. misc fixes
1 parent 41ff43f commit f4c6a29

File tree

13 files changed

+89
-70
lines changed

13 files changed

+89
-70
lines changed

delta-kernel-engine/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,4 +72,5 @@ arrow-55 = [ "dep:arrow_55", "dep:parquet_55", "dep:object_store_55" ]
7272
rustls = [ "reqwest/rustls-tls-native-roots", "reqwest/http2" ]
7373

7474
[dev-dependencies]
75+
test_utils = { path = "../test-utils" }
7576
async-trait = "0.1" # only used for our custom SlowGetStore ObjectStore implementation

delta-kernel-engine/src/arrow_expression/tests.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@ use crate::arrow::buffer::{OffsetBuffer, ScalarBuffer};
88
use crate::arrow::datatypes::{DataType, Field, Fields, Schema};
99

1010
use super::*;
11-
use crate::expressions::*;
11+
use delta_kernel::expressions::*;
12+
use delta_kernel::schema::DataType as DeltaDataTypes;
1213
use delta_kernel::schema::{ArrayType, MapType, StructField, StructType};
13-
use delta_kernel::DataType as DeltaDataTypes;
1414
use delta_kernel::EvaluationHandlerExtension as _;
1515

1616
use Expression as Expr;
@@ -139,7 +139,7 @@ fn test_literal_complex_type_array() {
139139
];
140140
let struct_type = StructType::new(struct_fields.clone());
141141
let struct_value = Scalar::Struct(
142-
crate::expressions::StructData::try_new(
142+
StructData::try_new(
143143
struct_fields.clone(),
144144
vec![
145145
Scalar::Integer(42),
@@ -457,11 +457,11 @@ fn test_null_row() {
457457
StructField::nullable(
458458
"x",
459459
StructType::new([
460-
StructField::nullable("a", crate::schema::DataType::INTEGER),
461-
StructField::not_null("b", crate::schema::DataType::STRING),
460+
StructField::nullable("a", DeltaDataTypes::INTEGER),
461+
StructField::not_null("b", DeltaDataTypes::STRING),
462462
]),
463463
),
464-
StructField::nullable("c", crate::schema::DataType::STRING),
464+
StructField::nullable("c", DeltaDataTypes::STRING),
465465
]));
466466
let handler = ArrowEvaluationHandler;
467467
let result = handler.null_row(schema.clone()).unwrap();
@@ -492,7 +492,7 @@ fn test_null_row() {
492492
fn test_null_row_err() {
493493
let not_null_schema = Arc::new(StructType::new(vec![StructField::not_null(
494494
"a",
495-
crate::schema::DataType::STRING,
495+
DeltaDataTypes::STRING,
496496
)]));
497497
let handler = ArrowEvaluationHandler;
498498
assert!(handler.null_row(not_null_schema).is_err());

delta-kernel-engine/src/default/parquet.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -405,7 +405,8 @@ mod tests {
405405

406406
use crate::arrow_data::ArrowEngineData;
407407
use crate::default::executor::tokio::TokioBackgroundExecutor;
408-
use crate::EngineData;
408+
409+
use delta_kernel::{transaction, EngineData};
409410

410411
use itertools::Itertools;
411412

@@ -472,7 +473,7 @@ mod tests {
472473
let actual = ArrowEngineData::try_from_engine_data(actual).unwrap();
473474

474475
let schema = Arc::new(
475-
crate::transaction::get_write_metadata_schema()
476+
transaction::get_write_metadata_schema()
476477
.as_ref()
477478
.try_into()
478479
.unwrap(),

delta-kernel-engine/src/lib.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ pub mod parquet_row_group_skipping;
1010
mod arrow_compat;
1111
pub use arrow_compat::*;
1212

13-
pub(crate) mod arrow_conversion;
13+
pub mod arrow_conversion;
14+
1415
pub(crate) mod arrow_get_data;
1516
pub(crate) mod arrow_utils;
1617
pub(crate) mod ensure_data_types;
@@ -36,7 +37,7 @@ mod tests {
3637
use crate::arrow::array::{RecordBatch, StringArray};
3738
use crate::arrow::datatypes::{DataType as ArrowDataType, Field, Schema as ArrowSchema};
3839
use crate::arrow_data::ArrowEngineData;
39-
use crate::{Engine, EngineData};
40+
use delta_kernel::{Engine, EngineData};
4041

4142
use test_utils::delta_path_for_version;
4243

kernel/Cargo.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ rustc_version = "0.4.1"
6363
delta-kernel-engine = { path = "../delta-kernel-engine", features = ["arrow-55"] }
6464
test_utils = { path = "../test-utils" }
6565
# Used for testing parse_url_opts extensibility
66-
hdfs-native-object-store = { version = "0.14.0" }
66+
hdfs-native-object-store = { version = "0.14.0", features = ["integration-test"] }
6767
hdfs-native = "0.11.1"
6868
walkdir = { version = "2.5.0" }
6969
paste = "1.0"
@@ -75,3 +75,6 @@ tracing-subscriber = { version = "0.3", default-features = false, features = [
7575
"env-filter",
7676
"fmt",
7777
] }
78+
# TODO: remove these - only used for write.rs example
79+
tokio = { version = "1", features = ["full"] }
80+
futures = "0.3"

kernel/src/engine/mod.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
//! A simple, single threaded, [`Engine`] that can only read from the local filesystem
22
3-
use super::arrow_expression::ArrowEvaluationHandler;
4-
use crate::engine::arrow_data::ArrowEngineData;
53
use crate::{
64
DeltaResult, Engine, Error, EvaluationHandler, FileDataReadResultIterator, FileMeta,
75
JsonHandler, ParquetHandler, PredicateRef, SchemaRef, StorageHandler,
86
};
7+
use delta_kernel_engine::arrow_conversion::something;
8+
use delta_kernel_engine::arrow_data::ArrowEngineData;
9+
use delta_kernel_engine::arrow_expression::ArrowEvaluationHandler;
910

10-
use crate::arrow::datatypes::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
11+
use delta_kernel_engine::arrow::datatypes::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
1112
use itertools::Itertools;
1213
use std::fs::File;
1314
use std::sync::Arc;
@@ -19,7 +20,7 @@ mod storage;
1920

2021
/// This is a simple implementation of [`Engine`]. It only supports reading data from the local
2122
/// filesystem, and internally represents data using `Arrow`.
22-
pub struct SyncEngine {
23+
pub(crate) struct SyncEngine {
2324
storage_handler: Arc<storage::SyncStorageHandler>,
2425
json_handler: Arc<json::SyncJsonHandler>,
2526
parquet_handler: Arc<parquet::SyncParquetHandler>,
@@ -101,7 +102,7 @@ where
101102
#[cfg(test)]
102103
mod tests {
103104
use super::*;
104-
use crate::engine::tests::test_arrow_engine;
105+
use delta_kernel_engine::tests::test_arrow_engine;
105106

106107
#[test]
107108
fn test_sync_engine() {

kernel/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -405,7 +405,7 @@ pub trait EvaluationHandler: AsAny {
405405
/// EvaluationHandlers.
406406
// For some reason rustc doesn't detect it's usage so we allow(dead_code) here...
407407
#[allow(dead_code)]
408-
trait EvaluationHandlerExtension: EvaluationHandler {
408+
pub trait EvaluationHandlerExtension: EvaluationHandler {
409409
/// Create a single-row [`EngineData`] by applying the given schema to the leaf-values given in
410410
/// `values`.
411411
// Note: we will stick with a Schema instead of DataType (more constrained can expand in

kernel/src/table.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ impl Table {
103103
/// Creates a [`CheckpointWriter`] for generating checkpoints at the specified table version.
104104
///
105105
/// See the [`crate::checkpoint`] module documentation for more details on checkpoint types
106-
/// and the overall checkpoint process.
106+
/// and the overall checkpoint process.
107107
///
108108
/// # Parameters
109109
/// - `engine`: Implementation of [`Engine`] apis.
@@ -165,7 +165,7 @@ mod tests {
165165
use std::path::PathBuf;
166166

167167
use super::*;
168-
use crate::engine::sync::SyncEngine;
168+
use crate::engine::SyncEngine;
169169

170170
#[test]
171171
fn test_table() {

kernel/tests/common/mod.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
1-
use delta_kernel::arrow::compute::filter_record_batch;
2-
use delta_kernel::arrow::record_batch::RecordBatch;
3-
use delta_kernel::arrow::util::pretty::pretty_format_batches;
1+
use delta_kernel_engine::arrow::compute::filter_record_batch;
2+
use delta_kernel_engine::arrow::record_batch::RecordBatch;
3+
use delta_kernel_engine::arrow::util::pretty::pretty_format_batches;
4+
45
use itertools::Itertools;
56

6-
use crate::ArrowEngineData;
77
use delta_kernel::scan::Scan;
88
use delta_kernel::{DeltaResult, Engine, EngineData, Table};
9+
use delta_kernel_engine::arrow_data::ArrowEngineData;
910

1011
use std::sync::Arc;
1112

@@ -24,7 +25,7 @@ macro_rules! sort_lines {
2425
#[macro_export]
2526
macro_rules! assert_batches_sorted_eq {
2627
($expected_lines_sorted: expr, $CHUNKS: expr) => {
27-
let formatted = delta_kernel::arrow::util::pretty::pretty_format_batches($CHUNKS)
28+
let formatted = delta_kernel_engine::arrow::util::pretty::pretty_format_batches($CHUNKS)
2829
.unwrap()
2930
.to_string();
3031
// fix for windows: \r\n -->
@@ -95,7 +96,11 @@ pub(crate) fn read_scan(scan: &Scan, engine: Arc<dyn Engine>) -> DeltaResult<Vec
9596
let data = scan_result.raw_data?;
9697
let record_batch = to_arrow(data)?;
9798
if let Some(mask) = mask {
98-
Ok(filter_record_batch(&record_batch, &mask.into())?)
99+
Ok(
100+
filter_record_batch(&record_batch, &mask.into()).map_err(|e| {
101+
delta_kernel::Error::Generic(format!("Failed to filter record batch: {e}"))
102+
})?,
103+
)
99104
} else {
100105
Ok(record_batch)
101106
}

kernel/tests/dv.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::ops::Add;
44
use std::path::PathBuf;
55
use std::sync::Arc;
66

7-
use delta_kernel::engine::sync::SyncEngine;
7+
use delta_kernel::engine::SyncEngine;
88
use delta_kernel::scan::ScanResult;
99
use delta_kernel::{DeltaResult, Table};
1010

kernel/tests/hdfs.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,11 @@
55
//
66
// Run these integration tests with:
77
// cargo test --features integration-test --test hdfs
8-
#![cfg(all(feature = "integration-test", not(target_os = "windows")))]
8+
#![cfg(not(target_os = "windows"))]
99

10-
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
11-
use delta_kernel::engine::default::DefaultEngine;
1210
use delta_kernel::Table;
11+
use delta_kernel_engine::default::executor::tokio::TokioBackgroundExecutor;
12+
use delta_kernel_engine::default::DefaultEngine;
1313
use hdfs_native::{Client, WriteOptions};
1414
use hdfs_native_object_store::minidfs::MiniDfs;
1515
use std::collections::HashSet;
@@ -19,6 +19,7 @@ use std::sync::Arc;
1919
extern crate walkdir;
2020
use walkdir::WalkDir;
2121

22+
#[allow(unused)]
2223
async fn write_local_path_to_hdfs(
2324
local_path: &Path,
2425
remote_path: &Path,
@@ -51,7 +52,8 @@ async fn write_local_path_to_hdfs(
5152
Ok(())
5253
}
5354

54-
#[tokio::test]
55+
#[ignore]
56+
#[allow(unused)]
5557
async fn read_table_version_hdfs() -> Result<(), Box<dyn std::error::Error>> {
5658
let minidfs = MiniDfs::with_features(&HashSet::new());
5759
let hdfs_client = Client::default();

kernel/tests/read.rs

Lines changed: 35 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,24 @@ use std::collections::HashMap;
22
use std::path::PathBuf;
33
use std::sync::Arc;
44

5+
use delta_kernel_engine::arrow::compute::{concat_batches, filter_record_batch};
6+
use delta_kernel_engine::arrow::datatypes::SchemaRef as ArrowSchemaRef;
7+
use delta_kernel_engine::object_store::{memory::InMemory, path::Path, ObjectStore};
8+
use delta_kernel_engine::parquet::file::properties::{EnabledStatistics, WriterProperties};
9+
510
use delta_kernel::actions::deletion_vector::split_vector;
6-
use delta_kernel::arrow::compute::{concat_batches, filter_record_batch};
7-
use delta_kernel::arrow::datatypes::SchemaRef as ArrowSchemaRef;
8-
use delta_kernel::engine::arrow_data::ArrowEngineData;
9-
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
10-
use delta_kernel::engine::default::DefaultEngine;
1111
use delta_kernel::expressions::{
1212
column_expr, column_pred, BinaryPredicateOp, Expression as Expr, ExpressionRef,
1313
Predicate as Pred,
1414
};
15-
use delta_kernel::object_store::{memory::InMemory, path::Path, ObjectStore};
16-
use delta_kernel::parquet::file::properties::{EnabledStatistics, WriterProperties};
1715
use delta_kernel::scan::state::{transform_to_logical, DvInfo, Stats};
1816
use delta_kernel::scan::Scan;
1917
use delta_kernel::schema::{DataType, Schema};
2018
use delta_kernel::{Engine, FileMeta, Table};
19+
use delta_kernel_engine::arrow_conversion::arrow_schema_from_struct_type;
20+
use delta_kernel_engine::default::executor::tokio::TokioBackgroundExecutor;
21+
use delta_kernel_engine::default::DefaultEngine;
22+
2123
use itertools::Itertools;
2224
use test_utils::{
2325
actions_to_string, add_commit, generate_batch, generate_simple_batch, into_record_batch,
@@ -316,7 +318,8 @@ fn read_with_execute(
316318
scan: &Scan,
317319
expected: &[String],
318320
) -> Result<(), Box<dyn std::error::Error>> {
319-
let result_schema: ArrowSchemaRef = Arc::new(scan.schema().as_ref().try_into()?);
321+
let result_schema: ArrowSchemaRef =
322+
Arc::new(arrow_schema_from_struct_type(scan.schema().as_ref())?);
320323
let batches = read_scan(scan, engine)?;
321324

322325
if expected.is_empty() {
@@ -359,7 +362,8 @@ fn read_with_scan_metadata(
359362
expected: &[String],
360363
) -> Result<(), Box<dyn std::error::Error>> {
361364
let global_state = scan.global_scan_state();
362-
let result_schema: ArrowSchemaRef = Arc::new(scan.schema().as_ref().try_into()?);
365+
let result_schema: ArrowSchemaRef =
366+
Arc::new(arrow_schema_from_struct_type(scan.schema().as_ref())?);
363367
let scan_metadata = scan.scan_metadata(engine)?;
364368
let mut scan_files = vec![];
365369
for res in scan_metadata {
@@ -431,36 +435,32 @@ fn read_table_data(
431435
let path = std::fs::canonicalize(PathBuf::from(path))?;
432436
let predicate = predicate.map(Arc::new);
433437
let url = url::Url::from_directory_path(path).unwrap();
434-
let default_engine = DefaultEngine::try_new(
438+
let default_engine = Arc::new(DefaultEngine::try_new(
435439
&url,
436440
std::iter::empty::<(&str, &str)>(),
437441
Arc::new(TokioBackgroundExecutor::new()),
438-
)?;
439-
let sync_engine = delta_kernel::engine::sync::SyncEngine::new();
440-
441-
let engines: Vec<Arc<dyn Engine>> = vec![Arc::new(sync_engine), Arc::new(default_engine)];
442-
for engine in engines {
443-
let table = Table::new(url.clone());
444-
let snapshot = table.snapshot(engine.as_ref(), None)?;
445-
446-
let read_schema = select_cols.map(|select_cols| {
447-
let table_schema = snapshot.schema();
448-
let selected_fields = select_cols
449-
.iter()
450-
.map(|col| table_schema.field(col).cloned().unwrap());
451-
Arc::new(Schema::new(selected_fields))
452-
});
453-
println!("Read {url:?} with schema {read_schema:#?} and predicate {predicate:#?}");
454-
let scan = snapshot
455-
.into_scan_builder()
456-
.with_schema_opt(read_schema)
457-
.with_predicate(predicate.clone())
458-
.build()?;
442+
)?);
459443

460-
sort_lines!(expected);
461-
read_with_scan_metadata(table.location(), engine.as_ref(), &scan, &expected)?;
462-
read_with_execute(engine, &scan, &expected)?;
463-
}
444+
let table = Table::new(url.clone());
445+
let snapshot = table.snapshot(default_engine.as_ref(), None)?;
446+
447+
let read_schema = select_cols.map(|select_cols| {
448+
let table_schema = snapshot.schema();
449+
let selected_fields = select_cols
450+
.iter()
451+
.map(|col| table_schema.field(col).cloned().unwrap());
452+
Arc::new(Schema::new(selected_fields))
453+
});
454+
println!("Read {url:?} with schema {read_schema:#?} and predicate {predicate:#?}");
455+
let scan = snapshot
456+
.into_scan_builder()
457+
.with_schema_opt(read_schema)
458+
.with_predicate(predicate.clone())
459+
.build()?;
460+
461+
sort_lines!(expected);
462+
read_with_scan_metadata(table.location(), default_engine.as_ref(), &scan, &expected)?;
463+
read_with_execute(default_engine, &scan, &expected)?;
464464
Ok(())
465465
}
466466

kernel/tests/v2_checkpoints.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
use std::sync::Arc;
22

3-
use delta_kernel::arrow::array::RecordBatch;
4-
use delta_kernel::engine::sync::SyncEngine;
3+
use delta_kernel_engine::arrow::array::RecordBatch;
4+
use delta_kernel_engine::default::executor::tokio::TokioBackgroundExecutor;
5+
use delta_kernel_engine::default::DefaultEngine;
6+
use delta_kernel_engine::object_store::local::LocalFileSystem;
57

6-
use delta_kernel::engine::arrow_data::ArrowEngineData;
78
use delta_kernel::{DeltaResult, Table};
89

910
mod common;
@@ -15,7 +16,11 @@ fn read_v2_checkpoint_table(test_name: impl AsRef<str>) -> DeltaResult<Vec<Recor
1516
let test_path = test_dir.path().join(test_name.as_ref());
1617

1718
let table = Table::try_from_uri(test_path.to_str().expect("table path to string")).unwrap();
18-
let engine = Arc::new(SyncEngine::new());
19+
let object_store = Arc::new(LocalFileSystem::new());
20+
let engine = Arc::new(DefaultEngine::new(
21+
object_store,
22+
TokioBackgroundExecutor::new().into(),
23+
));
1924
let snapshot = table.snapshot(engine.as_ref(), None)?;
2025
let scan = snapshot.into_scan_builder().build()?;
2126
let batches = read_scan(&scan, engine)?;

0 commit comments

Comments
 (0)