diff --git a/Cargo.toml b/Cargo.toml
index bcd69af76b..a0b0008be4 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -26,7 +26,7 @@ rust-version = "1.81"
 version = "0.9.0"
 
 [workspace.dependencies]
-object_store = { version = ">=0.11, <0.12" }
+object_store = { version = "0.12" }
 hdfs-native-object-store = "0.13.0"
 hdfs-native = "0.11.0"
 walkdir = "2.5.0"
diff --git a/acceptance/Cargo.toml b/acceptance/Cargo.toml
index e844007ef5..ab25061959 100644
--- a/acceptance/Cargo.toml
+++ b/acceptance/Cargo.toml
@@ -16,7 +16,7 @@ release = false
 [dependencies]
 delta_kernel = { path = "../kernel", features = [
     "default-engine",
-    "arrow_53",
+    "arrow_55",
     "developer-visibility",
 ] }
 futures = "0.3"
diff --git a/acceptance/src/data.rs b/acceptance/src/data.rs
index 9685f29c37..757425cced 100644
--- a/acceptance/src/data.rs
+++ b/acceptance/src/data.rs
@@ -25,7 +25,8 @@ pub async fn read_golden(path: &Path, _version: Option<&str>) -> DeltaResult StorageHandler for ObjectStoreStorageHandler {
                 .send(Ok(FileMeta {
                     location,
                     last_modified: meta.last_modified.timestamp_millis(),
-                    size: meta.size,
+                    size: meta.size as usize,
                 }))
                 .ok();
         }
@@ -136,6 +136,7 @@ impl StorageHandler for ObjectStoreStorageHandler {
                 // have to annotate type here or rustc can't figure it out
                 Ok::(reqwest::get(url).await?.bytes().await?)
             } else if let Some(rng) = range {
+                let rng = rng.start as u64..rng.end as u64;
                 Ok(store.get_range(&path, rng).await?)
             } else {
                 let result = store.get(&path).await?;
diff --git a/kernel/src/engine/default/json.rs b/kernel/src/engine/default/json.rs
index 1dc35539e4..7ff6cd6809 100644
--- a/kernel/src/engine/default/json.rs
+++ b/kernel/src/engine/default/json.rs
@@ -423,11 +423,11 @@ mod tests {
         self.inner.get_opts(location, options).await
     }
 
-    async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
+    async fn get_range(&self, location: &Path, range: Range<u64>) -> Result<Bytes> {
         self.inner.get_range(location, range).await
     }
 
-    async fn get_ranges(&self, location: &Path, ranges: &[Range<usize>]) -> Result<Vec<Bytes>> {
+    async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
         self.inner.get_ranges(location, ranges).await
     }
 
@@ -439,7 +439,7 @@ mod tests {
         self.inner.delete(location).await
     }
 
-    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
+    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
         self.inner.list(prefix)
     }
 
@@ -447,7 +447,7 @@ mod tests {
         &self,
         prefix: Option<&Path>,
         offset: &Path,
-    ) -> BoxStream<'_, Result<ObjectMeta>> {
+    ) -> BoxStream<'static, Result<ObjectMeta>> {
         self.inner.list_with_offset(prefix, offset)
     }
 
@@ -532,7 +532,7 @@ mod tests {
         let files = &[FileMeta {
             location: url.clone(),
             last_modified: meta.last_modified.timestamp_millis(),
-            size: meta.size,
+            size: meta.size as usize,
         }];
 
         let handler = DefaultJsonHandler::new(store, Arc::new(TokioBackgroundExecutor::new()));
@@ -681,7 +681,7 @@ mod tests {
                 FileMeta {
                     location: url,
                     last_modified: meta.last_modified.timestamp_millis(),
-                    size: meta.size,
+                    size: meta.size as usize,
                 }
             }
         })
diff --git a/kernel/src/engine/default/parquet.rs b/kernel/src/engine/default/parquet.rs
index 8636b3d9f8..a54d8dc0da 100644
--- a/kernel/src/engine/default/parquet.rs
+++ b/kernel/src/engine/default/parquet.rs
@@ -145,7 +145,7 @@ impl DefaultParquetHandler {
         let metadata = self.store.head(&Path::from(path.path())).await?;
         let modification_time = metadata.last_modified.timestamp_millis();
 
-        if size != metadata.size {
+        if (size as u64) != metadata.size {
             return Err(Error::generic(format!(
                 "Size mismatch after writing parquet file: expected {}, got {}",
                 size, metadata.size
@@ -257,7 +257,8 @@ impl FileOpener for ParquetOpener {
         Ok(Box::pin(async move {
             // TODO avoid IO by converting passed file meta to ObjectMeta
             let meta = store.head(&path).await?;
-            let mut reader = ParquetObjectReader::new(store, meta);
+            let mut reader =
+                ParquetObjectReader::new(store, meta.location).with_file_size(meta.size);
             let metadata = ArrowReaderMetadata::load_async(&mut reader, Default::default()).await?;
             let parquet_schema = metadata.schema();
             let (indices, requested_ordering) =
@@ -394,7 +395,8 @@ mod tests {
         let location = Path::from(url.path());
         let meta = store.head(&location).await.unwrap();
 
-        let reader = ParquetObjectReader::new(store.clone(), meta.clone());
+        let reader =
+            ParquetObjectReader::new(store.clone(), meta.location).with_file_size(meta.size);
         let physical_schema = ParquetRecordBatchStreamBuilder::new(reader)
             .await
             .unwrap()
@@ -404,7 +406,7 @@ mod tests {
         let files = &[FileMeta {
             location: url.clone(),
             last_modified: meta.last_modified.timestamp(),
-            size: meta.size,
+            size: meta.size as usize,
         }];
 
         let handler = DefaultParquetHandler::new(store, Arc::new(TokioBackgroundExecutor::new()));
@@ -512,13 +514,14 @@ mod tests {
         let filename = location.path().split('/').next_back().unwrap();
         assert_eq!(&expected_location.join(filename).unwrap(), location);
-        assert_eq!(expected_size, size);
+        assert_eq!(expected_size, size as u64);
         assert!(now - last_modified < 10_000);
 
         // check we can read back
         let path = Path::from(location.path());
         let meta = store.head(&path).await.unwrap();
 
-        let reader = ParquetObjectReader::new(store.clone(), meta.clone());
+        let reader =
+            ParquetObjectReader::new(store.clone(), meta.location).with_file_size(meta.size);
         let physical_schema = ParquetRecordBatchStreamBuilder::new(reader)
             .await
             .unwrap()
diff --git a/kernel/src/engine/default/storage.rs b/kernel/src/engine/default/storage.rs
index 1ac83a1102..fba4c6105d 100644
--- a/kernel/src/engine/default/storage.rs
+++ b/kernel/src/engine/default/storage.rs
@@ -1,5 +1,7 @@
+/* Hdfs object store uses object_store 0.11, not compatible
 #[cfg(feature = "cloud")]
 use hdfs_native_object_store::HdfsObjectStore;
+ */
 use object_store::parse_url_opts as parse_url_opts_object_store;
 use object_store::path::Path;
 use object_store::{Error, ObjectStore};
@@ -20,14 +22,17 @@ where
 
 #[cfg(feature = "cloud")]
 pub fn parse_url_opts_hdfs_native<I, K, V>(
-    url: &Url,
-    options: I,
+    _url: &Url,
+    _options: I,
 ) -> Result<(Box<dyn ObjectStore>, Path), Error>
 where
     I: IntoIterator<Item = (K, V)>,
     K: AsRef<str>,
     V: Into<String>,
 {
+    /* HDFS object store uses older object store, so can't use it directly here
+    Needs to be ported
+
     let options_map = options
         .into_iter()
         .map(|(k, v)| (k.as_ref().to_string(), v.into()))
@@ -35,4 +40,6 @@ where
     let store = HdfsObjectStore::with_config(url.as_str(), options_map)?;
     let path = Path::parse(url.path())?;
     Ok((Box::new(store), path))
+    */
+    todo!("Need to update hdfs object store");
 }
diff --git a/kernel/src/parquet.rs b/kernel/src/parquet.rs
index 3620792901..bc9da1f769 100644
--- a/kernel/src/parquet.rs
+++ b/kernel/src/parquet.rs
@@ -1,17 +1,10 @@
 //! This module exists to help re-export the version of arrow used by default-engine and other
 //! parts of kernel that need arrow
 
-#[cfg(feature = "arrow_53")]
-pub use parquet_53::*;
-
-#[cfg(all(feature = "arrow_54", not(feature = "arrow_53")))]
-pub use parquet_54::*;
+#[cfg(feature = "arrow_55")]
+pub use parquet_55::*;
 
 // if nothing is enabled but we need arrow because of some other feature flag, default to lowest
 // supported version
-#[cfg(all(
-    feature = "need_arrow",
-    not(feature = "arrow_53"),
-    not(feature = "arrow_54")
-))]
+#[cfg(all(feature = "need_arrow", not(feature = "arrow_55"),))]
 compile_error!("Requested a feature that needs arrow without enabling arrow. Please enable the `arrow_53` or `arrow_54` feature");
diff --git a/kernel/tests/golden_tables.rs b/kernel/tests/golden_tables.rs
index 241279906f..3b1e54d0a1 100644
--- a/kernel/tests/golden_tables.rs
+++ b/kernel/tests/golden_tables.rs
@@ -35,7 +35,8 @@ async fn read_expected(path: &Path) -> DeltaResult {
     for meta in files.into_iter() {
         if let Some(ext) = meta.location.extension() {
            if ext == "parquet" {
-                let reader = ParquetObjectReader::new(store.clone(), meta);
+                let reader = ParquetObjectReader::new(store.clone(), meta.location)
+                    .with_file_size(meta.size);
                 let builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
                 if schema.is_none() {
                     schema = Some(builder.schema().clone());
diff --git a/kernel/tests/write.rs b/kernel/tests/write.rs
index eb3671595b..de7b2fdb46 100644
--- a/kernel/tests/write.rs
+++ b/kernel/tests/write.rs
@@ -371,7 +371,7 @@ async fn get_and_check_all_parquet_sizes(store: Arc<dyn ObjectStore>, path: &str
     assert!(parquet_files
         .iter()
        .all(|f| f.as_ref().unwrap().size == size));
-    size.try_into().unwrap()
+    size
 }
 
 #[tokio::test]
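For orientation, here is a minimal sketch of the object_store 0.12 / parquet 55 call pattern this diff migrates to: `ObjectMeta::size` and `get_range` now work in `u64`, and `ParquetObjectReader` is built from an object path plus an explicit file size instead of a whole `ObjectMeta` (which is why the kernel code above adds `as u64`/`as usize` casts around its `usize`-typed `FileMeta::size`). The local store, file path, and `main` wrapper are assumptions for illustration only, and the raw `object_store`, `parquet`, and `tokio` crates are used directly rather than delta_kernel's re-exports.

```rust
use std::sync::Arc;

use futures::StreamExt;
use object_store::{local::LocalFileSystem, path::Path, ObjectStore};
use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical store and parquet object path, used only for illustration.
    let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
    let path = Path::from("my-table/part-00000.parquet");

    // object_store 0.12: ObjectMeta::size is a u64, and get_range takes a Range<u64>.
    let meta = store.head(&path).await?;
    let magic = store.get_range(&path, 0..4).await?;
    assert_eq!(&magic[..], &b"PAR1"[..]);

    // parquet 55: build the reader from the path and supply the file size explicitly,
    // instead of handing over a whole ObjectMeta as in parquet 53/54.
    let reader = ParquetObjectReader::new(store.clone(), meta.location).with_file_size(meta.size);
    let mut stream = ParquetRecordBatchStreamBuilder::new(reader).await?.build()?;
    while let Some(batch) = stream.next().await {
        println!("read {} rows", batch?.num_rows());
    }
    Ok(())
}
```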