Skip to content

Commit ec47584

Browse files
roeap, scovich, and zachschuermann
authored
fix!: get prefix from offset path (#699)
## What changes are proposed in this pull request? Our `list_from` implementation for the `object_store`-based filesystem client is currently broken, since it does not behave as documented / required for that function. Specifically, we should list all files in the parent folder, using the given path as the offset to list from. In a follow-up PR we then need to lift the assumption that all URLs will always be under the same store to get proper URL handling. ### This PR affects the following public APIs - `DefaultEngine::new` no longer requires a `table_root` parameter. - `list_from` consistently returns keys greater than (`>`) the offset; previously the `sync-engines` client returned keys greater than or equal to (`>=`) the offset. ## How was this change tested? Additional tests asserting consistent `list_from` behavior for all our file system client implementations. --------- Signed-off-by: Robert Pack <[email protected]> Co-authored-by: Ryan Johnson <[email protected]> Co-authored-by: Zach Schuermann <[email protected]>
1 parent 3065a76 commit ec47584

File tree

13 files changed

+163
-98
lines changed

13 files changed

+163
-98
lines changed

ffi/src/lib.rs

+3-11
Original file line numberDiff line numberDiff line change
@@ -765,7 +765,7 @@ impl<T> Default for ReferenceSet<T> {
765765
#[cfg(test)]
766766
mod tests {
767767
use delta_kernel::engine::default::{executor::tokio::TokioBackgroundExecutor, DefaultEngine};
768-
use object_store::{memory::InMemory, path::Path};
768+
use object_store::memory::InMemory;
769769
use test_utils::{actions_to_string, actions_to_string_partitioned, add_commit, TestAction};
770770

771771
use super::*;
@@ -839,11 +839,7 @@ mod tests {
839839
actions_to_string(vec![TestAction::Metadata]),
840840
)
841841
.await?;
842-
let engine = DefaultEngine::new(
843-
storage.clone(),
844-
Path::from("/"),
845-
Arc::new(TokioBackgroundExecutor::new()),
846-
);
842+
let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new()));
847843
let engine = engine_to_handle(Arc::new(engine), allocate_err);
848844
let path = "memory:///";
849845

@@ -872,11 +868,7 @@ mod tests {
872868
actions_to_string_partitioned(vec![TestAction::Metadata]),
873869
)
874870
.await?;
875-
let engine = DefaultEngine::new(
876-
storage.clone(),
877-
Path::from("/"),
878-
Arc::new(TokioBackgroundExecutor::new()),
879-
);
871+
let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new()));
880872
let engine = engine_to_handle(Arc::new(engine), allocate_err);
881873
let path = "memory:///";
882874

kernel/src/engine/default/filesystem.rs

+23-16
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ use crate::{DeltaResult, Error, FileMeta, FileSlice, FileSystemClient};
1414
pub struct ObjectStoreFileSystemClient<E: TaskExecutor> {
1515
inner: Arc<DynObjectStore>,
1616
has_ordered_listing: bool,
17-
table_root: Path,
1817
task_executor: Arc<E>,
1918
readahead: usize,
2019
}
@@ -23,13 +22,11 @@ impl<E: TaskExecutor> ObjectStoreFileSystemClient<E> {
2322
pub(crate) fn new(
2423
store: Arc<DynObjectStore>,
2524
has_ordered_listing: bool,
26-
table_root: Path,
2725
task_executor: Arc<E>,
2826
) -> Self {
2927
Self {
3028
inner: store,
3129
has_ordered_listing,
32-
table_root,
3330
task_executor,
3431
readahead: 10,
3532
}
@@ -47,16 +44,28 @@ impl<E: TaskExecutor> FileSystemClient for ObjectStoreFileSystemClient<E> {
4744
&self,
4845
path: &Url,
4946
) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<FileMeta>>>> {
50-
let url = path.clone();
51-
let offset = Path::from(path.path());
52-
// TODO properly handle table prefix
53-
let prefix = self.table_root.child("_delta_log");
47+
// The offset is used for list-after; the prefix is used to restrict the listing to a specific directory.
48+
// Unfortunately, `Path` provides no easy way to check whether a name is directory-like,
49+
// because it strips trailing /, so we're reduced to manually checking the original URL.
50+
let offset = Path::from_url_path(path.path())?;
51+
let prefix = if path.path().ends_with('/') {
52+
offset.clone()
53+
} else {
54+
let mut parts = offset.parts().collect_vec();
55+
if parts.pop().is_none() {
56+
return Err(Error::Generic(format!(
57+
"Offset path must not be a root directory. Got: '{}'",
58+
path.as_str()
59+
)));
60+
}
61+
Path::from_iter(parts)
62+
};
5463

5564
let store = self.inner.clone();
5665

5766
// This channel will become the iterator
5867
let (sender, receiver) = std::sync::mpsc::sync_channel(4_000);
59-
68+
let url = path.clone();
6069
self.task_executor.spawn(async move {
6170
let mut stream = store.list_with_offset(Some(&prefix), &offset);
6271

@@ -192,11 +201,9 @@ mod tests {
192201
let mut url = Url::from_directory_path(tmp.path()).unwrap();
193202

194203
let store = Arc::new(LocalFileSystem::new());
195-
let prefix = Path::from(url.path());
196204
let client = ObjectStoreFileSystemClient::new(
197205
store,
198206
false, // don't have ordered listing
199-
prefix,
200207
Arc::new(TokioBackgroundExecutor::new()),
201208
);
202209

@@ -229,11 +236,10 @@ mod tests {
229236
store.put(&name, data.clone().into()).await.unwrap();
230237

231238
let table_root = Url::parse("memory:///").expect("valid url");
232-
let prefix = Path::from_url_path(table_root.path()).expect("Couldn't get path");
233-
let engine = DefaultEngine::new(store, prefix, Arc::new(TokioBackgroundExecutor::new()));
239+
let engine = DefaultEngine::new(store, Arc::new(TokioBackgroundExecutor::new()));
234240
let files: Vec<_> = engine
235241
.get_file_system_client()
236-
.list_from(&table_root)
242+
.list_from(&table_root.join("_delta_log").unwrap().join("0").unwrap())
237243
.unwrap()
238244
.try_collect()
239245
.unwrap();
@@ -260,11 +266,12 @@ mod tests {
260266

261267
let url = Url::from_directory_path(tmp.path()).unwrap();
262268
let store = Arc::new(LocalFileSystem::new());
263-
let prefix = Path::from_url_path(url.path()).expect("Couldn't get path");
264-
let engine = DefaultEngine::new(store, prefix, Arc::new(TokioBackgroundExecutor::new()));
269+
let engine = DefaultEngine::new(store, Arc::new(TokioBackgroundExecutor::new()));
265270
let client = engine.get_file_system_client();
266271

267-
let files = client.list_from(&Url::parse("file://").unwrap()).unwrap();
272+
let files = client
273+
.list_from(&url.join("_delta_log").unwrap().join("0").unwrap())
274+
.unwrap();
268275
let mut len = 0;
269276
for (file, expected) in files.zip(expected_names.iter()) {
270277
assert!(

kernel/src/engine/default/json.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ impl<E: TaskExecutor> JsonHandler for DefaultJsonHandler<E> {
142142
let buffer = to_json_bytes(data)?;
143143
// Put if absent
144144
let store = self.store.clone(); // cheap Arc
145-
let path = Path::from(path.path());
145+
let path = Path::from_url_path(path.path())?;
146146
let path_str = path.to_string();
147147
self.task_executor
148148
.block_on(async move {

kernel/src/engine/default/mod.rs

+21-5
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use std::collections::HashMap;
1010
use std::sync::Arc;
1111

1212
use self::storage::parse_url_opts;
13-
use object_store::{path::Path, DynObjectStore};
13+
use object_store::DynObjectStore;
1414
use url::Url;
1515

1616
use self::executor::TaskExecutor;
@@ -60,8 +60,8 @@ impl<E: TaskExecutor> DefaultEngine<E> {
6060
V: Into<String>,
6161
{
6262
// table root is the path of the table in the ObjectStore
63-
let (store, table_root) = parse_url_opts(table_root, options)?;
64-
Ok(Self::new(Arc::new(store), table_root, task_executor))
63+
let (store, _table_root) = parse_url_opts(table_root, options)?;
64+
Ok(Self::new(Arc::new(store), task_executor))
6565
}
6666

6767
/// Create a new [`DefaultEngine`] instance
@@ -71,7 +71,7 @@ impl<E: TaskExecutor> DefaultEngine<E> {
7171
/// - `store`: The object store to use.
7272
/// - `table_root_path`: The root path of the table within storage.
7373
/// - `task_executor`: Used to spawn async IO tasks. See [executor::TaskExecutor].
74-
pub fn new(store: Arc<DynObjectStore>, table_root: Path, task_executor: Arc<E>) -> Self {
74+
pub fn new(store: Arc<DynObjectStore>, task_executor: Arc<E>) -> Self {
7575
// HACK to check if we're using a LocalFileSystem from ObjectStore. We need this because
7676
// local filesystem doesn't return a sorted list by default. Although the `object_store`
7777
// crate explicitly says it _does not_ return a sorted listing, in practice all the cloud
@@ -97,7 +97,6 @@ impl<E: TaskExecutor> DefaultEngine<E> {
9797
file_system: Arc::new(ObjectStoreFileSystemClient::new(
9898
store.clone(),
9999
!is_local,
100-
table_root,
101100
task_executor.clone(),
102101
)),
103102
json: Arc::new(DefaultJsonHandler::new(
@@ -158,3 +157,20 @@ impl<E: TaskExecutor> Engine for DefaultEngine<E> {
158157
self.parquet.clone()
159158
}
160159
}
160+
161+
#[cfg(test)]
162+
mod tests {
163+
use super::executor::tokio::TokioBackgroundExecutor;
164+
use super::*;
165+
use crate::engine::tests::test_arrow_engine;
166+
use object_store::local::LocalFileSystem;
167+
168+
#[test]
169+
fn test_default_engine() {
170+
let tmp = tempfile::tempdir().unwrap();
171+
let url = Url::from_directory_path(tmp.path()).unwrap();
172+
let store = Arc::new(LocalFileSystem::new());
173+
let engine = DefaultEngine::new(store, Arc::new(TokioBackgroundExecutor::new()));
174+
test_arrow_engine(&engine, &url);
175+
}
176+
}

kernel/src/engine/mod.rs

+77
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,80 @@ pub(crate) mod arrow_get_data;
2727
pub(crate) mod ensure_data_types;
2828
#[cfg(any(feature = "default-engine-base", feature = "sync-engine"))]
2929
pub mod parquet_row_group_skipping;
30+
31+
#[cfg(test)]
32+
mod tests {
33+
use itertools::Itertools;
34+
use object_store::path::Path;
35+
use std::sync::Arc;
36+
use url::Url;
37+
38+
use crate::arrow::array::{RecordBatch, StringArray};
39+
use crate::arrow::datatypes::{DataType as ArrowDataType, Field, Schema as ArrowSchema};
40+
use crate::engine::arrow_data::ArrowEngineData;
41+
use crate::{Engine, EngineData};
42+
43+
use test_utils::delta_path_for_version;
44+
45+
fn test_list_from_should_sort_and_filter(
46+
engine: &dyn Engine,
47+
base_url: &Url,
48+
engine_data: impl Fn() -> Box<dyn EngineData>,
49+
) {
50+
let json = engine.get_json_handler();
51+
let get_data = || Box::new(std::iter::once(Ok(engine_data())));
52+
53+
let expected_names: Vec<Path> = (1..4)
54+
.map(|i| delta_path_for_version(i, "json"))
55+
.collect_vec();
56+
57+
for i in expected_names.iter().rev() {
58+
let path = base_url.join(i.as_ref()).unwrap();
59+
json.write_json_file(&path, get_data(), false).unwrap();
60+
}
61+
let path = base_url.join("other").unwrap();
62+
json.write_json_file(&path, get_data(), false).unwrap();
63+
64+
let fs = engine.get_file_system_client();
65+
66+
// list files after an offset
67+
let test_url = base_url.join(expected_names[0].as_ref()).unwrap();
68+
let files: Vec<_> = fs.list_from(&test_url).unwrap().try_collect().unwrap();
69+
assert_eq!(files.len(), expected_names.len() - 1);
70+
for (file, expected) in files.iter().zip(expected_names.iter().skip(1)) {
71+
assert_eq!(file.location, base_url.join(expected.as_ref()).unwrap());
72+
}
73+
74+
let test_url = base_url
75+
.join(delta_path_for_version(0, "json").as_ref())
76+
.unwrap();
77+
let files: Vec<_> = fs.list_from(&test_url).unwrap().try_collect().unwrap();
78+
assert_eq!(files.len(), expected_names.len());
79+
80+
// list files inside a directory / key prefix
81+
let test_url = base_url.join("_delta_log/").unwrap();
82+
let files: Vec<_> = fs.list_from(&test_url).unwrap().try_collect().unwrap();
83+
assert_eq!(files.len(), expected_names.len());
84+
for (file, expected) in files.iter().zip(expected_names.iter()) {
85+
assert_eq!(file.location, base_url.join(expected.as_ref()).unwrap());
86+
}
87+
}
88+
89+
fn get_arrow_data() -> Box<dyn EngineData> {
90+
let schema = Arc::new(ArrowSchema::new(vec![Field::new(
91+
"dog",
92+
ArrowDataType::Utf8,
93+
true,
94+
)]));
95+
let data = RecordBatch::try_new(
96+
schema.clone(),
97+
vec![Arc::new(StringArray::from(vec!["remi", "wilson"]))],
98+
)
99+
.unwrap();
100+
Box::new(ArrowEngineData::new(data))
101+
}
102+
103+
pub(crate) fn test_arrow_engine(engine: &dyn Engine, base_url: &Url) {
104+
test_list_from_should_sort_and_filter(engine, base_url, get_arrow_data);
105+
}
106+
}

kernel/src/engine/sync/fs_client.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ impl FileSystemClient for SyncFilesystemClient {
3939
let all_ents: Vec<_> = std::fs::read_dir(path_to_read)?
4040
.filter(|ent_res| {
4141
match (ent_res, min_file_name) {
42-
(Ok(ent), Some(min_file_name)) => ent.file_name() >= *min_file_name,
42+
(Ok(ent), Some(min_file_name)) => ent.file_name() > *min_file_name,
4343
_ => true, // Keep unfiltered and/or error entries
4444
}
4545
})
@@ -106,7 +106,7 @@ mod tests {
106106
writeln!(f, "null")?;
107107
f.flush()?;
108108

109-
let url_path = tmp_dir.path().join(get_json_filename(1));
109+
let url_path = tmp_dir.path().join(get_json_filename(0));
110110
let url = Url::from_file_path(url_path).unwrap();
111111
let files: Vec<_> = client.list_from(&url)?.try_collect()?;
112112

@@ -137,11 +137,11 @@ mod tests {
137137
// i+2 in index because the listing starts after 0001 (the offset itself is excluded)
138138
assert_eq!(
139139
file?.location.to_file_path().unwrap().to_str().unwrap(),
140-
expected[i + 1].to_str().unwrap()
140+
expected[i + 2].to_str().unwrap()
141141
);
142142
file_count += 1;
143143
}
144-
assert_eq!(file_count, 2);
144+
assert_eq!(file_count, 1);
145145

146146
let url_path = tmp_dir.path().join("");
147147
let url = Url::from_file_path(url_path).unwrap();

kernel/src/engine/sync/json.rs

+4
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,10 @@ impl JsonHandler for SyncJsonHandler {
6666
)));
6767
};
6868

69+
if !parent.exists() {
70+
std::fs::create_dir_all(parent)?;
71+
}
72+
6973
// write data to tmp file
7074
let mut tmp_file = NamedTempFile::new_in(parent)?;
7175
let buf = to_json_bytes(data)?;

kernel/src/engine/sync/mod.rs

+14
Original file line numberDiff line numberDiff line change
@@ -97,3 +97,17 @@ where
9797
.map(|data| Ok(Box::new(ArrowEngineData::new(data??.into())) as _));
9898
Ok(Box::new(result))
9999
}
100+
101+
#[cfg(test)]
102+
mod tests {
103+
use super::*;
104+
use crate::engine::tests::test_arrow_engine;
105+
106+
#[test]
107+
fn test_sync_engine() {
108+
let tmp = tempfile::tempdir().unwrap();
109+
let url = url::Url::from_directory_path(tmp.path()).unwrap();
110+
let engine = SyncEngine::new();
111+
test_arrow_engine(&engine, &url);
112+
}
113+
}

kernel/src/lib.rs

+5-2
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
//!
4444
//! Delta Kernel needs to perform some basic operations against file systems like listing and
4545
//! reading files. These interactions are encapsulated in the [`FileSystemClient`] trait.
46-
//! Implementors must take care that all assumptions on the behavior if the functions - like sorted
46+
//! Implementers must take care that all assumptions on the behavior of the functions - like sorted
4747
//! results - are respected.
4848
//!
4949
//! ## Reading log and data files
@@ -348,8 +348,11 @@ pub trait ExpressionHandler: AsAny {
348348
/// file system where the Delta table is present. Connector implementation of
349349
/// this trait can hide filesystem specific details from Delta Kernel.
350350
pub trait FileSystemClient: AsAny {
351-
/// List the paths in the same directory that are lexicographically greater or equal to
351+
/// List the paths in the same directory that are lexicographically greater than
352352
/// (UTF-8 sorting) the given `path`. The result should also be sorted by the file name.
353+
///
354+
/// If the path is directory-like (ends with '/'), the result should contain
355+
/// all the files in the directory.
353356
fn list_from(&self, path: &Url)
354357
-> DeltaResult<Box<dyn Iterator<Item = DeltaResult<FileMeta>>>>;
355358

0 commit comments

Comments
 (0)