Commit 50028d2

refactor!: make SyncEngine test-only, use DefaultEngine everywhere else (#957)
## What changes are proposed in this pull request?

Again prompted by the crate split exploration (#941). The `SyncEngine` is not really useful to external users; it only adds to our public API and complicates our feature flags. We expect users to reach for the default engine when they want a 'prebuilt' solution with minimal deps and sane defaults (arrow, tokio). The `SyncEngine` will continue to exist as a test-only engine (for now; it may evolve in the future).

This PR includes:
1. removing the `sync-engine` feature flag (in core kernel and FFI), and making the existing `engine::sync` module `#[cfg(test)]`
2. removing any consumption of the sync engine in (1) examples and (2) integration tests
3. adding a simple test_util trait `DefaultEngineExtension` and a simple method to return a new local default engine (to more easily mimic the `SyncEngine` API for easy replacement)

### This PR affects the following public APIs

- no more `SyncEngine`, no more `sync-engine` feature flag

## How was this change tested?

Existing
1 parent e614153 commit 50028d2
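
Item 3 of the description mentions a test-utility extension trait that hands back a ready-made local default engine. The following is only a minimal sketch of that extension-trait pattern, using stand-in types: `DemoDefaultEngine`, `DemoEngineExtension`, the method name `new_local`, and the `storage_root` field are all hypothetical and are not taken from this commit or from the `delta_kernel` API.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for the default engine (NOT the delta_kernel type).
#[derive(Debug)]
struct DemoDefaultEngine {
    /// Assumed field: where this engine reads from.
    storage_root: String,
}

/// Hypothetical test-utility trait: one call yields a shareable local engine,
/// so tests that used a zero-argument constructor can switch with little churn.
trait DemoEngineExtension {
    fn new_local() -> Arc<Self>;
}

impl DemoEngineExtension for DemoDefaultEngine {
    fn new_local() -> Arc<Self> {
        // Assumption for illustration: "local" means rooted at the file scheme.
        Arc::new(DemoDefaultEngine {
            storage_root: "file:///".to_string(),
        })
    }
}

fn main() {
    let engine = DemoDefaultEngine::new_local();
    println!("local engine rooted at {}", engine.storage_root);
}
```

Keeping the helper in a trait rather than as an inherent method means it can live in a test-only crate without widening the engine's public API, which appears to be the motivation stated in the description.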

24 files changed: +157 -217 lines changed

.github/workflows/build.yml

Lines changed: 2 additions & 2 deletions
@@ -178,7 +178,7 @@ jobs:
           cargo build
           popd
           pushd ffi
-          cargo b --features default-engine,sync-engine,test-ffi,tracing
+          cargo b --features default-engine,test-ffi,tracing
           popd
       - name: build and run read-table test
         run: |
@@ -210,7 +210,7 @@ jobs:
       - name: Test with Miri
         run: |
           pushd ffi
-          MIRIFLAGS=-Zmiri-disable-isolation cargo miri test --features sync-engine,default-engine
+          MIRIFLAGS=-Zmiri-disable-isolation cargo miri test --features default-engine

   coverage:
     runs-on: ubuntu-latest

README.md

Lines changed: 5 additions & 10 deletions
@@ -65,7 +65,6 @@ include:
 | Feature flag | Description |
 | ------------- | ------------- |
 | `default-engine` | Turn on the 'default' engine: async, arrow-based `Engine` implementation |
-| `sync-engine` | Turn on the 'sync' engine: synchronous, arrow-based `Engine` implementation. Only supports local storage! |
 | `arrow-conversion` | Conversion utilities for arrow/kernel schema interoperation |
 | `arrow-expression` | Expression system implementation for arrow |

@@ -75,13 +74,13 @@ are still unstable. We therefore may break APIs within minor releases (that is,
 we will not break APIs in patch releases (`0.1.0` -> `0.1.1`).

 ## Arrow versioning
-If you enable the `default-engine` or `sync-engine` features, you get an implementation of the
-`Engine` trait that uses [Arrow] as its data format.
+If you enable the `default-engine` feature, you get an implementation of the `Engine` trait that
+uses [Arrow] as its data format.

 The [`arrow crate`](https://docs.rs/arrow/latest/arrow/) tends to release new major versions rather
-quickly. To enable engines that already integrate arrow to also integrate kernel and not force them
-to track a specific version of arrow that kernel depends on, we take as broad dependency on arrow
-versions as we can.
+frequently. To enable engines that already integrate arrow to also integrate kernel and not force
+them to track a specific version of arrow that kernel depends on, we take as broad dependency on
+arrow versions as we can.

 We allow selecting the version of arrow to use via feature flags. Currently we support the following
 flags:
@@ -97,10 +96,6 @@ Note that if more than one `arrow-x` feature is enabled, kernel will use the _hi
 specified flag. This also means that if you use `--all-features` you will get the latest version of
 arrow that kernel supports.

-If you enable at least one of `default-engine`, `sync-engine`, `arrow-conversion`, or
-`arrow-expression`, you must enable either `arrow` (latest arrow version) or `arrow-54` or
-`arrow-55`.
-
 ### Object Store
 You may also need to patch the `object_store` version used if the version of `parquet` you depend on
 depends on a different version of `object_store`. This can be done by including `object_store` in

ffi/Cargo.toml

Lines changed: 0 additions & 1 deletion
@@ -40,6 +40,5 @@ trybuild = "1.0"
 default = ["default-engine"]
 default-engine = ["delta_kernel/default-engine", "delta_kernel/arrow"]
 tracing = [ "tracing-core", "tracing-subscriber" ]
-sync-engine = ["delta_kernel/sync-engine", "delta_kernel/arrow"]
 internal-api = []
 test-ffi = []

ffi/cbindgen.toml

Lines changed: 0 additions & 1 deletion
@@ -11,7 +11,6 @@ namespace = "ffi"

 [defines]
 "feature = default-engine" = "DEFINE_DEFAULT_ENGINE"
-"feature = sync-engine" = "DEFINE_SYNC_ENGINE"

 [export.mangle]
 remove_underscores = true

ffi/src/error.rs

Lines changed: 4 additions & 4 deletions
@@ -8,13 +8,13 @@ use crate::{kernel_string_slice, ExternEngine, KernelStringSlice};
 pub enum KernelError {
     UnknownError, // catch-all for unrecognized kernel Error types
     FFIError, // errors encountered in the code layer that supports FFI
-    #[cfg(any(feature = "default-engine", feature = "sync-engine"))]
+    #[cfg(feature = "default-engine")]
     ArrowError,
     EngineDataTypeError,
     ExtractError,
     GenericError,
     IOErrorError,
-    #[cfg(any(feature = "default-engine", feature = "sync-engine"))]
+    #[cfg(feature = "default-engine")]
     ParquetError,
     #[cfg(feature = "default-engine")]
     ObjectStoreError,
@@ -62,15 +62,15 @@ impl From<Error> for KernelError {
     fn from(e: Error) -> Self {
         match e {
             // NOTE: By definition, no kernel Error maps to FFIError
-            #[cfg(any(feature = "default-engine", feature = "sync-engine"))]
+            #[cfg(feature = "default-engine")]
             Error::Arrow(_) => KernelError::ArrowError,
             Error::CheckpointWrite(_) => KernelError::CheckpointWriteError,
             Error::EngineDataType(_) => KernelError::EngineDataTypeError,
             Error::Extract(..) => KernelError::ExtractError,
             Error::Generic(_) => KernelError::GenericError,
             Error::GenericError { .. } => KernelError::GenericError,
             Error::IOError(_) => KernelError::IOErrorError,
-            #[cfg(any(feature = "default-engine", feature = "sync-engine"))]
+            #[cfg(feature = "default-engine")]
             Error::Parquet(_) => KernelError::ParquetError,
             #[cfg(feature = "default-engine")]
             Error::ObjectStore(_) => KernelError::ObjectStoreError,
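
The hunks above gate enum variants and their matching `match` arms behind a single feature. As a rough, self-contained illustration of how that gating behaves, here is a sketch with a hypothetical `DemoError` enum and `describe` function (neither is part of the kernel); it reuses the `default-engine` feature name only as an example. When the feature is off, both the variant and its arm disappear together, so the `match` stays exhaustive.

```rust
/// Hypothetical stand-in for an FFI-facing error enum (NOT the kernel's type).
#[derive(Debug, PartialEq)]
enum DemoError {
    Generic,
    // Compiled in only when the crate is built with the named feature.
    #[cfg(feature = "default-engine")]
    Arrow,
}

/// Maps each variant to a label. The gated arm must carry the same `cfg`
/// as the gated variant, otherwise a feature-less build would not compile.
fn describe(e: &DemoError) -> &'static str {
    match e {
        DemoError::Generic => "generic",
        #[cfg(feature = "default-engine")]
        DemoError::Arrow => "arrow",
    }
}

fn main() {
    println!("{}", describe(&DemoError::Generic));
}
```

Collapsing `cfg(any(a, b))` to `cfg(a)`, as this commit does, only narrows the set of builds in which the gated items exist; the pairing between variant and arm is unchanged.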

ffi/src/lib.rs

Lines changed: 6 additions & 35 deletions
@@ -355,14 +355,14 @@ pub trait ExternEngine: Send + Sync {
 #[handle_descriptor(target=dyn ExternEngine, mutable=false)]
 pub struct SharedExternEngine;

-#[cfg(any(feature = "default-engine", feature = "sync-engine"))]
+#[cfg(feature = "default-engine")]
 struct ExternEngineVtable {
     // Actual engine instance to use
     engine: Arc<dyn Engine>,
     allocate_error: AllocateErrorFn,
 }

-#[cfg(any(feature = "default-engine", feature = "sync-engine"))]
+#[cfg(feature = "default-engine")]
 impl Drop for ExternEngineVtable {
     fn drop(&mut self) {
         debug!("dropping engine interface");
@@ -373,7 +373,7 @@ impl Drop for ExternEngineVtable {
 ///
 /// Kernel doesn't use any threading or concurrency. If engine chooses to do so, engine is
 /// responsible for handling any races that could result.
-#[cfg(any(feature = "default-engine", feature = "sync-engine"))]
+#[cfg(feature = "default-engine")]
 unsafe impl Send for ExternEngineVtable {}

 /// # Safety
@@ -385,10 +385,10 @@ unsafe impl Send for ExternEngineVtable {}
 /// Basically, by failing to implement these traits, we forbid the engine from being able to declare
 /// its thread-safety (because rust assumes it is not threadsafe). By implementing them, we leave it
 /// up to the engine to enforce thread safety if engine chooses to use threads at all.
-#[cfg(any(feature = "default-engine", feature = "sync-engine"))]
+#[cfg(feature = "default-engine")]
 unsafe impl Sync for ExternEngineVtable {}

-#[cfg(any(feature = "default-engine", feature = "sync-engine"))]
+#[cfg(feature = "default-engine")]
 impl ExternEngine for ExternEngineVtable {
     fn engine(&self) -> Arc<dyn Engine> {
         self.engine.clone()
@@ -513,18 +513,7 @@ fn get_default_default_engine_impl(
     get_default_engine_impl(url?, Default::default(), allocate_error)
 }

-/// # Safety
-///
-/// Caller is responsible for passing a valid path pointer.
-#[cfg(feature = "sync-engine")]
-#[no_mangle]
-pub unsafe extern "C" fn get_sync_engine(
-    allocate_error: AllocateErrorFn,
-) -> ExternResult<Handle<SharedExternEngine>> {
-    get_sync_engine_impl(allocate_error).into_extern_result(&allocate_error)
-}
-
-#[cfg(any(feature = "default-engine", feature = "sync-engine"))]
+#[cfg(feature = "default-engine")]
 fn engine_to_handle(
     engine: Arc<dyn Engine>,
     allocate_error: AllocateErrorFn,
@@ -552,14 +541,6 @@ fn get_default_engine_impl(
     Ok(engine_to_handle(Arc::new(engine?), allocate_error))
 }

-#[cfg(feature = "sync-engine")]
-fn get_sync_engine_impl(
-    allocate_error: AllocateErrorFn,
-) -> DeltaResult<Handle<SharedExternEngine>> {
-    let engine = delta_kernel::engine::sync::SyncEngine::new();
-    Ok(engine_to_handle(Arc::new(engine), allocate_error))
-}
-
 /// # Safety
 ///
 /// Caller is responsible for passing a valid handle.
@@ -904,14 +885,4 @@ mod tests {
         unsafe { free_engine(engine) }
         Ok(())
     }
-
-    #[test]
-    #[cfg(feature = "sync-engine")]
-    fn sync_engine() {
-        let engine = unsafe { get_sync_engine(allocate_err) };
-        let engine = ok_or_panic(engine);
-        unsafe {
-            free_engine(engine);
-        }
-    }
 }

integration-tests/Cargo.toml

Lines changed: 1 addition & 1 deletion
@@ -6,4 +6,4 @@ edition = "2021"
 [workspace]

 [dependencies]
-delta_kernel = { path = "../kernel", features = ["default-engine", "sync-engine"] }
+delta_kernel = { path = "../kernel", features = ["default-engine"] }

kernel/Cargo.toml

Lines changed: 1 addition & 6 deletions
@@ -54,7 +54,6 @@ uuid = { version = "1.16.0", features = ["v4", "fast-rng"] }
 z85 = "3.0.6"

 # optional deps
-tempfile = { version = "3", optional = true }
 futures = { version = "0.3", optional = true }
 # Used for fetching direct urls (like pre-signed urls)
 reqwest = { version = "0.12.15", default-features = false, optional = true }
@@ -128,16 +127,12 @@ default-engine-rustls = [
   "reqwest/rustls-tls-native-roots",
   "reqwest/http2",
 ]
-sync-engine = [
-  "need-arrow",
-  "tempfile",
-]

 [build-dependencies]
 rustc_version = "0.4.1"

 [dev-dependencies]
-delta_kernel = { path = ".", features = ["arrow", "default-engine", "sync-engine"] }
+delta_kernel = { path = ".", features = ["arrow", "default-engine"] }
 test_utils = { path = "../test-utils" }
 # Used for testing parse_url_opts extensibility
 hdfs-native-object-store = { version = "0.14.0" }

kernel/examples/read-table-multi-threaded/Cargo.toml

Lines changed: 0 additions & 1 deletion
@@ -10,7 +10,6 @@ clap = { version = "4.5", features = ["derive"] }
 delta_kernel = { path = "../../../kernel", features = [
   "arrow-55",
   "default-engine",
-  "sync-engine",
   "internal-api",
 ] }
 env_logger = "0.11.8"

kernel/examples/read-table-multi-threaded/src/main.rs

Lines changed: 23 additions & 45 deletions
@@ -11,12 +11,11 @@ use delta_kernel::actions::deletion_vector::split_vector;
 use delta_kernel::engine::arrow_data::ArrowEngineData;
 use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
 use delta_kernel::engine::default::DefaultEngine;
-use delta_kernel::engine::sync::SyncEngine;
 use delta_kernel::scan::state::{transform_to_logical, DvInfo, GlobalScanState, Stats};
 use delta_kernel::schema::Schema;
 use delta_kernel::{DeltaResult, Engine, EngineData, ExpressionRef, FileMeta, Table};

-use clap::{Parser, ValueEnum};
+use clap::Parser;
 use url::Url;

 /// An example program that reads a table using multiple threads. This shows the use of the
@@ -33,10 +32,6 @@ struct Cli {
     #[arg(short, long, default_value_t = 2, value_parser = 1..=2048)]
     thread_count: i64,

-    /// Which Engine to use
-    #[arg(short, long, value_enum, default_value_t = EngineType::Default)]
-    engine: EngineType,
-
     /// Comma separated list of columns to select
     #[arg(long, value_delimiter=',', num_args(0..))]
     columns: Option<Vec<String>>,
@@ -56,14 +51,6 @@ struct Cli {
     limit: Option<usize>,
 }

-#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum)]
-enum EngineType {
-    /// Use the default, async engine
-    Default,
-    /// Use the sync engine (local files only)
-    Sync,
-}
-
 fn main() -> ExitCode {
     env_logger::init();
     match try_main() {
@@ -129,27 +116,21 @@ fn try_main() -> DeltaResult<()> {
     let table = Table::try_from_uri(&cli.path)?;
     println!("Reading {}", table.location());

-    // create the requested engine
-    let engine: Arc<dyn Engine> = match cli.engine {
-        EngineType::Default => {
-            let mut options = if let Some(region) = cli.region {
-                HashMap::from([("region", region)])
-            } else {
-                HashMap::new()
-            };
-            if cli.public {
-                options.insert("skip_signature", "true".to_string());
-            }
-            Arc::new(DefaultEngine::try_new(
-                table.location(),
-                options,
-                Arc::new(TokioBackgroundExecutor::new()),
-            )?)
-        }
-        EngineType::Sync => Arc::new(SyncEngine::new()),
+    let mut options = if let Some(region) = cli.region {
+        HashMap::from([("region", region)])
+    } else {
+        HashMap::new()
     };
+    if cli.public {
+        options.insert("skip_signature", "true".to_string());
+    }
+    let engine = DefaultEngine::try_new(
+        table.location(),
+        options,
+        Arc::new(TokioBackgroundExecutor::new()),
+    )?;

-    let snapshot = table.snapshot(engine.as_ref(), None)?;
+    let snapshot = table.snapshot(&engine, None)?;

     // process the columns requested and build a schema from them
     let read_schema_opt = cli
@@ -179,7 +160,7 @@ fn try_main() -> DeltaResult<()> {
     // [`delta_kernel::scan::scan_row_schema`]. Generally engines will not need to interact with
     // this data directly, and can just call [`visit_scan_files`] to get pre-parsed data back from
     // the kernel.
-    let scan_metadata = scan.scan_metadata(engine.as_ref())?;
+    let scan_metadata = scan.scan_metadata(&engine)?;

     // get any global state associated with this scan
     let global_state = Arc::new(scan.global_scan_state());
@@ -192,18 +173,17 @@ fn try_main() -> DeltaResult<()> {

     // fire up each thread. we don't need the handles as we rely on the channels to indicate when
     // things are done
-    let _handles: Vec<_> = (0..cli.thread_count)
-        .map(|_| {
+    thread::scope(|s| {
+        (0..cli.thread_count).for_each(|_| {
             // items that we need to send to the other thread
             let scan_state = global_state.clone();
             let rb_tx = record_batch_tx.clone();
             let scan_file_rx = scan_file_rx.clone();
-            let engine = engine.clone();
-            thread::spawn(move || {
-                do_work(engine, scan_state, rb_tx, scan_file_rx);
-            })
-        })
-        .collect();
+            s.spawn(|| {
+                do_work(&engine, scan_state, rb_tx, scan_file_rx);
+            });
+        });
+    });

     // have handed out all copies needed, drop so record_batch_rx will exit when the last thread is
     // done sending
@@ -244,13 +224,11 @@ fn try_main() -> DeltaResult<()> {

 // this is the work each thread does
 fn do_work(
-    engine: Arc<dyn Engine>,
+    engine: &dyn Engine,
     scan_state: Arc<GlobalScanState>,
     record_batch_tx: Sender<RecordBatch>,
     scan_file_rx: spmc::Receiver<ScanFile>,
 ) {
-    // get the type for the function calls
-    let engine: &dyn Engine = engine.as_ref();
     // in a loop, try and get a ScanFile. Note that `recv` will return an `Err` when the other side
     // hangs up, which indicates there's no more data to process.
     while let Ok(scan_file) = scan_file_rx.recv() {
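
The change above swaps `thread::spawn` with a cloned `Arc<dyn Engine>` for `std::thread::scope`, which lets workers borrow `&engine` directly. The sketch below illustrates that borrowing pattern in isolation; `DemoEngine`, its `process` method, and `run` are hypothetical stand-ins and do not reflect the example's real types. One property of the standard library worth keeping in mind: `thread::scope` does not return until every spawned thread has finished, so this sketch hands each worker its input up front.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for a shared engine (NOT a delta_kernel type).
struct DemoEngine {
    scale: u64,
}

impl DemoEngine {
    fn process(&self, item: u64) -> u64 {
        item * self.scale
    }
}

/// Worker: borrows the engine instead of owning an `Arc` clone.
fn do_work(engine: &DemoEngine, items: &[u64], tx: mpsc::Sender<u64>) {
    for &item in items {
        // A send error only means the receiver is gone; nothing left to do.
        let _ = tx.send(engine.process(item));
    }
}

/// Splits fixed work across `thread_count` scoped threads and sums the results.
fn run(thread_count: usize) -> u64 {
    let engine = DemoEngine { scale: 10 };
    let work: Vec<u64> = (1..=8).collect();
    let (tx, rx) = mpsc::channel();

    let threads = thread_count.max(1);
    let chunk_len = (work.len() + threads - 1) / threads;

    thread::scope(|s| {
        for chunk in work.chunks(chunk_len) {
            let tx = tx.clone();
            let engine = &engine; // plain borrow; valid because the scope joins
            s.spawn(move || do_work(engine, chunk, tx));
        }
    }); // all workers have finished here

    // Drop the last sender so the receiver's iterator terminates.
    drop(tx);
    rx.iter().sum()
}

fn main() {
    println!("total = {}", run(3));
}
```

Because the scope guarantees the threads cannot outlive `engine`, no reference counting is needed, which is why the worker signature can take a plain reference.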

kernel/examples/read-table-single-threaded/Cargo.toml

Lines changed: 0 additions & 1 deletion
@@ -10,7 +10,6 @@ clap = { version = "4.5", features = ["derive"] }
 delta_kernel = { path = "../../../kernel", features = [
   "arrow-55",
   "default-engine",
-  "sync-engine",
   "internal-api",
 ] }
 env_logger = "0.11.8"
