
Upgrade to arrow/parquet 55, object_store 0.12.0, and pyo3 0.24.0 #15466


Merged: 24 commits, Apr 14, 2025
163 changes: 84 additions & 79 deletions Cargo.lock

Large diffs are not rendered by default.

16 changes: 8 additions & 8 deletions Cargo.toml
@@ -87,19 +87,19 @@ ahash = { version = "0.8", default-features = false, features = [
"runtime-rng",
] }
apache-avro = { version = "0.17", default-features = false }
arrow = { version = "54.3.1", features = [
arrow = { version = "55.0.0", features = [
"prettyprint",
"chrono-tz",
] }
arrow-buffer = { version = "54.3.0", default-features = false }
arrow-flight = { version = "54.3.1", features = [
arrow-buffer = { version = "55.0.0", default-features = false }
arrow-flight = { version = "55.0.0", features = [
"flight-sql-experimental",
] }
arrow-ipc = { version = "54.3.0", default-features = false, features = [
arrow-ipc = { version = "55.0.0", default-features = false, features = [
"lz4",
] }
arrow-ord = { version = "54.3.0", default-features = false }
arrow-schema = { version = "54.3.0", default-features = false }
arrow-ord = { version = "55.0.0", default-features = false }
arrow-schema = { version = "55.0.0", default-features = false }
async-trait = "0.1.88"
bigdecimal = "0.4.8"
bytes = "1.10"
@@ -147,9 +147,9 @@ hashbrown = { version = "0.14.5", features = ["raw"] }
indexmap = "2.8.0"
itertools = "0.14"
log = "^0.4"
object_store = { version = "0.11.0", default-features = false }
object_store = { version = "0.12.0", default-features = false }
parking_lot = "0.12"
parquet = { version = "54.3.1", default-features = false, features = [
parquet = { version = "55.0.0", default-features = false, features = [
"arrow",
"async",
"object_store",
9 changes: 6 additions & 3 deletions datafusion-examples/examples/advanced_parquet_index.rs
@@ -571,7 +571,9 @@ impl ParquetFileReaderFactory for CachedParquetFileReaderFactory {
.to_string();

let object_store = Arc::clone(&self.object_store);
-let mut inner = ParquetObjectReader::new(object_store, file_meta.object_meta);
+let mut inner =
+    ParquetObjectReader::new(object_store, file_meta.object_meta.location)
+        .with_file_size(file_meta.object_meta.size);

if let Some(hint) = metadata_size_hint {
inner = inner.with_footer_size_hint(hint)
@@ -599,15 +601,15 @@ struct ParquetReaderWithCache {
impl AsyncFileReader for ParquetReaderWithCache {
fn get_bytes(
&mut self,
-range: Range<usize>,
+range: Range<u64>,
) -> BoxFuture<'_, datafusion::parquet::errors::Result<Bytes>> {
println!("get_bytes: {} Reading range {:?}", self.filename, range);
self.inner.get_bytes(range)
}

fn get_byte_ranges(
&mut self,
-ranges: Vec<Range<usize>>,
+ranges: Vec<Range<u64>>,
) -> BoxFuture<'_, datafusion::parquet::errors::Result<Vec<Bytes>>> {
println!(
"get_byte_ranges: {} Reading ranges {:?}",
@@ -618,6 +620,7 @@ impl AsyncFileReader for ParquetReaderWithCache {

fn get_metadata(
&mut self,
+_options: Option<&ArrowReaderOptions>,
) -> BoxFuture<'_, datafusion::parquet::errors::Result<Arc<ParquetMetaData>>> {
println!("get_metadata: {} returning cached metadata", self.filename);

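Note on the API change above: in parquet 55, ParquetObjectReader takes only the object's location, and the file size is attached separately. A minimal sketch of the before/after construction, assuming a store and an ObjectMeta already in hand (the helper name is illustrative, not from this PR):

use std::sync::Arc;

use object_store::{ObjectMeta, ObjectStore};
use parquet::arrow::async_reader::ParquetObjectReader;

fn make_reader(store: Arc<dyn ObjectStore>, meta: &ObjectMeta) -> ParquetObjectReader {
    // parquet 54.x took the whole ObjectMeta: ParquetObjectReader::new(store, meta.clone())
    // parquet 55.x takes the location; the size is optional and supplied separately,
    // so callers that do not know the size up front are no longer forced to provide one.
    ParquetObjectReader::new(store, meta.location.clone()).with_file_size(meta.size)
}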
2 changes: 1 addition & 1 deletion datafusion/common/Cargo.toml
@@ -63,7 +63,7 @@ log = { workspace = true }
object_store = { workspace = true, optional = true }
parquet = { workspace = true, optional = true, default-features = true }
paste = "1.0.15"
pyo3 = { version = "0.23.5", optional = true }
pyo3 = { version = "0.24.0", optional = true }
recursive = { workspace = true, optional = true }
sqlparser = { workspace = true }
tokio = { workspace = true }
5 changes: 3 additions & 2 deletions datafusion/core/src/datasource/file_format/arrow.rs
@@ -144,6 +144,7 @@ impl FileFormat for ArrowFormat {
for object in objects {
let r = store.as_ref().get(&object.location).await?;
let schema = match r.payload {
+#[cfg(not(target_arch = "wasm32"))]
GetResultPayload::File(mut file, _) => {
let reader = FileReader::try_new(&mut file, None)?;
reader.schema()
@@ -442,7 +443,7 @@ mod tests {
let object_meta = ObjectMeta {
location,
last_modified: DateTime::default(),
-size: usize::MAX,
+size: u64::MAX,
e_tag: None,
version: None,
};
@@ -485,7 +486,7 @@ mod tests {
let object_meta = ObjectMeta {
location,
last_modified: DateTime::default(),
-size: usize::MAX,
+size: u64::MAX,
e_tag: None,
version: None,
};
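The usize::MAX to u64::MAX edits follow from object_store 0.12 widening ObjectMeta::size from usize to u64. A small sketch of the struct under the new API (placeholder values, not code from this PR):

use chrono::DateTime;
use object_store::{path::Path, ObjectMeta};

fn example_meta() -> ObjectMeta {
    ObjectMeta {
        location: Path::from("data/example.parquet"),
        last_modified: DateTime::default(),
        size: 1024, // u64 as of object_store 0.12; was usize in 0.11
        e_tag: None,
        version: None,
    }
}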
17 changes: 9 additions & 8 deletions datafusion/core/src/datasource/file_format/csv.rs
@@ -72,7 +72,7 @@ mod tests {
#[derive(Debug)]
struct VariableStream {
bytes_to_repeat: Bytes,
-max_iterations: usize,
+max_iterations: u64,
iterations_detected: Arc<Mutex<usize>>,
}

@@ -103,14 +103,15 @@

async fn get(&self, location: &Path) -> object_store::Result<GetResult> {
let bytes = self.bytes_to_repeat.clone();
-let range = 0..bytes.len() * self.max_iterations;
+let len = bytes.len() as u64;
+let range = 0..len * self.max_iterations;
let arc = self.iterations_detected.clone();
let stream = futures::stream::repeat_with(move || {
let arc_inner = arc.clone();
*arc_inner.lock().unwrap() += 1;
Ok(bytes.clone())
})
-.take(self.max_iterations)
+.take(self.max_iterations as usize)
.boxed();

Ok(GetResult {
@@ -138,7 +139,7 @@
async fn get_ranges(
&self,
_location: &Path,
-_ranges: &[Range<usize>],
+_ranges: &[Range<u64>],
) -> object_store::Result<Vec<Bytes>> {
unimplemented!()
}
@@ -154,7 +155,7 @@
fn list(
&self,
_prefix: Option<&Path>,
-) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
+) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
unimplemented!()
}

@@ -179,7 +180,7 @@
}

impl VariableStream {
-pub fn new(bytes_to_repeat: Bytes, max_iterations: usize) -> Self {
+pub fn new(bytes_to_repeat: Bytes, max_iterations: u64) -> Self {
Self {
bytes_to_repeat,
max_iterations,
@@ -371,7 +372,7 @@ mod tests {
let object_meta = ObjectMeta {
location: Path::parse("/")?,
last_modified: DateTime::default(),
-size: usize::MAX,
+size: u64::MAX,
e_tag: None,
version: None,
};
@@ -429,7 +430,7 @@ mod tests {
let object_meta = ObjectMeta {
location: Path::parse("/")?,
last_modified: DateTime::default(),
-size: usize::MAX,
+size: u64::MAX,
e_tag: None,
version: None,
};
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/file_format/mod.rs
@@ -127,7 +127,7 @@ mod tests {
.write_parquet(out_dir_url, DataFrameWriteOptions::new(), None)
.await
.expect_err("should fail because input file does not match inferred schema");
-assert_eq!(e.strip_backtrace(), "Arrow error: Parser error: Error while parsing value d for column 0 at line 4");
+assert_eq!(e.strip_backtrace(), "Arrow error: Parser error: Error while parsing value 'd' as type 'Int64' for column 0 at line 4. Row data: '[d,4]'");
Ok(())
}
}
6 changes: 3 additions & 3 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -331,7 +331,7 @@ mod tests {
fn list(
&self,
_prefix: Option<&Path>,
-) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
+) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
Box::pin(futures::stream::once(async {
Err(object_store::Error::NotImplemented)
}))
@@ -408,7 +408,7 @@
)));

// Use the file size as the hint so we can get the full metadata from the first fetch
-let size_hint = meta[0].size;
+let size_hint = meta[0].size as usize;

fetch_parquet_metadata(store.upcast().as_ref(), &meta[0], Some(size_hint))
.await
@@ -443,7 +443,7 @@
)));

// Use a size hint larger than the file size to make sure we don't panic
-let size_hint = meta[0].size + 100;
+let size_hint = (meta[0].size + 100) as usize;

fetch_parquet_metadata(store.upcast().as_ref(), &meta[0], Some(size_hint))
.await
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/mod.rs
@@ -106,7 +106,7 @@ mod tests {
let meta = ObjectMeta {
location,
last_modified: metadata.modified().map(chrono::DateTime::from).unwrap(),
-size: metadata.len() as usize,
+size: metadata.len(),
e_tag: None,
version: None,
};
21 changes: 11 additions & 10 deletions datafusion/core/src/datasource/physical_plan/arrow_file.rs
@@ -273,6 +273,7 @@ impl FileOpener for ArrowOpener {
None => {
let r = object_store.get(file_meta.location()).await?;
match r.payload {
+#[cfg(not(target_arch = "wasm32"))]
GetResultPayload::File(file, _) => {
let arrow_reader = arrow::ipc::reader::FileReader::try_new(
file, projection,
@@ -305,7 +306,7 @@ impl FileOpener for ArrowOpener {
)?;
// read footer according to footer_len
let get_option = GetOptions {
-range: Some(GetRange::Suffix(10 + footer_len)),
+range: Some(GetRange::Suffix(10 + (footer_len as u64))),
Contributor Author (review comment): The changes to usize/u64 are for better wasm support, see

..Default::default()
};
let get_result = object_store
@@ -332,9 +333,9 @@
.iter()
.flatten()
.map(|block| {
-let block_len = block.bodyLength() as usize
-    + block.metaDataLength() as usize;
-let block_offset = block.offset() as usize;
+let block_len =
+    block.bodyLength() as u64 + block.metaDataLength() as u64;
+let block_offset = block.offset() as u64;
block_offset..block_offset + block_len
})
.collect_vec();
@@ -354,19 +355,19 @@
.iter()
.flatten()
.filter(|block| {
-let block_offset = block.offset() as usize;
-block_offset >= range.start as usize
-    && block_offset < range.end as usize
+let block_offset = block.offset() as u64;
+block_offset >= range.start as u64
+    && block_offset < range.end as u64
})
.copied()
.collect_vec();

let recordbatch_ranges = recordbatches
.iter()
.map(|block| {
-let block_len = block.bodyLength() as usize
-    + block.metaDataLength() as usize;
-let block_offset = block.offset() as usize;
+let block_len =
+    block.bodyLength() as u64 + block.metaDataLength() as u64;
+let block_offset = block.offset() as u64;
block_offset..block_offset + block_len
})
.collect_vec();
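As the author's review comment above explains, the usize to u64 migration is about wasm support: usize is 32 bits on wasm32, so byte offsets held as usize cap out at 4 GiB. A standalone illustration of the failure mode the wider type avoids (not PR code):

fn main() {
    // A file offset past 4 GiB is representable as u64 on every target...
    let offset: u64 = 5 * 1024 * 1024 * 1024; // 5 GiB

    // ...but converting it to usize only succeeds where usize is 64 bits wide.
    // On wasm32, try_from reports the overflow instead of silently truncating.
    match usize::try_from(offset) {
        Ok(n) => println!("fits in usize: {n}"),
        Err(_) => println!("offset exceeds this target's usize range"),
    }
}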
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/physical_plan/csv.rs
@@ -658,7 +658,7 @@ mod tests {
)
.await
.expect_err("should fail because input file does not match inferred schema");
-assert_eq!(e.strip_backtrace(), "Arrow error: Parser error: Error while parsing value d for column 0 at line 4");
+assert_eq!(e.strip_backtrace(), "Arrow error: Parser error: Error while parsing value 'd' as type 'Int64' for column 0 at line 4. Row data: '[d,4]'");
Ok(())
}

2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/physical_plan/json.rs
@@ -495,7 +495,7 @@ mod tests {
.write_json(out_dir_url, DataFrameWriteOptions::new(), None)
.await
.expect_err("should fail because input file does not match inferred schema");
-assert_eq!(e.strip_backtrace(), "Arrow error: Parser error: Error while parsing value d for column 0 at line 4");
+assert_eq!(e.strip_backtrace(), "Arrow error: Parser error: Error while parsing value 'd' as type 'Int64' for column 0 at line 4. Row data: '[d,4]'");
Ok(())
}

4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -1786,13 +1786,13 @@ mod tests {
path: &str,
store: Arc<dyn ObjectStore>,
batch: RecordBatch,
-) -> usize {
+) -> u64 {
let mut writer =
ArrowWriter::try_new(BytesMut::new().writer(), batch.schema(), None).unwrap();
writer.write(&batch).unwrap();
writer.flush().unwrap();
let bytes = writer.into_inner().unwrap().into_inner().freeze();
-let total_size = bytes.len();
+let total_size = bytes.len() as u64;
let path = Path::from(path);
let payload = object_store::PutPayload::from_bytes(bytes);
store
4 changes: 2 additions & 2 deletions datafusion/core/src/test/object_store.rs
@@ -66,7 +66,7 @@ pub fn local_unpartitioned_file(path: impl AsRef<std::path::Path>) -> ObjectMeta
ObjectMeta {
location,
last_modified: metadata.modified().map(chrono::DateTime::from).unwrap(),
-size: metadata.len() as usize,
+size: metadata.len(),
e_tag: None,
version: None,
}
@@ -166,7 +166,7 @@ impl ObjectStore for BlockingObjectStore {
fn list(
&self,
prefix: Option<&Path>,
-) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
+) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
self.inner.list(prefix)
}

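The '_ to 'static changes in list track object_store 0.12, where the listing stream no longer borrows the store, so delegating wrappers like the one above can forward it as-is. A short usage sketch under that assumption:

use futures::TryStreamExt;
use object_store::{memory::InMemory, ObjectMeta, ObjectStore};

#[tokio::main]
async fn main() -> object_store::Result<()> {
    let store = InMemory::new();
    // In object_store 0.12 the stream is 'static, so it can be consumed
    // independently of any borrow of `store`.
    let objects: Vec<ObjectMeta> = store.list(None).try_collect().await?;
    println!("found {} objects", objects.len());
    Ok(())
}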
2 changes: 1 addition & 1 deletion datafusion/core/src/test_util/parquet.rs
@@ -102,7 +102,7 @@ impl TestParquetFile {

println!("Generated test dataset with {num_rows} rows");

-let size = std::fs::metadata(&path)?.len() as usize;
+let size = std::fs::metadata(&path)?.len();

let mut canonical_path = path.canonicalize()?;

9 changes: 6 additions & 3 deletions datafusion/core/tests/parquet/custom_reader.rs
@@ -44,6 +44,7 @@ use insta::assert_snapshot;
use object_store::memory::InMemory;
use object_store::path::Path;
use object_store::{ObjectMeta, ObjectStore};
+use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::ArrowWriter;
use parquet::errors::ParquetError;
@@ -186,7 +187,7 @@ async fn store_parquet_in_memory(
location: Path::parse(format!("file-{offset}.parquet"))
.expect("creating path"),
last_modified: chrono::DateTime::from(SystemTime::now()),
-size: buf.len(),
+size: buf.len() as u64,
e_tag: None,
version: None,
};
@@ -218,9 +219,10 @@ struct ParquetFileReader {
impl AsyncFileReader for ParquetFileReader {
fn get_bytes(
&mut self,
-range: Range<usize>,
+range: Range<u64>,
) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
-self.metrics.bytes_scanned.add(range.end - range.start);
+let bytes_scanned = range.end - range.start;
+self.metrics.bytes_scanned.add(bytes_scanned as usize);

self.store
.get_range(&self.meta.location, range)
@@ -232,6 +234,7 @@

fn get_metadata(
&mut self,
+_options: Option<&ArrowReaderOptions>,
) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
Box::pin(async move {
let metadata = fetch_parquet_metadata(
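For implementors following along, the parquet 55 AsyncFileReader surface exercised above amounts to byte ranges as Range<u64> plus an options argument on get_metadata. A minimal skeleton under those assumptions (the type name is hypothetical):

use std::ops::Range;
use std::sync::Arc;

use bytes::Bytes;
use futures::future::BoxFuture;
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::file::metadata::ParquetMetaData;

struct SkeletonReader;

impl AsyncFileReader for SkeletonReader {
    // Byte ranges are u64 in parquet 55 (they were usize in 54).
    fn get_bytes(
        &mut self,
        _range: Range<u64>,
    ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
        unimplemented!("fetch the requested bytes from storage")
    }

    // New in parquet 55: optional ArrowReaderOptions for metadata decoding.
    fn get_metadata(
        &mut self,
        _options: Option<&ArrowReaderOptions>,
    ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
        unimplemented!("return (possibly cached) footer metadata")
    }
}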
2 changes: 1 addition & 1 deletion datafusion/core/tests/parquet/page_pruning.rs
@@ -52,7 +52,7 @@ async fn get_parquet_exec(state: &SessionState, filter: Expr) -> DataSourceExec
let meta = ObjectMeta {
location,
last_modified: metadata.modified().map(chrono::DateTime::from).unwrap(),
-size: metadata.len() as usize,
+size: metadata.len(),
e_tag: None,
version: None,
};