4 changes: 2 additions & 2 deletions kernel/examples/write-table/src/main.rs
@@ -94,9 +94,9 @@ async fn try_main() -> DeltaResult<()> {
.with_data_change(true);

// Write the data using the engine
let write_context = Arc::new(txn.get_write_context());
let write_context = txn.get_write_context(&engine)?;
let file_metadata = engine
.write_parquet(&sample_data, write_context.as_ref(), HashMap::new())
.write_parquet(&sample_data, &write_context, HashMap::new())
.await?;

// Add the file metadata to the transaction
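Taken together, the example's write path now reads roughly as follows. This is a sketch for orientation only: everything outside the changed hunk (transaction setup, `add_files`, `commit`) is assumed context, not part of this diff.

```rust
// Sketch of the surrounding flow in try_main (assumed context marked as such).
let write_context = txn.get_write_context(&engine)?; // now fallible and engine-aware

// The default engine writes the parquet file, collects statistics for the
// columns named in the write context, and returns add-file metadata.
let file_metadata = engine
    .write_parquet(&sample_data, &write_context, HashMap::new())
    .await?;

// Assumed (outside this hunk): hand the metadata to the transaction and commit.
txn.add_files(file_metadata);
txn.commit(&engine)?;
```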
8 changes: 7 additions & 1 deletion kernel/src/engine/default/mod.rs
@@ -33,6 +33,7 @@ pub mod file_stream;
pub mod filesystem;
pub mod json;
pub mod parquet;
pub(crate) mod stats;
pub mod storage;

/// Converts a Stream-producing future to a synchronous iterator.
@@ -216,7 +217,12 @@ impl<E: TaskExecutor> DefaultEngine<E> {
)?;
let physical_data = logical_to_physical_expr.evaluate(data)?;
self.parquet
.write_parquet_file(write_context.target_dir(), physical_data, partition_values)
.write_parquet_file(
write_context.target_dir(),
physical_data,
partition_values,
write_context.stats_columns(),
)
.await
}
}
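The new `stats` module itself is not shown in this diff. Judging purely from how `parquet.rs` below calls it, its surface looks roughly like the sketch here; the names, the mask argument's type, and the visibility are inferred, not confirmed.

```rust
// Inferred interface of kernel/src/engine/default/stats.rs (sketch only).
use crate::arrow::array::{BooleanArray, RecordBatch, StructArray};
use crate::arrow::datatypes::SchemaRef;
use crate::DeltaResult;

pub(crate) struct StatisticsCollector {
    // per-column accumulators for null counts and min/max values, plus a row count
}

impl StatisticsCollector {
    /// Build a collector for the given batch schema, tracking only `stats_columns`.
    pub(crate) fn new(schema: SchemaRef, stats_columns: &[String]) -> Self {
        todo!()
    }

    /// Fold one written batch into the running statistics. The mask (type assumed
    /// here) would select which rows count; `None` means all rows, as for new files.
    pub(crate) fn update(&mut self, batch: &RecordBatch, mask: Option<&BooleanArray>) -> DeltaResult<()> {
        todo!()
    }

    /// Produce the single-row stats struct (numRecords, nullCount, minValues,
    /// maxValues, tightBounds) that `DataFileMetadata::with_stats` expects.
    pub(crate) fn finalize(&self) -> DeltaResult<StructArray> {
        todo!()
    }
}
```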
120 changes: 94 additions & 26 deletions kernel/src/engine/default/parquet.rs
@@ -8,7 +8,7 @@ use delta_kernel_derive::internal_api;

use crate::arrow::array::builder::{MapBuilder, MapFieldNames, StringBuilder};
use crate::arrow::array::{Int64Array, RecordBatch, StringArray, StructArray};
use crate::arrow::datatypes::{DataType, Field};
use crate::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use crate::parquet::arrow::arrow_reader::{
ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
};
@@ -23,9 +23,10 @@ use object_store::{DynObjectStore, ObjectStore};
use uuid::Uuid;

use super::file_stream::{FileOpenFuture, FileOpener, FileStream};
use super::stats::StatisticsCollector;
use super::UrlExt;
use crate::engine::arrow_conversion::{TryFromArrow as _, TryIntoArrow as _};
use crate::engine::arrow_data::ArrowEngineData;
use crate::engine::arrow_data::{extract_record_batch, ArrowEngineData};
use crate::engine::arrow_utils::{
fixup_parquet_read, generate_mask, get_requested_indices, ordering_needs_row_indexes,
RowIndexBuilder,
@@ -47,23 +48,31 @@ pub struct DefaultParquetHandler<E: TaskExecutor> {

/// Metadata of a data file (typically a parquet file).
///
/// Currently just includes the number of records as statistics, but will expand to include
/// more statistics and other metadata in the future.
/// Includes full file statistics collected during write.
#[derive(Debug)]
pub struct DataFileMetadata {
file_meta: FileMeta,
// NB: We use usize instead of u64 since arrow uses usize for record batch sizes
num_records: usize,
// Optional full statistics (numRecords, nullCount, minValues, maxValues, tightBounds)
stats: Option<StructArray>,
}

impl DataFileMetadata {
pub fn new(file_meta: FileMeta, num_records: usize) -> Self {
Self {
file_meta,
num_records,
stats: None,
}
}

/// Attach full statistics to this metadata.
pub fn with_stats(mut self, stats: StructArray) -> Self {
self.stats = Some(stats);
self
}
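A brief usage sketch of the builder-style API above; the `FileMeta` literal and the partition map are made-up values, and `stats` stands for the `StructArray` produced by the statistics collector.

```rust
// Hypothetical: describe a freshly written file, attach its stats, and convert
// it into the add-files record batch that gets handed back to the transaction.
let file_meta = FileMeta {
    location: Url::parse("memory:///data/part-00000.parquet")?,
    last_modified: 1_700_000_000_000,
    size: 4096,
};
let write_metadata = DataFileMetadata::new(file_meta, /* num_records */ 1024)
    .with_stats(stats);
let add_batch = write_metadata.as_record_batch(&HashMap::new())?;
```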

/// Convert DataFileMetadata into a record batch which matches the schema returned by
/// [`add_files_schema`].
///
@@ -81,7 +90,9 @@ impl DataFileMetadata {
size,
},
num_records,
stats,
} = self;

// create the record batch of the write metadata
let path = Arc::new(StringArray::from(vec![location.to_string()]));
let key_builder = StringBuilder::new();
@@ -104,20 +115,43 @@ impl<E: TaskExecutor> DefaultParquetHandler<E> {
.map_err(|_| Error::generic("Failed to convert parquet metadata 'size' to i64"))?;
let size = Arc::new(Int64Array::from(vec![size]));
let modification_time = Arc::new(Int64Array::from(vec![*last_modified]));
let stats = Arc::new(StructArray::try_new_with_length(
vec![Field::new("numRecords", DataType::Int64, true)].into(),
vec![Arc::new(Int64Array::from(vec![*num_records as i64]))],
None,
1,
)?);

// Use full stats if available, otherwise minimal stats with just numRecords
let stats_array: Arc<StructArray> = if let Some(full_stats) = stats {
Arc::new(full_stats.clone())
} else {
Arc::new(StructArray::try_new_with_length(
vec![Field::new("numRecords", DataType::Int64, true)].into(),
vec![Arc::new(Int64Array::from(vec![*num_records as i64]))],
None,
1,
)?)
};

// Build schema dynamically based on the stats array's actual schema
let base_schema: ArrowSchema = crate::transaction::BASE_ADD_FILES_SCHEMA
.as_ref()
.try_into_arrow()?;

// Replace the stats field with the actual stats schema
let mut fields: Vec<Field> = base_schema
.fields()
.iter()
.map(|f| f.as_ref().clone())
.collect();
// The last field is stats - replace it with the actual stats schema
if let Some(last) = fields.last_mut() {
*last = Field::new(
"stats",
DataType::Struct(stats_array.fields().clone()),
true,
);
}
let schema = Arc::new(ArrowSchema::new(fields));

Ok(Box::new(ArrowEngineData::new(RecordBatch::try_new(
Arc::new(
crate::transaction::BASE_ADD_FILES_SCHEMA
.as_ref()
.try_into_arrow()?,
),
vec![path, partitions, size, modification_time, stats],
schema,
vec![path, partitions, size, modification_time, stats_array],
)?)))
}
}
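For concreteness, with a single tracked column (say `id: Int64`) the dynamically built `stats` field replaces the minimal `numRecords`-only struct with something shaped roughly like the following; the exact nesting and nullability come from the collector, so treat this as an assumption.

```rust
use crate::arrow::datatypes::{DataType, Field, Fields};

// Assumed shape of the `stats` field when statistics are collected for "id".
fn example_stats_field() -> Field {
    let per_column =
        |ty: DataType| DataType::Struct(Fields::from(vec![Field::new("id", ty, true)]));
    Field::new(
        "stats",
        DataType::Struct(Fields::from(vec![
            Field::new("numRecords", DataType::Int64, true),
            Field::new("nullCount", per_column(DataType::Int64), true),
            Field::new("minValues", per_column(DataType::Int64), true),
            Field::new("maxValues", per_column(DataType::Int64), true),
            Field::new("tightBounds", DataType::Boolean, true),
        ])),
        true,
    )
}
```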
@@ -196,13 +230,36 @@ impl<E: TaskExecutor> DefaultParquetHandler<E> {
/// use [`crate::transaction::Transaction::with_data_change`].
///
/// [add file metadata]: crate::transaction::Transaction::add_files_schema
/// Write data to a parquet file and return file metadata with statistics.
///
/// This writes the data to a parquet file at the given path, collects statistics for
/// the specified columns, and returns the file metadata as an EngineData batch which
/// matches the [add file metadata] schema.
///
/// Only collects statistics for columns in `stats_columns`.
///
/// [add file metadata]: crate::transaction::Transaction::add_files_schema
pub async fn write_parquet_file(
&self,
path: &url::Url,
data: Box<dyn EngineData>,
partition_values: HashMap<String, String>,
stats_columns: &[String],
) -> DeltaResult<Box<dyn EngineData>> {
let parquet_metadata = self.write_parquet(path, data).await?;
// Collect statistics from the data during write
let record_batch = extract_record_batch(data.as_ref())?;

// Initialize stats collector and update with this batch
let mut stats_collector = StatisticsCollector::new(record_batch.schema(), stats_columns);
stats_collector.update(record_batch, None)?; // No mask for new file writes
let stats = stats_collector.finalize()?;

// Write the parquet file
let mut parquet_metadata = self.write_parquet(path, data).await?;

// Attach the collected statistics
parquet_metadata = parquet_metadata.with_stats(stats);

parquet_metadata.as_record_batch(&partition_values)
}
}
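A hedged usage sketch of the updated signature; the handler construction, `table_dir_url`, and `data` are assumed context. In practice `DefaultEngine::write_parquet` in `mod.rs` above makes this call, passing `write_context.stats_columns()`.

```rust
// Hypothetical direct call; stats are collected only for the listed columns.
let stats_columns = vec!["id".to_string(), "score".to_string()];
let add_file_batch = parquet_handler
    .write_parquet_file(
        &table_dir_url,   // directory the new .parquet file is written into
        data,             // Box<dyn EngineData> holding the rows to write
        HashMap::new(),   // partition values for the file
        &stats_columns,   // columns to gather null/min/max statistics for
    )
    .await?;
```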
@@ -302,8 +359,10 @@ impl<E: TaskExecutor> ParquetHandler for DefaultParquetHandler<E> {
&self,
location: url::Url,
mut data: Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send>,
stats_columns: &[String],
) -> DeltaResult<()> {
let store = self.store.clone();
let stats_columns = stats_columns.to_vec();

self.task_executor.block_on(async move {
let path = Path::from_url_path(location.path())?;
@@ -317,21 +376,29 @@ impl<E: TaskExecutor> ParquetHandler for DefaultParquetHandler<E> {

let object_writer = ParquetObjectWriter::new(store, path);
let schema = first_record_batch.schema();
let mut writer = AsyncArrowWriter::try_new(object_writer, schema, None)?;
let mut writer = AsyncArrowWriter::try_new(object_writer, schema.clone(), None)?;

// Write the first batch
// Initialize stats collector
let mut stats_collector = StatisticsCollector::new(schema, &stats_columns);

// Write and collect stats for the first batch (no mask for new file writes)
writer.write(&first_record_batch).await?;
stats_collector.update(&first_record_batch, None)?;

// Write remaining batches
// Write remaining batches and accumulate stats
for result in data {
let engine_data = result?;
let arrow_data = ArrowEngineData::try_from_engine_data(engine_data)?;
let batch: RecordBatch = (*arrow_data).into();
writer.write(&batch).await?;
stats_collector.update(&batch, None)?; // No mask for new file writes
}

writer.finish().await?;

// Finalize stats (collected but not returned for now)
let _stats = stats_collector.finalize()?;

Ok(())
})
}
@@ -682,6 +749,7 @@ mod tests {
size,
},
num_records,
stats: _,
} = write_metadata;
let expected_location = Url::parse("memory:///data/").unwrap();

@@ -776,7 +844,7 @@ mod tests {
// Test writing through the trait method
let file_url = Url::parse("memory:///test/data.parquet").unwrap();
parquet_handler
.write_parquet_file(file_url.clone(), data_iter)
.write_parquet_file(file_url.clone(), data_iter, &[])
.unwrap();

// Verify we can read the file back
@@ -964,7 +1032,7 @@ mod tests {
// Write the data
let file_url = Url::parse("memory:///roundtrip/test.parquet").unwrap();
parquet_handler
.write_parquet_file(file_url.clone(), data_iter)
.write_parquet_file(file_url.clone(), data_iter, &[])
.unwrap();

// Read it back
@@ -1152,7 +1220,7 @@ mod tests {

// Write the first file
parquet_handler
.write_parquet_file(file_url.clone(), data_iter1)
.write_parquet_file(file_url.clone(), data_iter1, &[])
.unwrap();

// Create second data set with different data
@@ -1168,7 +1236,7 @@ mod tests {

// Overwrite with second file (overwrite=true)
parquet_handler
.write_parquet_file(file_url.clone(), data_iter2)
.write_parquet_file(file_url.clone(), data_iter2, &[])
.unwrap();

// Read back and verify it contains the second data set
@@ -1231,7 +1299,7 @@ mod tests {

// Write the first file
parquet_handler
.write_parquet_file(file_url.clone(), data_iter1)
.write_parquet_file(file_url.clone(), data_iter1, &[])
.unwrap();

// Create second data set
Expand All @@ -1247,7 +1315,7 @@ mod tests {

// Write again - should overwrite successfully (new behavior always overwrites)
parquet_handler
.write_parquet_file(file_url.clone(), data_iter2)
.write_parquet_file(file_url.clone(), data_iter2, &[])
.unwrap();

// Verify the file was overwritten with the new data
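One test the suite could grow to cover the new path end-to-end is sketched below in the style of the existing tests; the setup helper, the async test attribute, and the column layout it asserts on are assumptions, not part of this diff.

```rust
#[tokio::test]
async fn test_write_parquet_file_collects_stats() {
    // Assumed helper mirroring the setup used by the other tests in this module.
    let (parquet_handler, data) = setup_handler_and_sample_batch();
    let url = Url::parse("memory:///stats/test.parquet").unwrap();

    let metadata = parquet_handler
        .write_parquet_file(&url, data, HashMap::new(), &["id".to_string()])
        .await
        .unwrap();

    // Column 4 of the add-files batch is the stats struct; with a stats column
    // configured it should carry more than just numRecords.
    let batch: RecordBatch = (*ArrowEngineData::try_from_engine_data(metadata).unwrap()).into();
    let stats = batch
        .column(4)
        .as_any()
        .downcast_ref::<StructArray>()
        .unwrap();
    assert!(stats.column_by_name("numRecords").is_some());
    assert!(stats.column_by_name("minValues").is_some());
    assert!(stats.column_by_name("maxValues").is_some());
}
```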