diff --git a/ffi/src/lib.rs b/ffi/src/lib.rs
index 8470433d0c..af8f15edda 100644
--- a/ffi/src/lib.rs
+++ b/ffi/src/lib.rs
@@ -765,7 +765,7 @@ impl<T> Default for ReferenceSet<T> {
 #[cfg(test)]
 mod tests {
     use delta_kernel::engine::default::{executor::tokio::TokioBackgroundExecutor, DefaultEngine};
-    use object_store::{memory::InMemory, path::Path};
+    use object_store::memory::InMemory;
     use test_utils::{actions_to_string, actions_to_string_partitioned, add_commit, TestAction};

     use super::*;
@@ -839,11 +839,7 @@ mod tests {
             actions_to_string(vec![TestAction::Metadata]),
         )
         .await?;
-        let engine = DefaultEngine::new(
-            storage.clone(),
-            Path::from("/"),
-            Arc::new(TokioBackgroundExecutor::new()),
-        );
+        let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new()));
         let engine = engine_to_handle(Arc::new(engine), allocate_err);
         let path = "memory:///";

@@ -872,11 +868,7 @@ mod tests {
             actions_to_string_partitioned(vec![TestAction::Metadata]),
         )
         .await?;
-        let engine = DefaultEngine::new(
-            storage.clone(),
-            Path::from("/"),
-            Arc::new(TokioBackgroundExecutor::new()),
-        );
+        let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new()));
         let engine = engine_to_handle(Arc::new(engine), allocate_err);
         let path = "memory:///";

diff --git a/kernel/src/engine/default/filesystem.rs b/kernel/src/engine/default/filesystem.rs
index 5606a28d02..21432ed70e 100644
--- a/kernel/src/engine/default/filesystem.rs
+++ b/kernel/src/engine/default/filesystem.rs
@@ -14,7 +14,6 @@ use crate::{DeltaResult, Error, FileMeta, FileSlice, FileSystemClient};
 pub struct ObjectStoreFileSystemClient<E: TaskExecutor> {
     inner: Arc<DynObjectStore>,
     has_ordered_listing: bool,
-    table_root: Path,
     task_executor: Arc<E>,
     readahead: usize,
 }
@@ -23,13 +22,11 @@ impl<E: TaskExecutor> ObjectStoreFileSystemClient<E> {
     pub(crate) fn new(
         store: Arc<DynObjectStore>,
         has_ordered_listing: bool,
-        table_root: Path,
         task_executor: Arc<E>,
     ) -> Self {
         Self {
             inner: store,
             has_ordered_listing,
-            table_root,
             task_executor,
             readahead: 10,
         }
@@ -47,16 +44,28 @@ impl<E: TaskExecutor> FileSystemClient for ObjectStoreFileSystemClient<E> {
         &self,
         path: &Url,
     ) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<FileMeta>>>> {
-        let url = path.clone();
-        let offset = Path::from(path.path());
-        // TODO properly handle table prefix
-        let prefix = self.table_root.child("_delta_log");
+        // The offset is used for list-after; the prefix is used to restrict the listing to a specific directory.
+        // Unfortunately, `Path` provides no easy way to check whether a name is directory-like,
+        // because it strips trailing /, so we're reduced to manually checking the original URL.
+        let offset = Path::from_url_path(path.path())?;
+        let prefix = if path.path().ends_with('/') {
+            offset.clone()
+        } else {
+            let mut parts = offset.parts().collect_vec();
+            if parts.pop().is_none() {
+                return Err(Error::Generic(format!(
+                    "Offset path must not be a root directory. Got: '{}'",
+                    path.as_str()
+                )));
+            }
+            Path::from_iter(parts)
+        };

         let store = self.inner.clone();

         // This channel will become the iterator
         let (sender, receiver) = std::sync::mpsc::sync_channel(4_000);
-
+        let url = path.clone();
         self.task_executor.spawn(async move {
             let mut stream = store.list_with_offset(Some(&prefix), &offset);
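Note: the offset/prefix derivation above is the crux of this fix. Below is a standalone sketch of the same logic, assuming it is faithful to the hunk above (the helper name `offset_and_prefix` is hypothetical; the real code lives inline in `list_from` and uses itertools' `collect_vec`):

```rust
use object_store::path::Path;
use url::Url;

// Mirrors the branch above: a URL with a trailing '/' lists everything under
// that prefix, while a file-like URL lists keys strictly after it within its
// parent directory. Root offsets have no parent, so they are rejected.
fn offset_and_prefix(url: &Url) -> Result<(Path, Path), String> {
    let offset = Path::from_url_path(url.path()).map_err(|e| e.to_string())?;
    let prefix = if url.path().ends_with('/') {
        offset.clone()
    } else {
        let mut parts: Vec<_> = offset.parts().collect();
        if parts.pop().is_none() {
            return Err(format!("Offset path must not be a root directory. Got: '{url}'"));
        }
        Path::from_iter(parts)
    };
    Ok((offset, prefix))
}

fn main() {
    let url = Url::parse("memory:///table/_delta_log/00000000000000000010.json").unwrap();
    let (offset, prefix) = offset_and_prefix(&url).unwrap();
    assert_eq!(prefix.as_ref(), "table/_delta_log");
    assert_eq!(offset.as_ref(), "table/_delta_log/00000000000000000010.json");
}
```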
@@ -192,11 +201,9 @@ mod tests {
         let mut url = Url::from_directory_path(tmp.path()).unwrap();

         let store = Arc::new(LocalFileSystem::new());
-        let prefix = Path::from(url.path());
         let client = ObjectStoreFileSystemClient::new(
             store,
             false, // don't have ordered listing
-            prefix,
             Arc::new(TokioBackgroundExecutor::new()),
         );

@@ -229,11 +236,10 @@ mod tests {
         store.put(&name, data.clone().into()).await.unwrap();

         let table_root = Url::parse("memory:///").expect("valid url");
-        let prefix = Path::from_url_path(table_root.path()).expect("Couldn't get path");
-        let engine = DefaultEngine::new(store, prefix, Arc::new(TokioBackgroundExecutor::new()));
+        let engine = DefaultEngine::new(store, Arc::new(TokioBackgroundExecutor::new()));
         let files: Vec<_> = engine
             .get_file_system_client()
-            .list_from(&table_root)
+            .list_from(&table_root.join("_delta_log").unwrap().join("0").unwrap())
             .unwrap()
             .try_collect()
             .unwrap();
@@ -260,11 +266,12 @@ mod tests {
         let url = Url::from_directory_path(tmp.path()).unwrap();

         let store = Arc::new(LocalFileSystem::new());
-        let prefix = Path::from_url_path(url.path()).expect("Couldn't get path");
-        let engine = DefaultEngine::new(store, prefix, Arc::new(TokioBackgroundExecutor::new()));
+        let engine = DefaultEngine::new(store, Arc::new(TokioBackgroundExecutor::new()));
         let client = engine.get_file_system_client();

-        let files = client.list_from(&Url::parse("file://").unwrap()).unwrap();
+        let files = client
+            .list_from(&url.join("_delta_log").unwrap().join("0").unwrap())
+            .unwrap();
         let mut len = 0;
         for (file, expected) in files.zip(expected_names.iter()) {
             assert!(
diff --git a/kernel/src/engine/default/json.rs b/kernel/src/engine/default/json.rs
index 98a9b0dc74..3e0173f956 100644
--- a/kernel/src/engine/default/json.rs
+++ b/kernel/src/engine/default/json.rs
@@ -142,7 +142,7 @@ impl<E: TaskExecutor> JsonHandler for DefaultJsonHandler<E> {
         let buffer = to_json_bytes(data)?;
         // Put if absent
         let store = self.store.clone(); // cheap Arc
-        let path = Path::from(path.path());
+        let path = Path::from_url_path(path.path())?;
         let path_str = path.to_string();
         self.task_executor
             .block_on(async move {
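Note: the switch from `Path::from(path.path())` to `Path::from_url_path(path.path())?` here and in `filesystem.rs` is about percent-encoding: `Url::path()` returns a percent-encoded string, which `from_url_path` decodes and `Path::from` does not. A small illustration of the assumed difference (behavior as documented for `object_store`; the exact re-escaped form may vary by version):

```rust
use object_store::path::Path;

fn main() {
    // A URL path with a percent-encoded space in the table name.
    let raw = "/table%20name/_delta_log/00000000000000000000.json";

    // `from_url_path` decodes the escape, yielding the real object key.
    let decoded = Path::from_url_path(raw).unwrap();
    assert_eq!(decoded.as_ref(), "table name/_delta_log/00000000000000000000.json");

    // `Path::from` keeps (and may re-escape) the literal "%20", producing a
    // key that matches nothing actually stored.
    let literal = Path::from(raw);
    assert_ne!(literal, decoded);
}
```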
diff --git a/kernel/src/engine/default/mod.rs b/kernel/src/engine/default/mod.rs
index d89cf29cd0..db4588eb00 100644
--- a/kernel/src/engine/default/mod.rs
+++ b/kernel/src/engine/default/mod.rs
@@ -10,7 +10,7 @@ use std::collections::HashMap;
 use std::sync::Arc;

 use self::storage::parse_url_opts;
-use object_store::{path::Path, DynObjectStore};
+use object_store::DynObjectStore;
 use url::Url;

 use self::executor::TaskExecutor;
@@ -60,8 +60,8 @@ impl<E: TaskExecutor> DefaultEngine<E> {
         V: Into<String>,
     {
         // table root is the path of the table in the ObjectStore
-        let (store, table_root) = parse_url_opts(table_root, options)?;
-        Ok(Self::new(Arc::new(store), table_root, task_executor))
+        let (store, _table_root) = parse_url_opts(table_root, options)?;
+        Ok(Self::new(Arc::new(store), task_executor))
     }

     /// Create a new [`DefaultEngine`] instance
@@ -71,7 +71,6 @@ impl<E: TaskExecutor> DefaultEngine<E> {
     /// - `store`: The object store to use.
-    /// - `table_root_path`: The root path of the table within storage.
     /// - `task_executor`: Used to spawn async IO tasks. See [executor::TaskExecutor].
-    pub fn new(store: Arc<DynObjectStore>, table_root: Path, task_executor: Arc<E>) -> Self {
+    pub fn new(store: Arc<DynObjectStore>, task_executor: Arc<E>) -> Self {
         // HACK to check if we're using a LocalFileSystem from ObjectStore. We need this because
         // local filesystem doesn't return a sorted list by default. Although the `object_store`
         // crate explicitly says it _does not_ return a sorted listing, in practice all the cloud
@@ -97,7 +96,6 @@ impl<E: TaskExecutor> DefaultEngine<E> {
             file_system: Arc::new(ObjectStoreFileSystemClient::new(
                 store.clone(),
                 !is_local,
-                table_root,
                 task_executor.clone(),
             )),
             json: Arc::new(DefaultJsonHandler::new(
@@ -158,3 +156,20 @@ impl<E: TaskExecutor> Engine for DefaultEngine<E> {
         self.parquet.clone()
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::executor::tokio::TokioBackgroundExecutor;
+    use super::*;
+    use crate::engine::tests::test_arrow_engine;
+    use object_store::local::LocalFileSystem;
+
+    #[test]
+    fn test_default_engine() {
+        let tmp = tempfile::tempdir().unwrap();
+        let url = Url::from_directory_path(tmp.path()).unwrap();
+        let store = Arc::new(LocalFileSystem::new());
+        let engine = DefaultEngine::new(store, Arc::new(TokioBackgroundExecutor::new()));
+        test_arrow_engine(&engine, &url);
+    }
+}
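Note: for callers the constructor change is mechanical: drop the `Path` argument. A minimal before/after sketch, assuming the `default-engine` feature and an in-memory store as in the tests above:

```rust
use std::sync::Arc;

use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::DefaultEngine;
use object_store::memory::InMemory;

fn main() {
    let store = Arc::new(InMemory::new());

    // Before: DefaultEngine::new(store, Path::from("/"), Arc::new(TokioBackgroundExecutor::new()))
    // After: no table root; the same engine can serve any table in the store.
    let _engine = DefaultEngine::new(store, Arc::new(TokioBackgroundExecutor::new()));
}
```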
diff --git a/kernel/src/engine/mod.rs b/kernel/src/engine/mod.rs
index 8ea07384a0..f62da336f6 100644
--- a/kernel/src/engine/mod.rs
+++ b/kernel/src/engine/mod.rs
@@ -27,3 +27,80 @@ pub(crate) mod arrow_get_data;
 pub(crate) mod ensure_data_types;
 #[cfg(any(feature = "default-engine-base", feature = "sync-engine"))]
 pub mod parquet_row_group_skipping;
+
+#[cfg(test)]
+mod tests {
+    use itertools::Itertools;
+    use object_store::path::Path;
+    use std::sync::Arc;
+    use url::Url;
+
+    use crate::arrow::array::{RecordBatch, StringArray};
+    use crate::arrow::datatypes::{DataType as ArrowDataType, Field, Schema as ArrowSchema};
+    use crate::engine::arrow_data::ArrowEngineData;
+    use crate::{Engine, EngineData};
+
+    use test_utils::delta_path_for_version;
+
+    fn test_list_from_should_sort_and_filter(
+        engine: &dyn Engine,
+        base_url: &Url,
+        engine_data: impl Fn() -> Box<dyn EngineData>,
+    ) {
+        let json = engine.get_json_handler();
+        let get_data = || Box::new(std::iter::once(Ok(engine_data())));
+
+        let expected_names: Vec<Path> = (1..4)
+            .map(|i| delta_path_for_version(i, "json"))
+            .collect_vec();
+
+        for i in expected_names.iter().rev() {
+            let path = base_url.join(i.as_ref()).unwrap();
+            json.write_json_file(&path, get_data(), false).unwrap();
+        }
+        let path = base_url.join("other").unwrap();
+        json.write_json_file(&path, get_data(), false).unwrap();
+
+        let fs = engine.get_file_system_client();
+
+        // list files after an offset
+        let test_url = base_url.join(expected_names[0].as_ref()).unwrap();
+        let files: Vec<_> = fs.list_from(&test_url).unwrap().try_collect().unwrap();
+        assert_eq!(files.len(), expected_names.len() - 1);
+        for (file, expected) in files.iter().zip(expected_names.iter().skip(1)) {
+            assert_eq!(file.location, base_url.join(expected.as_ref()).unwrap());
+        }
+
+        let test_url = base_url
+            .join(delta_path_for_version(0, "json").as_ref())
+            .unwrap();
+        let files: Vec<_> = fs.list_from(&test_url).unwrap().try_collect().unwrap();
+        assert_eq!(files.len(), expected_names.len());
+
+        // list files inside a directory / key prefix
+        let test_url = base_url.join("_delta_log/").unwrap();
+        let files: Vec<_> = fs.list_from(&test_url).unwrap().try_collect().unwrap();
+        assert_eq!(files.len(), expected_names.len());
+        for (file, expected) in files.iter().zip(expected_names.iter()) {
+            assert_eq!(file.location, base_url.join(expected.as_ref()).unwrap());
+        }
+    }
+
+    fn get_arrow_data() -> Box<dyn EngineData> {
+        let schema = Arc::new(ArrowSchema::new(vec![Field::new(
+            "dog",
+            ArrowDataType::Utf8,
+            true,
+        )]));
+        let data = RecordBatch::try_new(
+            schema.clone(),
+            vec![Arc::new(StringArray::from(vec!["remi", "wilson"]))],
+        )
+        .unwrap();
+        Box::new(ArrowEngineData::new(data))
+    }
+
+    pub(crate) fn test_arrow_engine(engine: &dyn Engine, base_url: &Url) {
+        test_list_from_should_sort_and_filter(engine, base_url, get_arrow_data);
+    }
+}
diff --git a/kernel/src/engine/sync/fs_client.rs b/kernel/src/engine/sync/fs_client.rs
index 9577b1499b..9c0d1b80df 100644
--- a/kernel/src/engine/sync/fs_client.rs
+++ b/kernel/src/engine/sync/fs_client.rs
@@ -39,7 +39,7 @@ impl FileSystemClient for SyncFilesystemClient {
         let all_ents: Vec<_> = std::fs::read_dir(path_to_read)?
             .filter(|ent_res| {
                 match (ent_res, min_file_name) {
-                    (Ok(ent), Some(min_file_name)) => ent.file_name() >= *min_file_name,
+                    (Ok(ent), Some(min_file_name)) => ent.file_name() > *min_file_name,
                     _ => true, // Keep unfiltered and/or error entries
                 }
             })
@@ -106,7 +106,7 @@ mod tests {
         writeln!(f, "null")?;
         f.flush()?;

-        let url_path = tmp_dir.path().join(get_json_filename(1));
+        let url_path = tmp_dir.path().join(get_json_filename(0));
         let url = Url::from_file_path(url_path).unwrap();
         let files: Vec<_> = client.list_from(&url)?.try_collect()?;

@@ -137,11 +137,11 @@ mod tests {
-            // i+1 in index because we started at 0001 in the listing
+            // i+2 in index because the listing is now exclusive and we started at 0001
             assert_eq!(
                 file?.location.to_file_path().unwrap().to_str().unwrap(),
-                expected[i + 1].to_str().unwrap()
+                expected[i + 2].to_str().unwrap()
             );
             file_count += 1;
         }
-        assert_eq!(file_count, 2);
+        assert_eq!(file_count, 1);

         let url_path = tmp_dir.path().join("");
         let url = Url::from_file_path(url_path).unwrap();
diff --git a/kernel/src/engine/sync/json.rs b/kernel/src/engine/sync/json.rs
index ddf61bd3c2..f2212cb816 100644
--- a/kernel/src/engine/sync/json.rs
+++ b/kernel/src/engine/sync/json.rs
@@ -66,6 +66,10 @@ impl JsonHandler for SyncJsonHandler {
             )));
         };

+        if !parent.exists() {
+            std::fs::create_dir_all(parent)?;
+        }
+
         // write data to tmp file
         let mut tmp_file = NamedTempFile::new_in(parent)?;
         let buf = to_json_bytes(data)?;
diff --git a/kernel/src/engine/sync/mod.rs b/kernel/src/engine/sync/mod.rs
index ae80c23bdf..e4e00982cb 100644
--- a/kernel/src/engine/sync/mod.rs
+++ b/kernel/src/engine/sync/mod.rs
@@ -97,3 +97,17 @@ where
         .map(|data| Ok(Box::new(ArrowEngineData::new(data??.into())) as _));
     Ok(Box::new(result))
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::engine::tests::test_arrow_engine;
+
+    #[test]
+    fn test_sync_engine() {
+        let tmp = tempfile::tempdir().unwrap();
+        let url = url::Url::from_directory_path(tmp.path()).unwrap();
+        let engine = SyncEngine::new();
+        test_arrow_engine(&engine, &url);
+    }
+}
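Note: the one-character filter change from `>=` to `>` is the behavioral heart of this diff: `list_from` is now exclusive of the start path, which is why the test above shifts from `get_json_filename(1)` to `get_json_filename(0)`, from `expected[i + 1]` to `expected[i + 2]`, and from a count of 2 to 1. A sketch of the new semantics, assuming the `sync-engine` feature and a hypothetical local table with commits 0 through 2 already on disk:

```rust
use delta_kernel::engine::sync::SyncEngine;
use delta_kernel::Engine;
use url::Url;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical table at /tmp/table containing _delta_log/0000...000{0,1,2}.json.
    let log_url = Url::parse("file:///tmp/table/_delta_log/")?;
    let engine = SyncEngine::new();
    let fs = engine.get_file_system_client();

    // Exclusive bound: starting from commit 1 yields only commit 2.
    let start = log_url.join("00000000000000000001.json")?;
    for meta in fs.list_from(&start)? {
        println!("{}", meta?.location);
    }
    Ok(())
}
```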
diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs
index 65a0a6ab54..2e46986582 100644
--- a/kernel/src/lib.rs
+++ b/kernel/src/lib.rs
@@ -43,7 +43,7 @@
 //!
 //! Delta Kernel needs to perform some basic operations against file systems like listing and
 //! reading files. These interactions are encapsulated in the [`FileSystemClient`] trait.
-//! Implementors must take care that all assumptions on the behavior if the functions - like sorted
+//! Implementers must take care that all assumptions on the behavior of the functions - like sorted
 //! results - are respected.
 //!
 //! ## Reading log and data files
@@ -348,8 +348,11 @@ pub trait ExpressionHandler: AsAny {
 /// file system where the Delta table is present. Connector implementation of
 /// this trait can hide filesystem specific details from Delta Kernel.
 pub trait FileSystemClient: AsAny {
-    /// List the paths in the same directory that are lexicographically greater or equal to
+    /// List the paths in the same directory that are lexicographically greater than
     /// (UTF-8 sorting) the given `path`. The result should also be sorted by the file name.
+    ///
+    /// If the path is directory-like (ends with '/'), the result should contain
+    /// all the files in the directory.
     fn list_from(
         &self,
         path: &Url,
     ) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<FileMeta>>>>;
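Note: the directory-like half of this contract explains the test idiom used in the rest of the diff: listings that previously started from the table root now pass either the `_delta_log/` directory URL or an offset ending in `0`. Both yield the full log, because `"0"` sorts lexicographically before every zero-padded commit file name and the bound is exclusive. A sketch of the URL construction alone (no store involved; the exact `join` calls in the tests differ slightly):

```rust
use url::Url;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let table_root = Url::parse("memory:///")?;

    // Directory-like URL: the trailing '/' means "list everything under this prefix".
    let dir = table_root.join("_delta_log/")?;
    assert_eq!(dir.as_str(), "memory:///_delta_log/");

    // Offset URL: "0" is lexicographically smaller than "00000000000000000000.json",
    // so an exclusive list-from still returns every commit file.
    let offset = dir.join("0")?;
    assert_eq!(offset.as_str(), "memory:///_delta_log/0");
    Ok(())
}
```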
diff --git a/kernel/src/log_segment/tests.rs b/kernel/src/log_segment/tests.rs
index a28347db2c..d00ad235ca 100644
--- a/kernel/src/log_segment/tests.rs
+++ b/kernel/src/log_segment/tests.rs
@@ -114,7 +114,6 @@ fn build_log_with_paths_and_checkpoint(
     let client = ObjectStoreFileSystemClient::new(
         store,
         false, // don't have ordered listing
-        Path::from("/"),
         Arc::new(TokioBackgroundExecutor::new()),
     );

@@ -854,11 +853,7 @@ fn test_checkpoint_batch_with_no_sidecars_returns_none() -> DeltaResult<()> {
 #[test]
 fn test_checkpoint_batch_with_sidecars_returns_sidecar_batches() -> DeltaResult<()> {
     let (store, log_root) = new_in_memory_store();
-    let engine = DefaultEngine::new(
-        store.clone(),
-        Path::from("/"),
-        Arc::new(TokioBackgroundExecutor::new()),
-    );
+    let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new()));
     let read_schema = get_log_schema().project(&[ADD_NAME, REMOVE_NAME, SIDECAR_NAME])?;

     add_sidecar_to_store(
@@ -898,11 +893,7 @@ fn test_checkpoint_batch_with_sidecars_returns_sidecar_batches() -> DeltaResult<()> {
 #[test]
 fn test_checkpoint_batch_with_sidecar_files_that_do_not_exist() -> DeltaResult<()> {
     let (store, log_root) = new_in_memory_store();
-    let engine = DefaultEngine::new(
-        store.clone(),
-        Path::from("/"),
-        Arc::new(TokioBackgroundExecutor::new()),
-    );
+    let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new()));

     let checkpoint_batch = sidecar_batch_with_given_paths(
         vec!["sidecarfile1.parquet", "sidecarfile2.parquet"],
@@ -929,11 +920,7 @@ fn test_checkpoint_batch_with_sidecar_files_that_do_not_exist() -> DeltaResult<()> {
 #[test]
 fn test_reading_sidecar_files_with_predicate() -> DeltaResult<()> {
     let (store, log_root) = new_in_memory_store();
-    let engine = DefaultEngine::new(
-        store.clone(),
-        Path::from("/"),
-        Arc::new(TokioBackgroundExecutor::new()),
-    );
+    let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new()));
     let read_schema = get_log_schema().project(&[ADD_NAME, REMOVE_NAME, SIDECAR_NAME])?;

     let checkpoint_batch =
@@ -1019,11 +1006,7 @@ fn test_create_checkpoint_stream_errors_when_schema_has_add_but_no_sidecar_action(
 fn test_create_checkpoint_stream_returns_checkpoint_batches_as_is_if_schema_has_no_file_actions(
 ) -> DeltaResult<()> {
     let (store, log_root) = new_in_memory_store();
-    let engine = DefaultEngine::new(
-        store.clone(),
-        Path::from("/"),
-        Arc::new(TokioBackgroundExecutor::new()),
-    );
+    let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new()));
     add_checkpoint_to_store(
         &store,
         // Create a checkpoint batch with sidecar actions to verify that the sidecar actions are not read.
@@ -1062,11 +1045,7 @@ fn test_create_checkpoint_stream_returns_checkpoint_batches_as_is_if_schema_has_no_file_actions(
 fn test_create_checkpoint_stream_returns_checkpoint_batches_if_checkpoint_is_multi_part(
 ) -> DeltaResult<()> {
     let (store, log_root) = new_in_memory_store();
-    let engine = DefaultEngine::new(
-        store.clone(),
-        Path::from("/"),
-        Arc::new(TokioBackgroundExecutor::new()),
-    );
+    let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new()));

     // Multi-part checkpoints should never contain sidecar actions.
     // This test intentionally includes batches with sidecar actions in multi-part checkpoints
@@ -1126,11 +1105,7 @@ fn test_create_checkpoint_stream_returns_checkpoint_batches_if_checkpoint_is_multi_part(
 fn test_create_checkpoint_stream_reads_parquet_checkpoint_batch_without_sidecars() -> DeltaResult<()> {
     let (store, log_root) = new_in_memory_store();
-    let engine = DefaultEngine::new(
-        store.clone(),
-        Path::from("/"),
-        Arc::new(TokioBackgroundExecutor::new()),
-    );
+    let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new()));

     add_checkpoint_to_store(
         &store,
@@ -1165,11 +1140,7 @@ fn test_create_checkpoint_stream_reads_parquet_checkpoint_batch_without_sidecars
 #[test]
 fn test_create_checkpoint_stream_reads_json_checkpoint_batch_without_sidecars() -> DeltaResult<()> {
     let (store, log_root) = new_in_memory_store();
-    let engine = DefaultEngine::new(
-        store.clone(),
-        Path::from("/"),
-        Arc::new(TokioBackgroundExecutor::new()),
-    );
+    let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new()));

     write_json_to_store(
         &store,
@@ -1219,11 +1190,7 @@ fn test_create_checkpoint_stream_reads_json_checkpoint_batch_without_sidecars()
 fn test_create_checkpoint_stream_reads_checkpoint_file_and_returns_sidecar_batches(
 ) -> DeltaResult<()> {
     let (store, log_root) = new_in_memory_store();
-    let engine = DefaultEngine::new(
-        store.clone(),
-        Path::from("/"),
-        Arc::new(TokioBackgroundExecutor::new()),
-    );
+    let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new()));

     add_checkpoint_to_store(
         &store,
diff --git a/kernel/src/snapshot.rs b/kernel/src/snapshot.rs
index 9a21a6b9a7..816511e7a8 100644
--- a/kernel/src/snapshot.rs
+++ b/kernel/src/snapshot.rs
@@ -250,11 +250,9 @@ mod tests {
         let url = url::Url::from_directory_path(path).unwrap();

         let store = Arc::new(LocalFileSystem::new());
-        let prefix = Path::from(url.path());
         let client = ObjectStoreFileSystemClient::new(
             store,
             false, // don't have ordered listing
-            prefix,
             Arc::new(TokioBackgroundExecutor::new()),
         );
         let cp = read_last_checkpoint(&client, &url).unwrap();
@@ -292,7 +290,6 @@ mod tests {
         let client = ObjectStoreFileSystemClient::new(
             store,
             false, // don't have ordered listing
-            Path::from("/"),
             Arc::new(TokioBackgroundExecutor::new()),
         );
         let url = Url::parse("memory:///valid/").expect("valid url");
diff --git a/kernel/tests/read.rs b/kernel/tests/read.rs
index 5968dff941..db9edbc68e 100644
--- a/kernel/tests/read.rs
+++ b/kernel/tests/read.rs
@@ -59,7 +59,6 @@ async fn single_commit_two_add_files() -> Result<(), Box<dyn std::error::Error>> {
     let location = Url::parse("memory:///")?;
     let engine = Arc::new(DefaultEngine::new(
         storage.clone(),
-        Path::from("/"),
         Arc::new(TokioBackgroundExecutor::new()),
     ));

@@ -114,11 +113,7 @@ async fn two_commits() -> Result<(), Box<dyn std::error::Error>> {
     .await?;

     let location = Url::parse("memory:///").unwrap();
-    let engine = DefaultEngine::new(
-        storage.clone(),
-        Path::from("/"),
-        Arc::new(TokioBackgroundExecutor::new()),
-    );
+    let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new()));

     let table = Table::new(location);
     let expected_data = vec![batch.clone(), batch];
@@ -172,11 +167,7 @@ async fn remove_action() -> Result<(), Box<dyn std::error::Error>> {
     .await?;

     let location = Url::parse("memory:///").unwrap();
-    let engine = DefaultEngine::new(
-        storage.clone(),
-        Path::from("/"),
-        Arc::new(TokioBackgroundExecutor::new()),
-    );
+    let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new()));

     let table = Table::new(location);
     let expected_data = vec![batch];
@@ -250,7 +241,6 @@ async fn stats() -> Result<(), Box<dyn std::error::Error>> {
     let location = Url::parse("memory:///").unwrap();
     let engine = Arc::new(DefaultEngine::new(
         storage.clone(),
-        Path::from(""),
         Arc::new(TokioBackgroundExecutor::new()),
     ));

@@ -1072,7 +1062,6 @@ async fn predicate_on_non_nullable_partition_column() -> Result<(), Box<dyn std::error::Error>> {