8 changes: 7 additions & 1 deletion kernel/src/engine/default/mod.rs
@@ -33,6 +33,7 @@ pub mod file_stream;
pub mod filesystem;
pub mod json;
pub mod parquet;
pub mod stats;
pub mod storage;

/// Converts a Stream-producing future to a synchronous iterator.
@@ -216,7 +217,12 @@ impl<E: TaskExecutor> DefaultEngine<E> {
)?;
let physical_data = logical_to_physical_expr.evaluate(data)?;
self.parquet
.write_parquet_file(write_context.target_dir(), physical_data, partition_values)
.write_parquet_file(
write_context.target_dir(),
physical_data,
partition_values,
Some(write_context.stats_columns()),
)
.await
}
}
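Note: the new `stats` module itself is not part of this diff; the parquet.rs changes below import `collect_stats` from it and expect a one-row `StructArray` of file statistics back (the tests check a `numRecords` field). A minimal standalone sketch of building such a stats struct with arrow, assuming only `numRecords` is collected (the function name and the bare `arrow` imports are illustrative, not from this PR):

    use std::sync::Arc;
    use arrow::array::{ArrayRef, Int64Array, RecordBatch, StructArray};
    use arrow::datatypes::{DataType, Field};

    // Build a one-row stats struct holding numRecords for a batch; per-column
    // min/max/nullCount for the requested stats columns would be appended the same way.
    fn collect_num_records_stats(batch: &RecordBatch) -> StructArray {
        let num_records: ArrayRef = Arc::new(Int64Array::from(vec![batch.num_rows() as i64]));
        StructArray::from(vec![(
            Arc::new(Field::new("numRecords", DataType::Int64, true)),
            num_records,
        )])
    }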
159 changes: 107 additions & 52 deletions kernel/src/engine/default/parquet.rs
@@ -7,8 +7,8 @@ use std::sync::Arc;
use delta_kernel_derive::internal_api;

use crate::arrow::array::builder::{MapBuilder, MapFieldNames, StringBuilder};
use crate::arrow::array::{Int64Array, RecordBatch, StringArray, StructArray};
use crate::arrow::datatypes::{DataType, Field};
use crate::arrow::array::{Array, Int64Array, RecordBatch, StringArray, StructArray};
use crate::arrow::datatypes::{DataType, Field, Schema};
use crate::parquet::arrow::arrow_reader::{
ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
};
@@ -23,6 +23,7 @@ use object_store::{DynObjectStore, ObjectStore};
use uuid::Uuid;

use super::file_stream::{FileOpenFuture, FileOpener, FileStream};
use super::stats::collect_stats;
use super::UrlExt;
use crate::engine::arrow_conversion::{TryFromArrow as _, TryIntoArrow as _};
use crate::engine::arrow_data::ArrowEngineData;
@@ -46,22 +47,16 @@ pub struct DefaultParquetHandler<E: TaskExecutor> {
}

/// Metadata of a data file (typically a parquet file).
///
/// Currently just includes the number of records as statistics, but will expand to include
/// more statistics and other metadata in the future.
#[derive(Debug)]
pub struct DataFileMetadata {
file_meta: FileMeta,
// NB: We use usize instead of u64 since arrow uses usize for record batch sizes
num_records: usize,
/// Collected statistics for this file (includes numRecords, tightBounds, etc.).
stats: StructArray,
}

impl DataFileMetadata {
pub fn new(file_meta: FileMeta, num_records: usize) -> Self {
Self {
file_meta,
num_records,
}
pub fn new(file_meta: FileMeta, stats: StructArray) -> Self {
Self { file_meta, stats }
}

/// Convert DataFileMetadata into a record batch which matches the schema returned by
@@ -80,7 +75,8 @@ impl DataFileMetadata {
last_modified,
size,
},
num_records,
stats,
..
} = self;
// create the record batch of the write metadata
let path = Arc::new(StringArray::from(vec![location.to_string()]));
@@ -104,20 +100,35 @@ impl DataFileMetadata {
.map_err(|_| Error::generic("Failed to convert parquet metadata 'size' to i64"))?;
let size = Arc::new(Int64Array::from(vec![size]));
let modification_time = Arc::new(Int64Array::from(vec![*last_modified]));
let stats = Arc::new(StructArray::try_new_with_length(
vec![Field::new("numRecords", DataType::Int64, true)].into(),
vec![Arc::new(Int64Array::from(vec![*num_records as i64]))],
None,
1,
)?);

Ok(Box::new(ArrowEngineData::new(RecordBatch::try_new(
Arc::new(
crate::transaction::BASE_ADD_FILES_SCHEMA
.as_ref()
.try_into_arrow()?,
let stats_array = Arc::new(stats.clone());

// Build schema dynamically based on stats (stats schema varies based on collected statistics)
let key_value_struct = DataType::Struct(
vec![
Field::new("key", DataType::Utf8, false),
Field::new("value", DataType::Utf8, true),
]
.into(),
);
let schema = Schema::new(vec![
Field::new("path", DataType::Utf8, false),
Field::new(
"partitionValues",
DataType::Map(
Arc::new(Field::new("key_value", key_value_struct, false)),
false,
),
false,
),
vec![path, partitions, size, modification_time, stats],
Field::new("size", DataType::Int64, false),
Field::new("modificationTime", DataType::Int64, false),
Field::new("stats", stats_array.data_type().clone(), true),
]);

Ok(Box::new(ArrowEngineData::new(RecordBatch::try_new(
Arc::new(schema),
vec![path, partitions, size, modification_time, stats_array],
)?)))
}
}
@@ -148,10 +159,13 @@ impl<E: TaskExecutor> DefaultParquetHandler<E> {
&self,
path: &url::Url,
data: Box<dyn EngineData>,
stats_columns: &[String],
) -> DeltaResult<DataFileMetadata> {
let batch: Box<_> = ArrowEngineData::try_from_engine_data(data)?;
let record_batch = batch.record_batch();
let num_records = record_batch.num_rows();

// Collect statistics before writing (includes numRecords)
let stats = collect_stats(record_batch, stats_columns)?;

let mut buffer = vec![];
let mut writer = ArrowWriter::try_new(&mut buffer, record_batch.schema(), None)?;
@@ -185,7 +199,7 @@ impl<E: TaskExecutor> DefaultParquetHandler<E> {
}

let file_meta = FileMeta::new(path, modification_time, size);
Ok(DataFileMetadata::new(file_meta, num_records))
Ok(DataFileMetadata::new(file_meta, stats))
}

/// Write `data` to `{path}/<uuid>.parquet` as parquet using ArrowWriter and return the parquet
@@ -201,8 +215,11 @@ impl<E: TaskExecutor> DefaultParquetHandler<E> {
path: &url::Url,
data: Box<dyn EngineData>,
partition_values: HashMap<String, String>,
stats_columns: Option<&[String]>,
) -> DeltaResult<Box<dyn EngineData>> {
let parquet_metadata = self.write_parquet(path, data).await?;
let parquet_metadata = self
.write_parquet(path, data, stats_columns.unwrap_or(&[]))
.await?;
parquet_metadata.as_record_batch(&partition_values)
}
}
@@ -294,6 +311,7 @@ impl<E: TaskExecutor> ParquetHandler for DefaultParquetHandler<E> {
/// - `location` - The full URL path where the Parquet file should be written
/// (e.g., `s3://bucket/path/file.parquet`, `file:///path/to/file.parquet`).
/// - `data` - An iterator of engine data to be written to the Parquet file.
/// - `stats_columns` - Optional column names for which statistics should be collected.
///
/// # Returns
///
@@ -302,6 +320,7 @@ impl<E: TaskExecutor> ParquetHandler for DefaultParquetHandler<E> {
&self,
location: url::Url,
mut data: Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send>,
_stats_columns: Option<&[String]>,
) -> DeltaResult<()> {
let store = self.store.clone();

@@ -606,19 +625,26 @@ mod tests {
let last_modified = 10000000000;
let num_records = 10;
let file_metadata = FileMeta::new(location.clone(), last_modified, size);
let data_file_metadata = DataFileMetadata::new(file_metadata, num_records);
let stats = StructArray::try_new(
vec![
Field::new("numRecords", ArrowDataType::Int64, true),
Field::new("tightBounds", ArrowDataType::Boolean, true),
]
.into(),
vec![
Arc::new(Int64Array::from(vec![num_records as i64])),
Arc::new(BooleanArray::from(vec![true])),
],
None,
)
.unwrap();
let data_file_metadata = DataFileMetadata::new(file_metadata, stats.clone());
let partition_values = HashMap::from([("partition1".to_string(), "a".to_string())]);
let actual = data_file_metadata
.as_record_batch(&partition_values)
.unwrap();
let actual = ArrowEngineData::try_from_engine_data(actual).unwrap();

let schema = Arc::new(
crate::transaction::BASE_ADD_FILES_SCHEMA
.as_ref()
.try_into_arrow()
.unwrap(),
);
let mut partition_values_builder = MapBuilder::new(
Some(MapFieldNames {
entry: "key_value".to_string(),
@@ -632,13 +658,33 @@ mod tests {
partition_values_builder.values().append_value("a");
partition_values_builder.append(true).unwrap();
let partition_values = partition_values_builder.finish();
let stats_struct = StructArray::try_new_with_length(
vec![Field::new("numRecords", ArrowDataType::Int64, true)].into(),
vec![Arc::new(Int64Array::from(vec![num_records as i64]))],
None,
1,
)
.unwrap();

// Build expected schema dynamically based on stats
let stats_field = Field::new("stats", stats.data_type().clone(), true);
let schema = Arc::new(crate::arrow::datatypes::Schema::new(vec![
Field::new("path", ArrowDataType::Utf8, false),
Field::new(
"partitionValues",
ArrowDataType::Map(
Arc::new(Field::new(
"key_value",
ArrowDataType::Struct(
vec![
Field::new("key", ArrowDataType::Utf8, false),
Field::new("value", ArrowDataType::Utf8, true),
]
.into(),
),
false,
)),
false,
),
false,
),
Field::new("size", ArrowDataType::Int64, false),
Field::new("modificationTime", ArrowDataType::Int64, false),
stats_field,
]));

let expected = RecordBatch::try_new(
schema,
Expand All @@ -647,7 +693,7 @@ mod tests {
Arc::new(partition_values),
Arc::new(Int64Array::from(vec![size as i64])),
Arc::new(Int64Array::from(vec![last_modified])),
Arc::new(stats_struct),
Arc::new(stats),
],
)
.unwrap();
@@ -647,7 +693,7 @@ mod tests {
));

let write_metadata = parquet_handler
.write_parquet(&Url::parse("memory:///data/").unwrap(), data)
.write_parquet(&Url::parse("memory:///data/").unwrap(), data, &[])
.await
.unwrap();

@@ -681,7 +727,7 @@ mod tests {
last_modified,
size,
},
num_records,
ref stats,
} = write_metadata;
let expected_location = Url::parse("memory:///data/").unwrap();

@@ -699,6 +745,15 @@ mod tests {
assert_eq!(&expected_location.join(filename).unwrap(), location);
assert_eq!(expected_size, size);
assert!(now - last_modified < 10_000);

// Check numRecords from stats
let num_records = stats
.column_by_name("numRecords")
.unwrap()
.as_any()
.downcast_ref::<Int64Array>()
.unwrap()
.value(0);
assert_eq!(num_records, 3);

// check we can read back
@@ -741,7 +796,7 @@ mod tests {

assert_result_error_with_message(
parquet_handler
.write_parquet(&Url::parse("memory:///data").unwrap(), data)
.write_parquet(&Url::parse("memory:///data").unwrap(), data, &[])
.await,
"Generic delta kernel error: Path must end with a trailing slash: memory:///data",
);
@@ -776,7 +831,7 @@ mod tests {
// Test writing through the trait method
let file_url = Url::parse("memory:///test/data.parquet").unwrap();
parquet_handler
.write_parquet_file(file_url.clone(), data_iter)
.write_parquet_file(file_url.clone(), data_iter, None)
.unwrap();

// Verify we can read the file back
@@ -964,7 +1019,7 @@ mod tests {
// Write the data
let file_url = Url::parse("memory:///roundtrip/test.parquet").unwrap();
parquet_handler
.write_parquet_file(file_url.clone(), data_iter)
.write_parquet_file(file_url.clone(), data_iter, None)
.unwrap();

// Read it back
@@ -1152,7 +1207,7 @@ mod tests {

// Write the first file
parquet_handler
.write_parquet_file(file_url.clone(), data_iter1)
.write_parquet_file(file_url.clone(), data_iter1, None)
.unwrap();

// Create second data set with different data
@@ -1168,7 +1223,7 @@ mod tests {

// Overwrite with second file (overwrite=true)
parquet_handler
.write_parquet_file(file_url.clone(), data_iter2)
.write_parquet_file(file_url.clone(), data_iter2, None)
.unwrap();

// Read back and verify it contains the second data set
@@ -1231,7 +1286,7 @@ mod tests {

// Write the first file
parquet_handler
.write_parquet_file(file_url.clone(), data_iter1)
.write_parquet_file(file_url.clone(), data_iter1, None)
.unwrap();

// Create second data set
@@ -1247,7 +1302,7 @@ mod tests {

// Write again - should overwrite successfully (new behavior always overwrites)
parquet_handler
.write_parquet_file(file_url.clone(), data_iter2)
.write_parquet_file(file_url.clone(), data_iter2, None)
.unwrap();

// Verify the file was overwritten with the new data
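Note: since the `stats` column of the returned write-metadata batch is now a struct whose schema depends on which statistics were collected, callers read fields back out of it by name. A hedged sketch of extracting `numRecords` from that batch, mirroring the test above (the helper name and the bare `arrow` imports are illustrative, not from this PR):

    use arrow::array::{Int64Array, RecordBatch, StructArray};

    // Pull numRecords back out of the write-metadata batch returned by as_record_batch.
    fn num_records_from_write_metadata(batch: &RecordBatch) -> Option<i64> {
        let stats = batch
            .column_by_name("stats")?
            .as_any()
            .downcast_ref::<StructArray>()?;
        let num_records = stats
            .column_by_name("numRecords")?
            .as_any()
            .downcast_ref::<Int64Array>()?;
        Some(num_records.value(0))
    }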