//! ## Architecture
//!
//! - [`CheckpointWriter`] - Core component that manages the checkpoint creation workflow
//! - [`CheckpointDataIterator`] - Trait for iterators over checkpoint data to be written
//!
//! ## Usage
//!
//! # use std::sync::Arc;
//! # use delta_kernel::ActionReconciliationIterator;
//! # use delta_kernel::checkpoint::CheckpointWriter;
//! # use delta_kernel::checkpoint::CheckpointDataIterator;
//! # use delta_kernel::Engine;
//! # use delta_kernel::Snapshot;
//! # use delta_kernel::SnapshotRef;
//! # use delta_kernel::DeltaResult;
//! # use delta_kernel::Error;
//! # use delta_kernel::FileMeta;
//! # use url::Url;
//! fn write_checkpoint_file(path: Url, data: &impl CheckpointDataIterator) -> DeltaResult<FileMeta> {
//!     todo!() /* engine-specific logic to write data to object storage*/
//! }
//!
@@ -114,6 +115,11 @@ use url::Url;
114115
115116mod stats_transform;
116117
118+ use stats_transform:: {
119+ build_checkpoint_output_schema, build_checkpoint_read_schema_with_stats, build_stats_transform,
120+ StatsTransformConfig ,
121+ } ;
122+
117123#[ cfg( test) ]
118124mod tests;
119125
@@ -131,8 +137,8 @@ static LAST_CHECKPOINT_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
131137 . into ( )
132138} ) ;
133139
134- /// Schema for extracting relevant actions from log files for checkpoint creation
135- static CHECKPOINT_ACTIONS_SCHEMA : LazyLock < SchemaRef > = LazyLock :: new ( || {
140+ /// Schema for V1 checkpoints (without checkpointMetadata action)
141+ static CHECKPOINT_ACTIONS_SCHEMA_V1 : LazyLock < SchemaRef > = LazyLock :: new ( || {
136142 Arc :: new ( StructType :: new_unchecked ( [
137143 StructField :: nullable ( ADD_NAME , Add :: to_schema ( ) ) ,
138144 StructField :: nullable ( REMOVE_NAME , Remove :: to_schema ( ) ) ,
@@ -143,16 +149,118 @@ static CHECKPOINT_ACTIONS_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
143149 ] ) )
144150} ) ;
145151
146- // Schema of the [`CheckpointMetadata`] action that is included in V2 checkpoints
147- // We cannot use `CheckpointMetadata::to_schema()` as it would include the 'tags' field which
148- // we're not supporting yet due to the lack of map support TODO(#880).
149- static CHECKPOINT_METADATA_ACTION_SCHEMA : LazyLock < SchemaRef > = LazyLock :: new ( || {
150- Arc :: new ( StructType :: new_unchecked ( [ StructField :: nullable (
152+ /// Schema for the checkpointMetadata field in V2 checkpoints.
153+ /// We cannot use `CheckpointMetadata::to_schema()` as it would include the 'tags' field which
154+ /// we're not supporting yet due to the lack of map support TODO(#880).
155+ fn checkpoint_metadata_field ( ) -> StructField {
156+ StructField :: nullable (
151157 CHECKPOINT_METADATA_NAME ,
152158 DataType :: struct_type_unchecked ( [ StructField :: not_null ( "version" , DataType :: LONG ) ] ) ,
153- ) ] ) )
159+ )
160+ }
161+
162+ /// Schema for V2 checkpoints (includes checkpointMetadata action)
163+ static CHECKPOINT_ACTIONS_SCHEMA_V2 : LazyLock < SchemaRef > = LazyLock :: new ( || {
164+ Arc :: new ( StructType :: new_unchecked ( [
165+ StructField :: nullable ( ADD_NAME , Add :: to_schema ( ) ) ,
166+ StructField :: nullable ( REMOVE_NAME , Remove :: to_schema ( ) ) ,
167+ StructField :: nullable ( METADATA_NAME , Metadata :: to_schema ( ) ) ,
168+ StructField :: nullable ( PROTOCOL_NAME , Protocol :: to_schema ( ) ) ,
169+ StructField :: nullable ( SET_TRANSACTION_NAME , SetTransaction :: to_schema ( ) ) ,
170+ StructField :: nullable ( SIDECAR_NAME , Sidecar :: to_schema ( ) ) ,
171+ checkpoint_metadata_field ( ) ,
172+ ] ) )
154173} ) ;
155174
175+ /// Trait for iterators that yield checkpoint data batches.
176+ ///
177+ /// This trait abstracts over checkpoint data iterators, allowing the concrete implementation
178+ /// to change without breaking the public API. Implementations yield [`FilteredEngineData`]
179+ /// batches that should be written to the checkpoint file.
180+ ///
181+ /// # Yielded Data
182+ ///
183+ /// All batches conform to [`output_schema()`][Self::output_schema], which is determined by:
184+ /// - **V1 checkpoints**: Schema includes add, remove, metadata, protocol, txn, sidecar
185+ /// - **V2 checkpoints**: Same as V1 plus checkpointMetadata
186+ ///
187+ /// The `add.stats` and `add.stats_parsed` fields are included or excluded based on table
188+ /// properties (`delta.checkpoint.writeStatsAsJson` and `delta.checkpoint.writeStatsAsStruct`).
189+ ///
190+ /// For V2 checkpoints, the final batch contains the checkpoint metadata action with all other
191+ /// action fields set to null.
192+ pub trait CheckpointDataIterator : Iterator < Item = DeltaResult < FilteredEngineData > > {
193+ /// Returns the schema for writing checkpoint data.
194+ ///
195+ /// All batches from this iterator conform to this schema. The schema reflects:
196+ /// - V1 vs V2 checkpoint format (V2 includes `checkpointMetadata` field)
197+ /// - Stats configuration (`stats` and/or `stats_parsed` fields)
198+ fn output_schema ( & self ) -> & SchemaRef ;
199+
200+ /// Returns the shared iterator state for tracking counts and exhaustion.
201+ ///
202+ /// This state should be passed to [`CheckpointWriter::finalize`] after the iterator
203+ /// has been fully consumed.
204+ fn state ( & self ) -> Arc < ActionReconciliationIteratorState > ;
205+ }
206+
207+ /// Iterator that applies stats transforms to checkpoint data batches.
208+ ///
209+ /// This is the concrete implementation of [`CheckpointDataIterator`] that wraps an
210+ /// [`ActionReconciliationIterator`] and applies an expression evaluator to each batch
211+ /// to populate stats fields.
212+ ///
213+ /// All batches (including the checkpoint metadata batch for V2 checkpoints) share the
214+ /// same schema and go through the same transform pipeline. The stats transform only
215+ /// operates on the `add` field, so other fields (including `checkpointMetadata`) pass
216+ /// through unchanged.
217+ pub struct TransformingCheckpointIterator {
218+ inner : ActionReconciliationIterator ,
219+ evaluator : Arc < dyn crate :: ExpressionEvaluator > ,
220+ /// Schema for writing checkpoint data (includes/excludes stats fields based on config)
221+ output_schema : SchemaRef ,
222+ }
223+
224+ impl TransformingCheckpointIterator {
225+ /// Creates a new transforming iterator.
226+ pub ( crate ) fn new (
227+ inner : ActionReconciliationIterator ,
228+ evaluator : Arc < dyn crate :: ExpressionEvaluator > ,
229+ output_schema : SchemaRef ,
230+ ) -> Self {
231+ Self {
232+ inner,
233+ evaluator,
234+ output_schema,
235+ }
236+ }
237+ }
238+
239+ impl CheckpointDataIterator for TransformingCheckpointIterator {
240+ fn output_schema ( & self ) -> & SchemaRef {
241+ & self . output_schema
242+ }
243+
244+ fn state ( & self ) -> Arc < ActionReconciliationIteratorState > {
245+ self . inner . state ( )
246+ }
247+ }
248+
249+ impl Iterator for TransformingCheckpointIterator {
250+ type Item = DeltaResult < FilteredEngineData > ;
251+
252+ fn next ( & mut self ) -> Option < Self :: Item > {
253+ let batch = self . inner . next ( ) ?;
254+
255+ // Apply the transform to the batch
256+ Some ( batch. and_then ( |filtered_data| {
257+ let ( engine_data, selection_vector) = filtered_data. into_parts ( ) ;
258+ let transformed = self . evaluator . evaluate ( engine_data. as_ref ( ) ) ?;
259+ FilteredEngineData :: try_new ( transformed, selection_vector)
260+ } ) )
261+ }
262+ }
263+
156264/// Orchestrates the process of creating a checkpoint for a table.
157265///
158266/// The [`CheckpointWriter`] is the entry point for generating checkpoint data for a Delta table.
@@ -218,50 +326,91 @@ impl CheckpointWriter {
218326 }
219327 /// Returns the checkpoint data to be written to the checkpoint file.
220328 ///
221- /// This method reads the actions from the log segment and processes them
222- /// to create the checkpoint data.
329+ /// This method reads actions from the log segment, processes them for checkpoint creation,
330+ /// and applies stats transforms based on table properties:
331+ /// - `delta.checkpoint.writeStatsAsJson` (default: true)
332+ /// - `delta.checkpoint.writeStatsAsStruct` (default: false)
223333 ///
224- /// # Parameters
225- /// - `engine`: Implementation of [`Engine`] APIs.
334+ /// The returned iterator (implementing [`CheckpointDataIterator`]) yields batches with stats
335+ /// transforms already applied. Use [`CheckpointDataIterator::output_schema`] to get the
336+ /// schema for writing the checkpoint file.
226337 ///
227- /// # Returns: [`ActionReconciliationIterator`] containing the checkpoint data
228- // This method is the core of the checkpoint generation process. It:
229- // 1. Determines whether to write a V1 or V2 checkpoint based on the table's
230- // `v2Checkpoints` feature support
231- // 2. Reads actions from the log segment using the checkpoint read schema
232- // 3. Filters and deduplicates actions for the checkpoint
233- // 4. Chains the checkpoint metadata action if writing a V2 spec checkpoint
234- // (i.e., if `v2Checkpoints` feature is supported by table)
235- // 5. Generates the appropriate checkpoint path
236- pub fn checkpoint_data (
237- & self ,
238- engine : & dyn Engine ,
239- ) -> DeltaResult < ActionReconciliationIterator > {
338+ /// # Engine Usage
339+ ///
340+ /// ```ignore
341+ /// let mut checkpoint_data = writer.checkpoint_data(&engine)?;
342+ /// let output_schema = checkpoint_data.output_schema().clone();
343+ /// while let Some(batch) = checkpoint_data.next() {
344+ /// let data = batch?.apply_selection_vector()?;
345+ /// parquet_writer.write(&data, &output_schema).await?;
346+ /// }
347+ /// writer.finalize(&engine, &metadata, checkpoint_data)?;
348+ /// ```
349+ pub fn checkpoint_data ( & self , engine : & dyn Engine ) -> DeltaResult < impl CheckpointDataIterator > {
350+ let config = StatsTransformConfig :: from_table_properties ( self . snapshot . table_properties ( ) ) ;
351+
352+ // Get stats schema from table configuration.
353+ // This already excludes partition columns and applies column mapping.
354+ let stats_schema = self
355+ . snapshot
356+ . table_configuration ( )
357+ . expected_stats_schema ( ) ?;
358+
359+ // Select schema based on V2 checkpoint support
240360 let is_v2_checkpoints_supported = self
241361 . snapshot
242362 . table_configuration ( )
243363 . is_feature_supported ( & TableFeature :: V2Checkpoint ) ;
244364
245- let actions = self . snapshot . log_segment ( ) . read_actions (
246- engine,
247- CHECKPOINT_ACTIONS_SCHEMA . clone ( ) ,
248- None ,
249- ) ?;
365+ let base_schema = if is_v2_checkpoints_supported {
366+ & CHECKPOINT_ACTIONS_SCHEMA_V2
367+ } else {
368+ & CHECKPOINT_ACTIONS_SCHEMA_V1
369+ } ;
370+
371+ // Read schema includes stats_parsed so COALESCE expressions can operate on it.
372+ // For commits, stats_parsed will be read as nulls (column doesn't exist in source).
373+ let read_schema = build_checkpoint_read_schema_with_stats ( base_schema, & stats_schema) ?;
250374
251- // Create iterator over actions for checkpoint data
375+ // Read actions from log segment
376+ let actions =
377+ self . snapshot
378+ . log_segment ( )
379+ . read_actions ( engine, read_schema. clone ( ) , None ) ?;
380+
381+ // Process actions through reconciliation
252382 let checkpoint_data = ActionReconciliationProcessor :: new (
253383 self . deleted_file_retention_timestamp ( ) ?,
254384 self . get_transaction_expiration_timestamp ( ) ?,
255385 )
256386 . process_actions_iter ( actions) ;
257387
258- let checkpoint_metadata =
259- is_v2_checkpoints_supported. then ( || self . create_checkpoint_metadata_batch ( engine) ) ;
388+ // Build output schema based on stats config (determines which fields are included)
389+ let output_schema = build_checkpoint_output_schema ( & config, base_schema, & stats_schema) ?;
390+
391+ // Build transform expression and create expression evaluator
392+ let transform_expr = build_stats_transform ( & config, stats_schema) ;
393+ let evaluator = engine. evaluation_handler ( ) . new_expression_evaluator (
394+ read_schema. clone ( ) ,
395+ transform_expr,
396+ output_schema. clone ( ) . into ( ) ,
397+ ) ?;
398+
399+ // For V2 checkpoints, chain the checkpoint metadata batch to the action stream.
400+ // The checkpoint metadata batch uses the read schema (with stats_parsed), so it can
401+ // go through the same stats transform pipeline.
402+ let checkpoint_metadata = is_v2_checkpoints_supported
403+ . then ( || self . create_checkpoint_metadata_batch ( engine, & read_schema) ) ;
260404
261- // Wrap the iterator to track action counts
262- Ok ( ActionReconciliationIterator :: new ( Box :: new (
263- checkpoint_data. chain ( checkpoint_metadata) ,
264- ) ) )
405+ // Create action reconciliation iterator, chaining checkpoint metadata for V2
406+ let inner =
407+ ActionReconciliationIterator :: new ( Box :: new ( checkpoint_data. chain ( checkpoint_metadata) ) ) ;
408+
409+ Ok ( TransformingCheckpointIterator :: new (
410+ inner,
411+ evaluator,
412+ output_schema,
413+ ) )
265414 }
266415
267416 /// Finalizes checkpoint creation by saving metadata about the checkpoint.
@@ -330,23 +479,49 @@ impl CheckpointWriter {
330479 ///
331480 /// # Implementation Details
332481 ///
333- /// The function creates a single-row [`EngineData`] batch containing only the
334- /// version field of the [`CheckpointMetadata`] action. Future implementations will
335- /// include the additional metadata field `tags` when map support is added.
482+ /// The function creates a single-row [`EngineData`] batch using the full V2 checkpoint
483+ /// schema, with all action fields (add, remove, etc.) set to null except for the
484+ /// `checkpointMetadata` field. This ensures the checkpoint metadata batch has the same
485+ /// schema as other action batches, allowing them to be written to the same Parquet file.
336486 ///
337487 /// # Returns:
338488 /// A [`ActionReconciliationBatch`] batch including the single-row [`EngineData`] batch along with
339489 /// an accompanying selection vector with a single `true` value, indicating the action in
340490 /// batch should be included in the checkpoint.
491+ /// Creates the checkpoint metadata batch with the given schema.
492+ ///
493+ /// The schema must be the read schema (with stats_parsed) so the batch can go through
494+ /// the same stats transform pipeline as regular action batches.
341495 fn create_checkpoint_metadata_batch (
342496 & self ,
343497 engine : & dyn Engine ,
498+ schema : & SchemaRef ,
344499 ) -> DeltaResult < ActionReconciliationBatch > {
345- let checkpoint_metadata_batch = engine. evaluation_handler ( ) . create_one (
346- CHECKPOINT_METADATA_ACTION_SCHEMA . clone ( ) ,
347- & [ Scalar :: from ( self . version ) ] ,
500+ use crate :: expressions:: { Expression , StructData , Transform } ;
501+
502+ // Start with an all-null row
503+ let null_row = engine. evaluation_handler ( ) . null_row ( schema. clone ( ) ) ?;
504+
505+ // Build the checkpointMetadata struct value
506+ let checkpoint_metadata_value = Scalar :: Struct ( StructData :: try_new (
507+ vec ! [ StructField :: not_null( "version" , DataType :: LONG ) ] ,
508+ vec ! [ Scalar :: from( self . version) ] ,
509+ ) ?) ;
510+
511+ // Use a Transform to set just the checkpointMetadata field, keeping others null
512+ let transform = Transform :: new_top_level ( ) . with_replaced_field (
513+ CHECKPOINT_METADATA_NAME ,
514+ Arc :: new ( Expression :: literal ( checkpoint_metadata_value) ) ,
515+ ) ;
516+
517+ let evaluator = engine. evaluation_handler ( ) . new_expression_evaluator (
518+ schema. clone ( ) ,
519+ Arc :: new ( Expression :: transform ( transform) ) ,
520+ schema. clone ( ) . into ( ) ,
348521 ) ?;
349522
523+ let checkpoint_metadata_batch = evaluator. evaluate ( null_row. as_ref ( ) ) ?;
524+
350525 let filtered_data = FilteredEngineData :: with_all_rows_selected ( checkpoint_metadata_batch) ;
351526
352527 Ok ( ActionReconciliationBatch {
0 commit comments