-
Notifications
You must be signed in to change notification settings - Fork 1k
Open
Description
I'm using the AsyncArrowWriter for writing record batches to a parquet file. Each record batch has custom metadata attached to its schema.
Then I use the ParquetRecordBatchStreamBuilder to read the parquet file and set skip_arrow_metadata in ArrowReaderOptions to false. However, the schema metadata in the record batches is lost.
I have written a test for reproducing the bug:
// Reproduction test for the reported bug: custom key/value metadata attached to
// a RecordBatch schema is lost after a write/read round trip through Parquet,
// even though the reader is configured with with_skip_arrow_metadata(false)
// (i.e. it is explicitly asked to honor the embedded Arrow schema metadata).
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use arrow::util::data_gen::create_random_batch;
use arrow_schema::{DataType, Field, Schema};
use futures::TryStreamExt;
// NOTE(review): `ObjectStoreRef` is presumably `Arc<dyn ObjectStore>` — confirm
// this alias is actually exported at the object_store crate root in the version used.
use object_store::{LocalFileSystem, ObjectStoreRef, Path};
use parquet::arrow::AsyncArrowWriter;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
use parquet::arrow::async_writer::ParquetObjectWriter;
use tempfile::tempdir;
// Round-trips a 16-row batch through an object-store-backed Parquet file and
// asserts the schema-level metadata ("my_key" -> "my_value") survives.
#[tokio::test]
async fn test_schema_metadata_presence() {
// Back the object store with a temp directory so the test is self-contained.
let temp_dir = tempdir().unwrap();
let store: ObjectStoreRef =
Arc::new(LocalFileSystem::new_with_prefix(temp_dir.path()).unwrap());
// Schema carries the custom key/value metadata whose survival is under test.
let file_schema = Arc::new(
Schema::new(vec![
Field::new("row_id", DataType::UInt64, false),
Field::new("value", DataType::Float32, false),
])
.with_metadata(HashMap::from([(
"my_key".to_string(),
"my_value".to_string(),
)])),
);
// 16 rows, 0.0 null density / 0.0 "true" density — values themselves are irrelevant here.
let batch = create_random_batch(file_schema.clone(), 16, 0.0, 0.0).unwrap();
// Writes data to a parquet file.
let path = Path::parse("my.parquet").unwrap();
let object_writer = ParquetObjectWriter::new(store.clone(), path.clone());
let mut writer = AsyncArrowWriter::try_new(object_writer, file_schema, None).unwrap();
writer.write(&batch).await.unwrap();
// finish() flushes and closes the file; required before reading it back.
writer.finish().await.unwrap();
// Reads data from the parquet file.
let mut parquet_reader = ParquetObjectReader::new(store, path);
let parquet_metadata = parquet_reader.get_metadata(None).await.unwrap();
// skip_arrow_metadata = false: the embedded Arrow schema (including its
// key/value metadata) should be decoded, not discarded.
let reader_metadata = ArrowReaderMetadata::try_new(
parquet_metadata,
ArrowReaderOptions::new().with_skip_arrow_metadata(false),
)
.unwrap();
let stream =
ParquetRecordBatchStreamBuilder::new_with_metadata(parquet_reader, reader_metadata)
.build()
.unwrap();
let batches = stream.try_collect::<Vec<_>>().await.unwrap();
// Expected: every batch's schema still carries the custom metadata.
// Reported actual behavior: the metadata is missing, so this assert fails.
for batch in batches {
assert_eq!(
batch.schema_ref().metadata().get("my_key").unwrap(),
"my_value"
);
}
}
}
Arrow version: 56.2.0
Metadata
Metadata
Assignees
Labels
No labels