Skip to content

Commit 33f36f5

Browse files
authored
fix: sort list results based on URL scheme (#820)
## What changes are proposed in this pull request? In the default engine implementation we currently employ a hack to determine if list results need additional sorting by inspecting the type name of the `ObjectStore` instance. The base heuristic is that local files will not be sorted so we check for `LocalObjectStore`. However object stores might be wrapped in other stores to inject functionality or for testing purposes and different implementations for local files may suffer the same challenges. This PR proposes to generalise the heuristic by sorting results whenever we encounter a `file` scheme within the list method. ## API changes `ObjectStoreStorageHandler::new` no longer takes `has_ordered_listing` as a parameter. ## How was this change tested? Current unit tests.
1 parent b8ae33f commit 33f36f5

File tree

4 files changed

+29
-50
lines changed

4 files changed

+29
-50
lines changed

kernel/src/engine/default/filesystem.rs

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,20 +14,14 @@ use crate::{DeltaResult, Error, FileMeta, FileSlice, StorageHandler};
1414
#[derive(Debug)]
1515
pub struct ObjectStoreStorageHandler<E: TaskExecutor> {
1616
inner: Arc<DynObjectStore>,
17-
has_ordered_listing: bool,
1817
task_executor: Arc<E>,
1918
readahead: usize,
2019
}
2120

2221
impl<E: TaskExecutor> ObjectStoreStorageHandler<E> {
23-
pub(crate) fn new(
24-
store: Arc<DynObjectStore>,
25-
has_ordered_listing: bool,
26-
task_executor: Arc<E>,
27-
) -> Self {
22+
pub(crate) fn new(store: Arc<DynObjectStore>, task_executor: Arc<E>) -> Self {
2823
Self {
2924
inner: store,
30-
has_ordered_listing,
3125
task_executor,
3226
readahead: 10,
3327
}
@@ -64,6 +58,26 @@ impl<E: TaskExecutor> StorageHandler for ObjectStoreStorageHandler<E> {
6458

6559
let store = self.inner.clone();
6660

61+
// HACK to check if we're using a LocalFileSystem from ObjectStore. We need this because
62+
// local filesystem doesn't return a sorted list by default. Although the `object_store`
63+
// crate explicitly says it _does not_ return a sorted listing, in practice all the cloud
64+
// implementations actually do:
65+
// - AWS:
66+
// [`ListObjectsV2`](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html)
67+
// states: "For general purpose buckets, ListObjectsV2 returns objects in lexicographical
68+
// order based on their key names." (Directory buckets are out of scope for now)
69+
// - Azure: Docs state
70+
// [here](https://learn.microsoft.com/en-us/rest/api/storageservices/enumerating-blob-resources):
71+
// "A listing operation returns an XML response that contains all or part of the requested
72+
// list. The operation returns entities in alphabetical order."
73+
// - GCP: The [main](https://cloud.google.com/storage/docs/xml-api/get-bucket-list) doc
74+
// doesn't indicate order, but [this
75+
// page](https://cloud.google.com/storage/docs/xml-api/get-bucket-list) does say: "This page
76+
// shows you how to list the [objects](https://cloud.google.com/storage/docs/objects) stored
77+
// in your Cloud Storage buckets, which are ordered in the list lexicographically by name."
78+
// So we just need to know if we're local and then if so, we sort the returned file list
79+
let has_ordered_listing = path.scheme() != "file";
80+
6781
// This channel will become the iterator
6882
let (sender, receiver) = std::sync::mpsc::sync_channel(4_000);
6983
let url = path.clone();
@@ -90,7 +104,7 @@ impl<E: TaskExecutor> StorageHandler for ObjectStoreStorageHandler<E> {
90104
}
91105
});
92106

93-
if !self.has_ordered_listing {
107+
if !has_ordered_listing {
94108
// This FS doesn't return things in the order we require
95109
let mut fms: Vec<FileMeta> = receiver.into_iter().try_collect()?;
96110
fms.sort_unstable();
@@ -197,11 +211,8 @@ mod tests {
197211
let mut url = Url::from_directory_path(tmp.path()).unwrap();
198212

199213
let store = Arc::new(LocalFileSystem::new());
200-
let storage = ObjectStoreStorageHandler::new(
201-
store,
202-
false, // don't have ordered listing
203-
Arc::new(TokioBackgroundExecutor::new()),
204-
);
214+
let executor = Arc::new(TokioBackgroundExecutor::new());
215+
let storage = ObjectStoreStorageHandler::new(store, executor);
205216

206217
let mut slices: Vec<FileSlice> = Vec::new();
207218

kernel/src/engine/default/mod.rs

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -70,31 +70,9 @@ impl<E: TaskExecutor> DefaultEngine<E> {
7070
/// - `object_store`: The object store to use.
7171
/// - `task_executor`: Used to spawn async IO tasks. See [executor::TaskExecutor].
7272
pub fn new(object_store: Arc<DynObjectStore>, task_executor: Arc<E>) -> Self {
73-
// HACK to check if we're using a LocalFileSystem from ObjectStore. We need this because
74-
// local filesystem doesn't return a sorted list by default. Although the `object_store`
75-
// crate explicitly says it _does not_ return a sorted listing, in practice all the cloud
76-
// implementations actually do:
77-
// - AWS:
78-
// [`ListObjectsV2`](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html)
79-
// states: "For general purpose buckets, ListObjectsV2 returns objects in lexicographical
80-
// order based on their key names." (Directory buckets are out of scope for now)
81-
// - Azure: Docs state
82-
// [here](https://learn.microsoft.com/en-us/rest/api/storageservices/enumerating-blob-resources):
83-
// "A listing operation returns an XML response that contains all or part of the requested
84-
// list. The operation returns entities in alphabetical order."
85-
// - GCP: The [main](https://cloud.google.com/storage/docs/xml-api/get-bucket-list) doc
86-
// doesn't indicate order, but [this
87-
// page](https://cloud.google.com/storage/docs/xml-api/get-bucket-list) does say: "This page
88-
// shows you how to list the [objects](https://cloud.google.com/storage/docs/objects) stored
89-
// in your Cloud Storage buckets, which are ordered in the list lexicographically by name."
90-
// So we just need to know if we're local and then if so, we sort the returned file list in
91-
// `filesystem.rs`
92-
let store_str = format!("{}", object_store);
93-
let is_local = store_str.starts_with("LocalFileSystem");
9473
Self {
9574
storage: Arc::new(ObjectStoreStorageHandler::new(
9675
object_store.clone(),
97-
!is_local,
9876
task_executor.clone(),
9977
)),
10078
json: Arc::new(DefaultJsonHandler::new(

kernel/src/log_segment/tests.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -111,11 +111,7 @@ fn build_log_with_paths_and_checkpoint(
111111
}
112112
});
113113

114-
let storage = ObjectStoreStorageHandler::new(
115-
store,
116-
false, // don't have ordered listing
117-
Arc::new(TokioBackgroundExecutor::new()),
118-
);
114+
let storage = ObjectStoreStorageHandler::new(store, Arc::new(TokioBackgroundExecutor::new()));
119115

120116
let table_root = Url::parse("memory:///").expect("valid url");
121117
let log_root = table_root.join("_delta_log/").unwrap();

kernel/src/snapshot.rs

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -625,11 +625,8 @@ mod tests {
625625
let url = url::Url::from_directory_path(path).unwrap();
626626

627627
let store = Arc::new(LocalFileSystem::new());
628-
let storage = ObjectStoreStorageHandler::new(
629-
store,
630-
false, // don't have ordered listing
631-
Arc::new(TokioBackgroundExecutor::new()),
632-
);
628+
let executor = Arc::new(TokioBackgroundExecutor::new());
629+
let storage = ObjectStoreStorageHandler::new(store, executor);
633630
let cp = read_last_checkpoint(&storage, &url).unwrap();
634631
assert!(cp.is_none())
635632
}
@@ -662,11 +659,8 @@ mod tests {
662659
.expect("put _last_checkpoint");
663660
});
664661

665-
let storage = ObjectStoreStorageHandler::new(
666-
store,
667-
false, // don't have ordered listing
668-
Arc::new(TokioBackgroundExecutor::new()),
669-
);
662+
let executor = Arc::new(TokioBackgroundExecutor::new());
663+
let storage = ObjectStoreStorageHandler::new(store, executor);
670664
let url = Url::parse("memory:///valid/").expect("valid url");
671665
let valid = read_last_checkpoint(&storage, &url).expect("read last checkpoint");
672666
let url = Url::parse("memory:///invalid/").expect("valid url");

0 commit comments

Comments
 (0)