2 changes: 1 addition & 1 deletion Cargo.toml
@@ -26,7 +26,7 @@ rust-version = "1.81"
version = "0.9.0"

[workspace.dependencies]
object_store = { version = ">=0.11, <0.12" }
object_store = { version = "0.12" }
hdfs-native-object-store = "0.13.0"
hdfs-native = "0.11.0"
walkdir = "2.5.0"
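This version bump drives most of the churn in the rest of the diff: object_store 0.12 widens ObjectMeta::size from usize to u64 and moves ranged reads to Range<u64>, which is why size casts and with_file_size calls show up in the files below. A minimal sketch of the 0.12 shapes, assuming a tokio runtime and the crate's built-in in-memory store:

```rust
use object_store::{memory::InMemory, path::Path, ObjectStore, PutPayload};

#[tokio::main]
async fn main() -> object_store::Result<()> {
    let store = InMemory::new();
    let path = Path::from("data/file.bin");
    store.put(&path, PutPayload::from_static(b"hello")).await?;

    // ObjectMeta::size is u64 in 0.12 (it was usize in 0.11)...
    let meta = store.head(&path).await?;
    let size: u64 = meta.size;

    // ...and ranged reads now take Range<u64>.
    let bytes = store.get_range(&path, 0..size).await?;
    assert_eq!(bytes.len() as u64, size);
    Ok(())
}
```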
2 changes: 1 addition & 1 deletion acceptance/Cargo.toml
@@ -16,7 +16,7 @@ release = false
[dependencies]
delta_kernel = { path = "../kernel", features = [
"default-engine",
"arrow_53",
"arrow_55",
"developer-visibility",
] }
futures = "0.3"
3 changes: 2 additions & 1 deletion acceptance/src/data.rs
@@ -25,7 +25,8 @@ pub async fn read_golden(path: &Path, _version: Option<&str>) -> DeltaResult<Rec
for meta in files.into_iter() {
if let Some(ext) = meta.location.extension() {
if ext == "parquet" {
let reader = ParquetObjectReader::new(store.clone(), meta);
let reader = ParquetObjectReader::new(store.clone(), meta.location)
.with_file_size(meta.size);
let builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
if schema.is_none() {
schema = Some(builder.schema().clone());
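The new reader construction reflects a parquet 55 API change: ParquetObjectReader::new takes the object path rather than a whole ObjectMeta, and a file size that is already known can be supplied through with_file_size so the reader skips its own HEAD request before fetching the footer. A sketch of the pattern, with open_parquet as a hypothetical helper:

```rust
use std::sync::Arc;

use object_store::{path::Path, ObjectStore};
use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder};

// Hypothetical helper: resolve the object's metadata once, then hand the path
// and known size to the parquet 55 reader constructor.
async fn open_parquet(
    store: Arc<dyn ObjectStore>,
    location: Path,
) -> parquet::errors::Result<ParquetRecordBatchStreamBuilder<ParquetObjectReader>> {
    let meta = store.head(&location).await?;
    let reader = ParquetObjectReader::new(store, meta.location).with_file_size(meta.size);
    ParquetRecordBatchStreamBuilder::new(reader).await
}
```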
2 changes: 1 addition & 1 deletion feature-tests/Cargo.toml
@@ -12,7 +12,7 @@ version.workspace = true
release = false

[dependencies]
delta_kernel = { path = "../kernel", features = ["arrow_53"] }
delta_kernel = { path = "../kernel", features = ["arrow_55"] }

[features]
default-engine = [ "delta_kernel/default-engine" ]
20 changes: 7 additions & 13 deletions kernel/Cargo.toml
@@ -38,7 +38,7 @@ pre-release-hook = [

[dependencies]
bytes = "1.7"
chrono = "=0.4.39"
chrono = "0.4.40"
fix-hidden-lifetime-bug = "0.2"
indexmap = "2.5.0"
itertools = "0.13"
@@ -62,17 +62,12 @@ visibility = "0.1.1"
tempfile = { version = "3", optional = true }

# Arrow supported versions
## 53
# Used in default engine
arrow_53 = { package = "arrow", version = "53", features = ["chrono-tz", "ffi", "json", "prettyprint"], optional = true }
# Used in default and sync engine
parquet_53 = { package = "parquet", version = "53", features = ["async", "object_store"] , optional = true }
######
## 54
arrow_54 = { package = "arrow", version = "54", features = ["chrono-tz", "ffi", "json", "prettyprint"], optional = true }
parquet_54 = { package = "parquet", version = "54", features = ["async", "object_store"] , optional = true }
## 55 (default)
arrow_55 = { package = "arrow", version = "55", features = ["chrono-tz", "ffi", "json", "prettyprint"], optional = true }
parquet_55 = { package = "parquet", version = "55", features = ["async", "object_store"] , optional = true }
######


futures = { version = "0.3", optional = true }
object_store = { workspace = true, optional = true }
hdfs-native-object-store = { workspace = true, optional = true }
Expand All @@ -90,11 +85,10 @@ walkdir = { workspace = true, optional = true }

[features]
# The default version to be expected
arrow = ["arrow_53"]
arrow = ["arrow_55"]

arrow_53 = ["dep:arrow_53", "dep:parquet_53"]
arrow_55 = ["dep:arrow_55", "dep:parquet_55"]

arrow_54 = ["dep:arrow_54", "dep:parquet_54"]

need_arrow = []
arrow-conversion = ["need_arrow"]
4 changes: 2 additions & 2 deletions kernel/examples/inspect-table/Cargo.toml
@@ -5,11 +5,11 @@ edition = "2021"
publish = false

[dependencies]
arrow = "53"
arrow = "55"
clap = { version = "4.5", features = ["derive"] }
delta_kernel = { path = "../../../kernel", features = [
"cloud",
"arrow_53",
"arrow_55",
"default-engine",
"developer-visibility",
] }
4 changes: 2 additions & 2 deletions kernel/examples/read-table-multi-threaded/Cargo.toml
@@ -5,11 +5,11 @@ edition = "2021"
publish = false

[dependencies]
arrow = { version = "53", features = ["prettyprint", "chrono-tz"] }
arrow = { version = "55", features = ["prettyprint", "chrono-tz"] }
clap = { version = "4.5", features = ["derive"] }
delta_kernel = { path = "../../../kernel", features = [
"cloud",
"arrow_53",
"arrow_55",
"default-engine",
"sync-engine",
"developer-visibility",
4 changes: 2 additions & 2 deletions kernel/examples/read-table-single-threaded/Cargo.toml
@@ -5,10 +5,10 @@ edition = "2021"
publish = false

[dependencies]
arrow = { version = "53", features = ["prettyprint", "chrono-tz"] }
arrow = { version = "55", features = ["prettyprint", "chrono-tz"] }
clap = { version = "4.5", features = ["derive"] }
delta_kernel = { path = "../../../kernel", features = [
"arrow_53",
"arrow_55",
"cloud",
"default-engine",
"sync-engine",
12 changes: 2 additions & 10 deletions kernel/src/arrow.rs
@@ -1,17 +1,9 @@
//! This module exists to help re-export the version of arrow used by default-engine and other
//! parts of kernel that need arrow

#[cfg(feature = "arrow_53")]
pub use arrow_53::*;

#[cfg(all(feature = "arrow_54", not(feature = "arrow_53")))]
pub use arrow_54::*;
pub use arrow_55::*;

// if nothing is enabled but we need arrow because of some other feature flag, default to lowest
// supported version
#[cfg(all(
feature = "need_arrow",
not(feature = "arrow_53"),
not(feature = "arrow_54")
))]
#[cfg(all(feature = "need_arrow", not(feature = "arrow_55")))]
compile_error!("Requested a feature that needs arrow without enabling arrow. Please enable the `arrow_55` feature");
3 changes: 2 additions & 1 deletion kernel/src/engine/default/filesystem.rs
@@ -79,7 +79,7 @@ impl<E: TaskExecutor> StorageHandler for ObjectStoreStorageHandler<E> {
.send(Ok(FileMeta {
location,
last_modified: meta.last_modified.timestamp_millis(),
size: meta.size,
size: meta.size as usize,
}))
.ok();
}
@@ -136,6 +136,7 @@ impl<E: TaskExecutor> StorageHandler for ObjectStoreStorageHandler<E> {
// have to annotate type here or rustc can't figure it out
Ok::<bytes::Bytes, Error>(reqwest::get(url).await?.bytes().await?)
} else if let Some(rng) = range {
let rng = rng.start as u64..rng.end as u64;
Ok(store.get_range(&path, rng).await?)
} else {
let result = store.get(&path).await?;
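The meta.size as usize cast above truncates silently on 32-bit targets, where usize is only 32 bits wide. A checked alternative would look roughly like this (function name and error type are illustrative only):

```rust
// Fallible u64 -> usize conversion instead of a lossy `as` cast.
fn object_size_to_usize(size: u64) -> Result<usize, String> {
    usize::try_from(size)
        .map_err(|_| format!("object size {size} does not fit in usize on this target"))
}
// At the call site this would become something like:
//   size: object_size_to_usize(meta.size).map_err(Error::generic)?,
```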
12 changes: 6 additions & 6 deletions kernel/src/engine/default/json.rs
@@ -423,11 +423,11 @@ mod tests {
self.inner.get_opts(location, options).await
}

async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
async fn get_range(&self, location: &Path, range: Range<u64>) -> Result<Bytes> {
self.inner.get_range(location, range).await
}

async fn get_ranges(&self, location: &Path, ranges: &[Range<usize>]) -> Result<Vec<Bytes>> {
async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
self.inner.get_ranges(location, ranges).await
}

@@ -439,15 +439,15 @@
self.inner.delete(location).await
}

fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
self.inner.list(prefix)
}

fn list_with_offset(
&self,
prefix: Option<&Path>,
offset: &Path,
) -> BoxStream<'_, Result<ObjectMeta>> {
) -> BoxStream<'static, Result<ObjectMeta>> {
self.inner.list_with_offset(prefix, offset)
}

@@ -532,7 +532,7 @@ mod tests {
let files = &[FileMeta {
location: url.clone(),
last_modified: meta.last_modified.timestamp_millis(),
size: meta.size,
size: meta.size as usize,
}];

let handler = DefaultJsonHandler::new(store, Arc::new(TokioBackgroundExecutor::new()));
@@ -681,7 +681,7 @@ mod tests {
FileMeta {
location: url,
last_modified: meta.last_modified.timestamp_millis(),
size: meta.size,
size: meta.size as usize,
}
}
})
15 changes: 9 additions & 6 deletions kernel/src/engine/default/parquet.rs
@@ -145,7 +145,7 @@ impl<E: TaskExecutor> DefaultParquetHandler<E> {

let metadata = self.store.head(&Path::from(path.path())).await?;
let modification_time = metadata.last_modified.timestamp_millis();
if size != metadata.size {
if (size as u64) != metadata.size {
return Err(Error::generic(format!(
"Size mismatch after writing parquet file: expected {}, got {}",
size, metadata.size
@@ -257,7 +257,8 @@ impl FileOpener for ParquetOpener {
Ok(Box::pin(async move {
// TODO avoid IO by converting passed file meta to ObjectMeta
let meta = store.head(&path).await?;
let mut reader = ParquetObjectReader::new(store, meta);
let mut reader =
ParquetObjectReader::new(store, meta.location).with_file_size(meta.size);
let metadata = ArrowReaderMetadata::load_async(&mut reader, Default::default()).await?;
let parquet_schema = metadata.schema();
let (indices, requested_ordering) =
@@ -394,7 +395,8 @@ mod tests {
let location = Path::from(url.path());
let meta = store.head(&location).await.unwrap();

let reader = ParquetObjectReader::new(store.clone(), meta.clone());
let reader =
ParquetObjectReader::new(store.clone(), meta.location).with_file_size(meta.size);
let physical_schema = ParquetRecordBatchStreamBuilder::new(reader)
.await
.unwrap()
@@ -404,7 +406,7 @@
let files = &[FileMeta {
location: url.clone(),
last_modified: meta.last_modified.timestamp(),
size: meta.size,
size: meta.size as usize,
}];

let handler = DefaultParquetHandler::new(store, Arc::new(TokioBackgroundExecutor::new()));
@@ -512,13 +514,14 @@

let filename = location.path().split('/').next_back().unwrap();
assert_eq!(&expected_location.join(filename).unwrap(), location);
assert_eq!(expected_size, size);
assert_eq!(expected_size, size as u64);
assert!(now - last_modified < 10_000);

// check we can read back
let path = Path::from(location.path());
let meta = store.head(&path).await.unwrap();
let reader = ParquetObjectReader::new(store.clone(), meta.clone());
let reader =
ParquetObjectReader::new(store.clone(), meta.location).with_file_size(meta.size);
let physical_schema = ParquetRecordBatchStreamBuilder::new(reader)
.await
.unwrap()
11 changes: 9 additions & 2 deletions kernel/src/engine/default/storage.rs
@@ -1,5 +1,7 @@
/* Hdfs object store uses object_store 0.11, not compatible
#[cfg(feature = "cloud")]
use hdfs_native_object_store::HdfsObjectStore;
*/
use object_store::parse_url_opts as parse_url_opts_object_store;
use object_store::path::Path;
use object_store::{Error, ObjectStore};
@@ -20,19 +22,24 @@ where

#[cfg(feature = "cloud")]
pub fn parse_url_opts_hdfs_native<I, K, V>(
url: &Url,
options: I,
_url: &Url,
_options: I,
) -> Result<(Box<dyn ObjectStore>, Path), Error>
where
I: IntoIterator<Item = (K, V)>,
K: AsRef<str>,
V: Into<String>,
{
/* HDFS object store uses older object store, so can't use it directly here
Needs to be ported

let options_map = options
.into_iter()
.map(|(k, v)| (k.as_ref().to_string(), v.into()))
.collect();
let store = HdfsObjectStore::with_config(url.as_str(), options_map)?;
let path = Path::parse(url.path())?;
Ok((Box::new(store), path))
*/
todo!("Need to update hdfs object store");
}
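Once hdfs-native-object-store ships a release built against object_store 0.12, the stub can presumably be restored along the lines of the commented-out body. A sketch, assuming HdfsObjectStore::with_config keeps its 0.11-era signature and the commented imports at the top of the file are re-enabled:

```rust
#[cfg(feature = "cloud")]
pub fn parse_url_opts_hdfs_native<I, K, V>(
    url: &Url,
    options: I,
) -> Result<(Box<dyn ObjectStore>, Path), Error>
where
    I: IntoIterator<Item = (K, V)>,
    K: AsRef<str>,
    V: Into<String>,
{
    let options_map = options
        .into_iter()
        .map(|(k, v)| (k.as_ref().to_string(), v.into()))
        .collect();
    let store = HdfsObjectStore::with_config(url.as_str(), options_map)?;
    let path = Path::parse(url.path())?;
    Ok((Box::new(store), path))
}
```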
13 changes: 3 additions & 10 deletions kernel/src/parquet.rs
@@ -1,17 +1,10 @@
//! This module exists to help re-export the version of parquet used by default-engine and other
//! parts of kernel that need parquet

#[cfg(feature = "arrow_53")]
[Author comment] I am pretty sure that simply deleting support for the older arrow releases is not the right approach for this crate but it was the most expedient for my purposes.

pub use parquet_53::*;

#[cfg(all(feature = "arrow_54", not(feature = "arrow_53")))]
pub use parquet_54::*;
#[cfg(feature = "arrow_55")]
pub use parquet_55::*;

// if nothing is enabled but we need arrow because of some other feature flag, default to lowest
// supported version
#[cfg(all(
feature = "need_arrow",
not(feature = "arrow_53"),
not(feature = "arrow_54")
))]
#[cfg(all(feature = "need_arrow", not(feature = "arrow_55")))]
compile_error!("Requested a feature that needs arrow without enabling arrow. Please enable the `arrow_55` feature");
3 changes: 2 additions & 1 deletion kernel/tests/golden_tables.rs
@@ -35,7 +35,8 @@ async fn read_expected(path: &Path) -> DeltaResult<RecordBatch> {
for meta in files.into_iter() {
if let Some(ext) = meta.location.extension() {
if ext == "parquet" {
let reader = ParquetObjectReader::new(store.clone(), meta);
let reader = ParquetObjectReader::new(store.clone(), meta.location)
.with_file_size(meta.size);
let builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
if schema.is_none() {
schema = Some(builder.schema().clone());
2 changes: 1 addition & 1 deletion kernel/tests/write.rs
@@ -371,7 +371,7 @@ async fn get_and_check_all_parquet_sizes(store: Arc<dyn ObjectStore>, path: &str
assert!(parquet_files
.iter()
.all(|f| f.as_ref().unwrap().size == size));
size.try_into().unwrap()
size
}

#[tokio::test]
Expand Down
Loading