Skip to content

Commit 5559d58

Browse files
committed
fix: get prefix from offset path
Signed-off-by: Robert Pack <[email protected]>
1 parent eedfd47 commit 5559d58

File tree

7 files changed

+23
-41
lines changed

7 files changed

+23
-41
lines changed

ffi/src/lib.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -718,7 +718,7 @@ impl<T> Default for ReferenceSet<T> {
718718
#[cfg(test)]
719719
mod tests {
720720
use delta_kernel::engine::default::{executor::tokio::TokioBackgroundExecutor, DefaultEngine};
721-
use object_store::{memory::InMemory, path::Path};
721+
use object_store::memory::InMemory;
722722
use test_utils::{actions_to_string, add_commit, TestAction};
723723

724724
use super::*;
@@ -792,11 +792,7 @@ mod tests {
792792
actions_to_string(vec![TestAction::Metadata]),
793793
)
794794
.await?;
795-
let engine = DefaultEngine::new(
796-
storage.clone(),
797-
Path::from("/"),
798-
Arc::new(TokioBackgroundExecutor::new()),
799-
);
795+
let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new()));
800796
let engine = engine_to_handle(Arc::new(engine), allocate_err);
801797
let path = "memory:///";
802798

kernel/src/engine/default/filesystem.rs

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ use crate::{DeltaResult, Error, FileMeta, FileSlice, FileSystemClient};
1414
pub struct ObjectStoreFileSystemClient<E: TaskExecutor> {
1515
inner: Arc<DynObjectStore>,
1616
has_ordered_listing: bool,
17-
table_root: Path,
1817
task_executor: Arc<E>,
1918
readahead: usize,
2019
}
@@ -23,13 +22,11 @@ impl<E: TaskExecutor> ObjectStoreFileSystemClient<E> {
2322
pub(crate) fn new(
2423
store: Arc<DynObjectStore>,
2524
has_ordered_listing: bool,
26-
table_root: Path,
2725
task_executor: Arc<E>,
2826
) -> Self {
2927
Self {
3028
inner: store,
3129
has_ordered_listing,
32-
table_root,
3330
task_executor,
3431
readahead: 10,
3532
}
@@ -49,8 +46,14 @@ impl<E: TaskExecutor> FileSystemClient for ObjectStoreFileSystemClient<E> {
4946
) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<FileMeta>>>> {
5047
let url = path.clone();
5148
let offset = Path::from(path.path());
52-
// TODO properly handle table prefix
53-
let prefix = self.table_root.child("_delta_log");
49+
let parts = offset.parts().collect_vec();
50+
if parts.len() == 0 {
51+
return Err(Error::generic(format!(
52+
"Offset path must not be a root directory. Got: '{}'",
53+
url.as_str()
54+
)));
55+
}
56+
let prefix = Path::from_iter(parts[..parts.len() - 1].iter().cloned());
5457

5558
let store = self.inner.clone();
5659

@@ -192,11 +195,9 @@ mod tests {
192195
let mut url = Url::from_directory_path(tmp.path()).unwrap();
193196

194197
let store = Arc::new(LocalFileSystem::new());
195-
let prefix = Path::from(url.path());
196198
let client = ObjectStoreFileSystemClient::new(
197199
store,
198200
false, // don't have ordered listing
199-
prefix,
200201
Arc::new(TokioBackgroundExecutor::new()),
201202
);
202203

@@ -229,11 +230,10 @@ mod tests {
229230
store.put(&name, data.clone().into()).await.unwrap();
230231

231232
let table_root = Url::parse("memory:///").expect("valid url");
232-
let prefix = Path::from_url_path(table_root.path()).expect("Couldn't get path");
233-
let engine = DefaultEngine::new(store, prefix, Arc::new(TokioBackgroundExecutor::new()));
233+
let engine = DefaultEngine::new(store, Arc::new(TokioBackgroundExecutor::new()));
234234
let files: Vec<_> = engine
235235
.get_file_system_client()
236-
.list_from(&table_root)
236+
.list_from(&table_root.join("_delta_log/0").unwrap())
237237
.unwrap()
238238
.try_collect()
239239
.unwrap();
@@ -260,11 +260,12 @@ mod tests {
260260

261261
let url = Url::from_directory_path(tmp.path()).unwrap();
262262
let store = Arc::new(LocalFileSystem::new());
263-
let prefix = Path::from_url_path(url.path()).expect("Couldn't get path");
264-
let engine = DefaultEngine::new(store, prefix, Arc::new(TokioBackgroundExecutor::new()));
263+
let engine = DefaultEngine::new(store, Arc::new(TokioBackgroundExecutor::new()));
265264
let client = engine.get_file_system_client();
266265

267-
let files = client.list_from(&Url::parse("file://").unwrap()).unwrap();
266+
let files = client
267+
.list_from(&url.join("_delta_log/0").unwrap())
268+
.unwrap();
268269
let mut len = 0;
269270
for (file, expected) in files.zip(expected_names.iter()) {
270271
assert!(

kernel/src/engine/default/mod.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use std::collections::HashMap;
1010
use std::sync::Arc;
1111

1212
use self::storage::parse_url_opts;
13-
use object_store::{path::Path, DynObjectStore};
13+
use object_store::DynObjectStore;
1414
use url::Url;
1515

1616
use self::executor::TaskExecutor;
@@ -60,8 +60,8 @@ impl<E: TaskExecutor> DefaultEngine<E> {
6060
V: Into<String>,
6161
{
6262
// table root is the path of the table in the ObjectStore
63-
let (store, table_root) = parse_url_opts(table_root, options)?;
64-
Ok(Self::new(Arc::new(store), table_root, task_executor))
63+
let (store, _table_root) = parse_url_opts(table_root, options)?;
64+
Ok(Self::new(Arc::new(store), task_executor))
6565
}
6666

6767
/// Create a new [`DefaultEngine`] instance
@@ -71,7 +71,7 @@ impl<E: TaskExecutor> DefaultEngine<E> {
7171
/// - `store`: The object store to use.
7272
/// - `table_root_path`: The root path of the table within storage.
7373
/// - `task_executor`: Used to spawn async IO tasks. See [executor::TaskExecutor].
74-
pub fn new(store: Arc<DynObjectStore>, table_root: Path, task_executor: Arc<E>) -> Self {
74+
pub fn new(store: Arc<DynObjectStore>, task_executor: Arc<E>) -> Self {
7575
// HACK to check if we're using a LocalFileSystem from ObjectStore. We need this because
7676
// local filesystem doesn't return a sorted list by default. Although the `object_store`
7777
// crate explicitly says it _does not_ return a sorted listing, in practice all the cloud
@@ -97,7 +97,6 @@ impl<E: TaskExecutor> DefaultEngine<E> {
9797
file_system: Arc::new(ObjectStoreFileSystemClient::new(
9898
store.clone(),
9999
!is_local,
100-
table_root,
101100
task_executor.clone(),
102101
)),
103102
json: Arc::new(DefaultJsonHandler::new(

kernel/src/log_segment/tests.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,6 @@ fn build_log_with_paths_and_checkpoint(
9898
let client = ObjectStoreFileSystemClient::new(
9999
store,
100100
false, // don't have ordered listing
101-
Path::from("/"),
102101
Arc::new(TokioBackgroundExecutor::new()),
103102
);
104103

kernel/src/snapshot.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -249,11 +249,9 @@ mod tests {
249249
let url = url::Url::from_directory_path(path).unwrap();
250250

251251
let store = Arc::new(LocalFileSystem::new());
252-
let prefix = Path::from(url.path());
253252
let client = ObjectStoreFileSystemClient::new(
254253
store,
255254
false, // don't have ordered listing
256-
prefix,
257255
Arc::new(TokioBackgroundExecutor::new()),
258256
);
259257
let cp = read_last_checkpoint(&client, &url).unwrap();
@@ -291,7 +289,6 @@ mod tests {
291289
let client = ObjectStoreFileSystemClient::new(
292290
store,
293291
false, // don't have ordered listing
294-
Path::from("/"),
295292
Arc::new(TokioBackgroundExecutor::new()),
296293
);
297294
let url = Url::parse("memory:///valid/").expect("valid url");

kernel/tests/read.rs

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@ async fn single_commit_two_add_files() -> Result<(), Box<dyn std::error::Error>>
5858
let location = Url::parse("memory:///")?;
5959
let engine = Arc::new(DefaultEngine::new(
6060
storage.clone(),
61-
Path::from("/"),
6261
Arc::new(TokioBackgroundExecutor::new()),
6362
));
6463

@@ -113,11 +112,7 @@ async fn two_commits() -> Result<(), Box<dyn std::error::Error>> {
113112
.await?;
114113

115114
let location = Url::parse("memory:///").unwrap();
116-
let engine = DefaultEngine::new(
117-
storage.clone(),
118-
Path::from("/"),
119-
Arc::new(TokioBackgroundExecutor::new()),
120-
);
115+
let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new()));
121116

122117
let table = Table::new(location);
123118
let expected_data = vec![batch.clone(), batch];
@@ -171,11 +166,7 @@ async fn remove_action() -> Result<(), Box<dyn std::error::Error>> {
171166
.await?;
172167

173168
let location = Url::parse("memory:///").unwrap();
174-
let engine = DefaultEngine::new(
175-
storage.clone(),
176-
Path::from("/"),
177-
Arc::new(TokioBackgroundExecutor::new()),
178-
);
169+
let engine = DefaultEngine::new(storage.clone(), Arc::new(TokioBackgroundExecutor::new()));
179170

180171
let table = Table::new(location);
181172
let expected_data = vec![batch];
@@ -249,7 +240,6 @@ async fn stats() -> Result<(), Box<dyn std::error::Error>> {
249240
let location = Url::parse("memory:///").unwrap();
250241
let engine = Arc::new(DefaultEngine::new(
251242
storage.clone(),
252-
Path::from(""),
253243
Arc::new(TokioBackgroundExecutor::new()),
254244
));
255245

kernel/tests/write.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ fn setup(
4646
let table_root_path = Path::from(format!("{base_path}{table_name}"));
4747
let url = Url::parse(&format!("{base_url}{table_root_path}/")).unwrap();
4848
let executor = Arc::new(TokioBackgroundExecutor::new());
49-
let engine = DefaultEngine::new(Arc::clone(&storage), table_root_path, executor);
49+
let engine = DefaultEngine::new(Arc::clone(&storage), executor);
5050

5151
(storage, engine, url)
5252
}

0 commit comments

Comments
 (0)