diff --git a/kernel/examples/checkpoint-table/src/main.rs b/kernel/examples/checkpoint-table/src/main.rs index 9567179461..5b1a632624 100644 --- a/kernel/examples/checkpoint-table/src/main.rs +++ b/kernel/examples/checkpoint-table/src/main.rs @@ -7,9 +7,10 @@ use futures::future::{BoxFuture, FutureExt}; use parquet::arrow::async_writer::{AsyncFileWriter, ParquetObjectWriter}; use parquet::arrow::AsyncArrowWriter; +use delta_kernel::checkpoint::CheckpointDataIterator; use delta_kernel::engine::arrow_data::EngineDataArrowExt; use delta_kernel::engine::default::DefaultEngineBuilder; -use delta_kernel::{ActionReconciliationIterator, DeltaResult, Error, FileMeta, Snapshot}; +use delta_kernel::{DeltaResult, Error, FileMeta, Snapshot}; /// An example program that checkpoints a table. /// !!!WARNING!!!: This doesn't use put-if-absent, or a catalog based commit, so it is UNSAFE. @@ -44,7 +45,7 @@ async fn main() -> ExitCode { async fn write_data( first_batch: &RecordBatch, - batch_iter: &mut ActionReconciliationIterator, + batch_iter: &mut impl CheckpointDataIterator, parquet_writer: &mut AsyncArrowWriter, ) -> DeltaResult<()> { parquet_writer.write(first_batch).await?; diff --git a/kernel/src/checkpoint/mod.rs b/kernel/src/checkpoint/mod.rs index 56c78ac327..5340bb84b4 100644 --- a/kernel/src/checkpoint/mod.rs +++ b/kernel/src/checkpoint/mod.rs @@ -16,7 +16,7 @@ //! ## Architecture //! //! - [`CheckpointWriter`] - Core component that manages the checkpoint creation workflow -//! - [`ActionReconciliationIterator`] - Iterator over the checkpoint data to be written +//! - [`CheckpointDataIterator`] - Trait for iterators over checkpoint data to be written //! //! ## Usage //! @@ -33,6 +33,7 @@ //! # use std::sync::Arc; //! # use delta_kernel::ActionReconciliationIterator; //! # use delta_kernel::checkpoint::CheckpointWriter; +//! # use delta_kernel::checkpoint::CheckpointDataIterator; //! # use delta_kernel::Engine; //! # use delta_kernel::Snapshot; //! # use delta_kernel::SnapshotRef; @@ -40,7 +41,7 @@ //! # use delta_kernel::Error; //! # use delta_kernel::FileMeta; //! # use url::Url; -//! fn write_checkpoint_file(path: Url, data: &ActionReconciliationIterator) -> DeltaResult { +//! fn write_checkpoint_file(path: Url, data: &impl CheckpointDataIterator) -> DeltaResult { //! todo!() /* engine-specific logic to write data to object storage*/ //! } //! 
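A minimal engine-side sketch of the `write_checkpoint_file` hook from the usage example above, assuming only the iterator surface this diff introduces (`output_schema`, batch iteration, `apply_selection_vector`). It takes the iterator by `&mut` so it can be driven; `write_parquet_batch` is a hypothetical engine helper, and the `FileMeta` timestamp and size are placeholder values an engine would obtain from its object store.

fn write_checkpoint_file(
    path: Url,
    data: &mut impl CheckpointDataIterator,
) -> DeltaResult<FileMeta> {
    // The output schema already reflects V1/V2 format and the table's stats configuration.
    let schema = data.output_schema().clone();
    while let Some(batch) = data.next() {
        // Drop unselected rows before handing the batch to the engine's parquet writer.
        let batch = batch?.apply_selection_vector()?;
        write_parquet_batch(&path, &schema, batch)?; // hypothetical engine helper
    }
    Ok(FileMeta {
        location: path,
        last_modified: 0, // placeholders; an engine would report real values from its store
        size: 0,
    })
}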
@@ -112,6 +113,13 @@ use crate::{DeltaResult, Engine, EngineData, Error, EvaluationHandlerExtension, use url::Url; +mod stats_transform; + +use stats_transform::{ + build_checkpoint_output_schema, build_checkpoint_read_schema_with_stats, build_stats_transform, + StatsTransformConfig, +}; + #[cfg(test)] mod tests; @@ -129,8 +137,8 @@ static LAST_CHECKPOINT_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| { .into() }); -/// Schema for extracting relevant actions from log files for checkpoint creation -static CHECKPOINT_ACTIONS_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| { +/// Schema for V1 checkpoints (without checkpointMetadata action) +static CHECKPOINT_ACTIONS_SCHEMA_V1: LazyLock<SchemaRef> = LazyLock::new(|| { Arc::new(StructType::new_unchecked([ StructField::nullable(ADD_NAME, Add::to_schema()), StructField::nullable(REMOVE_NAME, Remove::to_schema()), @@ -141,16 +149,118 @@ static CHECKPOINT_ACTIONS_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| { ])) }); -// Schema of the [`CheckpointMetadata`] action that is included in V2 checkpoints -// We cannot use `CheckpointMetadata::to_schema()` as it would include the 'tags' field which -// we're not supporting yet due to the lack of map support TODO(#880). -static CHECKPOINT_METADATA_ACTION_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| { - Arc::new(StructType::new_unchecked([StructField::nullable( +/// Schema for the checkpointMetadata field in V2 checkpoints. +/// We cannot use `CheckpointMetadata::to_schema()` as it would include the 'tags' field which +/// we're not supporting yet due to the lack of map support TODO(#880). +fn checkpoint_metadata_field() -> StructField { + StructField::nullable( CHECKPOINT_METADATA_NAME, DataType::struct_type_unchecked([StructField::not_null("version", DataType::LONG)]), - )])) + ) +} + +/// Schema for V2 checkpoints (includes checkpointMetadata action) +static CHECKPOINT_ACTIONS_SCHEMA_V2: LazyLock<SchemaRef> = LazyLock::new(|| { + Arc::new(StructType::new_unchecked([ + StructField::nullable(ADD_NAME, Add::to_schema()), + StructField::nullable(REMOVE_NAME, Remove::to_schema()), + StructField::nullable(METADATA_NAME, Metadata::to_schema()), + StructField::nullable(PROTOCOL_NAME, Protocol::to_schema()), + StructField::nullable(SET_TRANSACTION_NAME, SetTransaction::to_schema()), + StructField::nullable(SIDECAR_NAME, Sidecar::to_schema()), + checkpoint_metadata_field(), + ])) }); +/// Trait for iterators that yield checkpoint data batches. +/// +/// This trait abstracts over checkpoint data iterators, allowing the concrete implementation +/// to change without breaking the public API. Implementations yield [`FilteredEngineData`] +/// batches that should be written to the checkpoint file. +/// +/// # Yielded Data +/// +/// All batches conform to [`output_schema()`][Self::output_schema], which is determined by: +/// - **V1 checkpoints**: Schema includes add, remove, metadata, protocol, txn, sidecar +/// - **V2 checkpoints**: Same as V1 plus checkpointMetadata +/// +/// The `add.stats` and `add.stats_parsed` fields are included or excluded based on table +/// properties (`delta.checkpoint.writeStatsAsJson` and `delta.checkpoint.writeStatsAsStruct`). +/// +/// For V2 checkpoints, the final batch contains the checkpoint metadata action with all other +/// action fields set to null. +pub trait CheckpointDataIterator: Iterator<Item = DeltaResult<FilteredEngineData>> { + /// Returns the schema for writing checkpoint data. + /// + /// All batches from this iterator conform to this schema.
The schema reflects: + /// - V1 vs V2 checkpoint format (V2 includes `checkpointMetadata` field) + /// - Stats configuration (`stats` and/or `stats_parsed` fields) + fn output_schema(&self) -> &SchemaRef; + + /// Returns the shared iterator state for tracking counts and exhaustion. + /// + /// This state should be passed to [`CheckpointWriter::finalize`] after the iterator + /// has been fully consumed. + fn state(&self) -> Arc; +} + +/// Iterator that applies stats transforms to checkpoint data batches. +/// +/// This is the concrete implementation of [`CheckpointDataIterator`] that wraps an +/// [`ActionReconciliationIterator`] and applies an expression evaluator to each batch +/// to populate stats fields. +/// +/// All batches (including the checkpoint metadata batch for V2 checkpoints) share the +/// same schema and go through the same transform pipeline. The stats transform only +/// operates on the `add` field, so other fields (including `checkpointMetadata`) pass +/// through unchanged. +pub struct TransformingCheckpointIterator { + inner: ActionReconciliationIterator, + evaluator: Arc<dyn ExpressionEvaluator>, + /// Schema for writing checkpoint data (includes/excludes stats fields based on config) + output_schema: SchemaRef, +} + +impl TransformingCheckpointIterator { + /// Creates a new transforming iterator. + pub(crate) fn new( + inner: ActionReconciliationIterator, + evaluator: Arc<dyn ExpressionEvaluator>, + output_schema: SchemaRef, + ) -> Self { + Self { + inner, + evaluator, + output_schema, + } + } +} + +impl CheckpointDataIterator for TransformingCheckpointIterator { + fn output_schema(&self) -> &SchemaRef { + &self.output_schema + } + + fn state(&self) -> Arc { + self.inner.state() + } +} + +impl Iterator for TransformingCheckpointIterator { + type Item = DeltaResult<FilteredEngineData>; + + fn next(&mut self) -> Option<Self::Item> { + let batch = self.inner.next()?; + + // Apply the transform to the batch + Some(batch.and_then(|filtered_data| { + let (engine_data, selection_vector) = filtered_data.into_parts(); + let transformed = self.evaluator.evaluate(engine_data.as_ref())?; + FilteredEngineData::try_new(transformed, selection_vector) + })) + } +} + /// Orchestrates the process of creating a checkpoint for a table. /// /// The [`CheckpointWriter`] is the entry point for generating checkpoint data for a Delta table. @@ -216,50 +326,91 @@ impl CheckpointWriter { } /// Returns the checkpoint data to be written to the checkpoint file. /// - /// This method reads the actions from the log segment and processes them - /// to create the checkpoint data. + /// This method reads actions from the log segment, processes them for checkpoint creation, + /// and applies stats transforms based on table properties: + /// - `delta.checkpoint.writeStatsAsJson` (default: true) + /// - `delta.checkpoint.writeStatsAsStruct` (default: true) /// - /// # Parameters - /// - `engine`: Implementation of [`Engine`] APIs. + /// The returned iterator (implementing [`CheckpointDataIterator`]) yields batches with stats + /// transforms already applied. Use [`CheckpointDataIterator::output_schema`] to get the + /// schema for writing the checkpoint file. /// - /// # Returns: [`ActionReconciliationIterator`] containing the checkpoint data - // This method is the core of the checkpoint generation process. It: - // 1. Determines whether to write a V1 or V2 checkpoint based on the table's - // `v2Checkpoints` feature support - // 2. Reads actions from the log segment using the checkpoint read schema - // 3. Filters and deduplicates actions for the checkpoint - // 4.
Chains the checkpoint metadata action if writing a V2 spec checkpoint - // (i.e., if `v2Checkpoints` feature is supported by table) - // 5. Generates the appropriate checkpoint path - pub fn checkpoint_data( - &self, - engine: &dyn Engine, - ) -> DeltaResult { + /// # Engine Usage + /// + /// ```ignore + /// let mut checkpoint_data = writer.checkpoint_data(&engine)?; + /// let output_schema = checkpoint_data.output_schema().clone(); + /// while let Some(batch) = checkpoint_data.next() { + /// let data = batch?.apply_selection_vector()?; + /// parquet_writer.write(&data, &output_schema).await?; + /// } + /// writer.finalize(&engine, &metadata, checkpoint_data)?; + /// ``` + pub fn checkpoint_data(&self, engine: &dyn Engine) -> DeltaResult { + let config = StatsTransformConfig::from_table_properties(self.snapshot.table_properties()); + + // Get stats schema from table configuration. + // This already excludes partition columns and applies column mapping. + let stats_schema = self + .snapshot + .table_configuration() + .expected_stats_schema()?; + + // Select schema based on V2 checkpoint support let is_v2_checkpoints_supported = self .snapshot .table_configuration() .is_feature_supported(&TableFeature::V2Checkpoint); - let actions = self.snapshot.log_segment().read_actions( - engine, - CHECKPOINT_ACTIONS_SCHEMA.clone(), - None, - )?; + let base_schema = if is_v2_checkpoints_supported { + &CHECKPOINT_ACTIONS_SCHEMA_V2 + } else { + &CHECKPOINT_ACTIONS_SCHEMA_V1 + }; + + // Read schema includes stats_parsed so COALESCE expressions can operate on it. + // For commits, stats_parsed will be read as nulls (column doesn't exist in source). + let read_schema = build_checkpoint_read_schema_with_stats(base_schema, &stats_schema)?; - // Create iterator over actions for checkpoint data + // Read actions from log segment + let actions = + self.snapshot + .log_segment() + .read_actions(engine, read_schema.clone(), None)?; + + // Process actions through reconciliation let checkpoint_data = ActionReconciliationProcessor::new( self.deleted_file_retention_timestamp()?, self.get_transaction_expiration_timestamp()?, ) .process_actions_iter(actions); - let checkpoint_metadata = - is_v2_checkpoints_supported.then(|| self.create_checkpoint_metadata_batch(engine)); + // Build output schema based on stats config (determines which fields are included) + let output_schema = build_checkpoint_output_schema(&config, base_schema, &stats_schema)?; + + // Build transform expression and create expression evaluator + let transform_expr = build_stats_transform(&config, stats_schema); + let evaluator = engine.evaluation_handler().new_expression_evaluator( + read_schema.clone(), + transform_expr, + output_schema.clone().into(), + )?; + + // For V2 checkpoints, chain the checkpoint metadata batch to the action stream. + // The checkpoint metadata batch uses the read schema (with stats_parsed), so it can + // go through the same stats transform pipeline. 
+ let checkpoint_metadata = is_v2_checkpoints_supported + .then(|| self.create_checkpoint_metadata_batch(engine, &read_schema)); - // Wrap the iterator to track action counts - Ok(ActionReconciliationIterator::new(Box::new( - checkpoint_data.chain(checkpoint_metadata), - ))) + // Create action reconciliation iterator, chaining checkpoint metadata for V2 + let inner = + ActionReconciliationIterator::new(Box::new(checkpoint_data.chain(checkpoint_metadata))); + + Ok(TransformingCheckpointIterator::new( + inner, + evaluator, + output_schema, + )) } /// Finalizes checkpoint creation by saving metadata about the checkpoint. @@ -328,23 +479,49 @@ impl CheckpointWriter { /// /// # Implementation Details /// - /// The function creates a single-row [`EngineData`] batch containing only the - /// version field of the [`CheckpointMetadata`] action. Future implementations will - /// include the additional metadata field `tags` when map support is added. + /// The function creates a single-row [`EngineData`] batch using the full V2 checkpoint + /// schema, with all action fields (add, remove, etc.) set to null except for the + /// `checkpointMetadata` field. This ensures the checkpoint metadata batch has the same + /// schema as other action batches, allowing them to be written to the same Parquet file. /// /// # Returns: /// A [`ActionReconciliationBatch`] batch including the single-row [`EngineData`] batch along with /// an accompanying selection vector with a single `true` value, indicating the action in /// batch should be included in the checkpoint. + /// Creates the checkpoint metadata batch with the given schema. + /// + /// The schema must be the read schema (with stats_parsed) so the batch can go through + /// the same stats transform pipeline as regular action batches. fn create_checkpoint_metadata_batch( &self, engine: &dyn Engine, + schema: &SchemaRef, ) -> DeltaResult { - let checkpoint_metadata_batch = engine.evaluation_handler().create_one( - CHECKPOINT_METADATA_ACTION_SCHEMA.clone(), - &[Scalar::from(self.version)], + use crate::expressions::{Expression, StructData, Transform}; + + // Start with an all-null row + let null_row = engine.evaluation_handler().null_row(schema.clone())?; + + // Build the checkpointMetadata struct value + let checkpoint_metadata_value = Scalar::Struct(StructData::try_new( + vec![StructField::not_null("version", DataType::LONG)], + vec![Scalar::from(self.version)], + )?); + + // Use a Transform to set just the checkpointMetadata field, keeping others null + let transform = Transform::new_top_level().with_replaced_field( + CHECKPOINT_METADATA_NAME, + Arc::new(Expression::literal(checkpoint_metadata_value)), + ); + + let evaluator = engine.evaluation_handler().new_expression_evaluator( + schema.clone(), + Arc::new(Expression::transform(transform)), + schema.clone().into(), )?; + let checkpoint_metadata_batch = evaluator.evaluate(null_row.as_ref())?; + let filtered_data = FilteredEngineData::with_all_rows_selected(checkpoint_metadata_batch); Ok(ActionReconciliationBatch { diff --git a/kernel/src/checkpoint/stats_transform.rs b/kernel/src/checkpoint/stats_transform.rs new file mode 100644 index 0000000000..023a124c76 --- /dev/null +++ b/kernel/src/checkpoint/stats_transform.rs @@ -0,0 +1,452 @@ +//! Transforms for populating stats_parsed and stats fields in checkpoint data. +//! +//! This module ensures that Add actions in checkpoints have the correct statistics format +//! based on the table configuration. Statistics can be stored in two formats as fields on +//! 
the `Add` action: +//! - `stats`: JSON string format, controlled by `delta.checkpoint.writeStatsAsJson` (default: true) +//! - `stats_parsed`: Native struct format, controlled by `delta.checkpoint.writeStatsAsStruct` (default: true) +//! +//! This module provides transforms to populate these fields using COALESCE expressions, +//! ensuring that stats are preserved regardless of the source format (commits vs checkpoints). + +use std::sync::{Arc, LazyLock}; + +use crate::actions::ADD_NAME; +use crate::expressions::{Expression, ExpressionRef, Transform, UnaryExpressionOp}; +use crate::schema::{DataType, SchemaRef, StructField, StructType}; +use crate::table_properties::TableProperties; +use crate::{DeltaResult, Error}; + +pub(crate) const STATS_FIELD: &str = "stats"; +pub(crate) const STATS_PARSED_FIELD: &str = "stats_parsed"; + +/// Configuration for stats transformation based on table properties. +#[derive(Debug, Clone, Copy)] +pub(crate) struct StatsTransformConfig { + pub write_stats_as_json: bool, + pub write_stats_as_struct: bool, +} + +impl StatsTransformConfig { + pub(super) fn from_table_properties(properties: &TableProperties) -> Self { + Self { + write_stats_as_json: properties.checkpoint_write_stats_as_json.unwrap_or(true), + write_stats_as_struct: properties.checkpoint_write_stats_as_struct.unwrap_or(true), + } + } +} + +/// Builds a transform for the Add action to populate and/or drop stats fields. +/// +/// The transform handles all four scenarios based on table properties: +/// - When `writeStatsAsJson=true`: `stats = COALESCE(stats, ToJson(stats_parsed))` +/// - When `writeStatsAsJson=false`: drop `stats` field +/// - When `writeStatsAsStruct=true`: `stats_parsed = COALESCE(stats_parsed, ParseJson(stats))` +/// - When `writeStatsAsStruct=false`: drop `stats_parsed` field +/// +/// Returns a top-level transform that wraps the nested Add transform, ensuring the +/// full checkpoint batch is produced with the modified Add action. +/// +/// # Arguments +/// +/// * `stats_schema` - The expected schema for parsed file statistics, typically generated +/// by [`expected_stats_schema`]. This schema has the following structure: +/// ```ignore +/// { +/// numRecords: long, +/// nullCount: , +/// minValues: , +/// maxValues: , +/// } +/// ``` +/// The schema is derived from the table's physical file schema and table properties +/// (`dataSkippingNumIndexedCols`, `dataSkippingStatsColumns`). Only columns eligible +/// for data skipping are included in `minValues`/`maxValues`. +/// +/// [`expected_stats_schema`]: crate::scan::data_skipping::stats_schema::expected_stats_schema +pub(crate) fn build_stats_transform( + config: &StatsTransformConfig, + stats_schema: SchemaRef, +) -> ExpressionRef { + let mut add_transform = Transform::new_nested([ADD_NAME]); + + // Handle stats field + if config.write_stats_as_json { + // Populate stats from stats_parsed if needed (for old checkpoints that only had stats_parsed) + add_transform = add_transform.with_replaced_field(STATS_FIELD, STATS_JSON_EXPR.clone()); + } else { + // Drop stats field when not writing as JSON + add_transform = add_transform.with_dropped_field(STATS_FIELD); + } + + // Handle stats_parsed field + // Note: stats_parsed was added to read schema (via build_checkpoint_read_schema_with_stats), + // so we always need to either replace it (with COALESCE) or drop it. 
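    // Illustrative summary of the four property combinations this function handles (derived
    // from the branches above and below, not additional behavior):
    //
    //   writeStatsAsJson | writeStatsAsStruct | fields kept on `add`
    //   -----------------+--------------------+------------------------------------------
    //   true             | true               | stats (coalesced), stats_parsed (coalesced)
    //   true             | false              | stats (coalesced)
    //   false            | true               | stats_parsed (coalesced)
    //   false            | false              | neither stats nor stats_parsed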
+ if config.write_stats_as_struct { + // Populate stats_parsed from JSON stats (for commits that only have JSON stats) + let stats_parsed_expr = build_stats_parsed_expr(stats_schema); + add_transform = add_transform.with_replaced_field(STATS_PARSED_FIELD, stats_parsed_expr); + } else { + // Drop stats_parsed field when not writing as struct + add_transform = add_transform.with_dropped_field(STATS_PARSED_FIELD); + } + + // Wrap the nested Add transform in a top-level transform that replaces the Add field + let add_transform_expr: ExpressionRef = Arc::new(Expression::transform(add_transform)); + let outer_transform = + Transform::new_top_level().with_replaced_field(ADD_NAME, add_transform_expr); + + Arc::new(Expression::transform(outer_transform)) +} + +/// Builds expression: `stats_parsed = COALESCE(stats_parsed, ParseJson(stats, schema))` +/// +/// This expression prefers existing stats_parsed, falling back to parsing JSON stats. +/// Column paths are relative to the full batch (not the nested Add struct), so we use +/// ["add", "stats"] instead of just ["stats"]. +fn build_stats_parsed_expr(stats_schema: SchemaRef) -> ExpressionRef { + Arc::new(Expression::coalesce([ + Expression::column([ADD_NAME, STATS_PARSED_FIELD]), + Expression::parse_json(Expression::column([ADD_NAME, STATS_FIELD]), stats_schema), + ])) +} + +/// Static expression: `stats = COALESCE(stats, ToJson(stats_parsed))` +/// +/// This expression prefers existing JSON stats, falling back to converting stats_parsed. +/// Column paths are relative to the full batch (not the nested Add struct), so we use +/// ["add", "stats"] instead of just ["stats"]. +static STATS_JSON_EXPR: LazyLock<ExpressionRef> = LazyLock::new(|| { + Arc::new(Expression::coalesce([ + Expression::column([ADD_NAME, STATS_FIELD]), + Expression::unary( + UnaryExpressionOp::ToJson, + Expression::column([ADD_NAME, STATS_PARSED_FIELD]), + ), + ])) +}); + +/// Transforms the Add action schema within a checkpoint schema. +/// +/// This helper applies a transformation function to the Add struct and returns +/// a new schema with the modified Add field. +/// +// TODO: Consider adding StructType helper methods (e.g., with_field_inserted, +// with_field_removed) to reduce boilerplate. Schemas are Arc-shared so cloning +// is necessary, but dedicated methods would be more ergonomic. +/// +/// # Errors +/// +/// Returns an error if: +/// - The `add` field is not found in the schema +/// - The `add` field is not a struct type +fn transform_add_schema( + base_schema: &StructType, + transform_fn: impl FnOnce(&StructType) -> DeltaResult<StructType>, +) -> DeltaResult<SchemaRef> { + // Find and validate the add field + let add_field = base_schema + .field(ADD_NAME) + .ok_or_else(|| Error::generic("Expected 'add' field in checkpoint schema"))?; + + let DataType::Struct(add_struct) = &add_field.data_type else { + return Err(Error::generic(format!( + "Expected 'add' field to be a struct type, got {:?}", + add_field.data_type + ))); + }; + + let modified_add = transform_fn(add_struct)?; + let fields: Vec<StructField> = base_schema + .fields() + .map(|field| { + if field.name == ADD_NAME { + StructField { + name: field.name.clone(), + data_type: DataType::Struct(Box::new(modified_add.clone())), + nullable: field.nullable, + metadata: field.metadata.clone(), + } + } else { + field.clone() + } + }) + .collect(); + + Ok(Arc::new(StructType::new_unchecked(fields))) +} + +/// Builds a read schema that includes `stats_parsed` in the Add action.
+/// +/// The read schema must include `stats_parsed` for ALL reads (checkpoints + commits) +/// even though commits don't have `stats_parsed`. This ensures the column exists +/// (as nulls) so COALESCE can operate correctly. +/// +/// # Errors +/// +/// Returns an error if: +/// - The `add` field is not found or is not a struct type +/// - The `stats_parsed` field already exists in the Add schema +pub(crate) fn build_checkpoint_read_schema_with_stats( + base_schema: &StructType, + stats_schema: &StructType, +) -> DeltaResult<SchemaRef> { + transform_add_schema(base_schema, |add_struct| { + // Validate stats_parsed isn't already present + if add_struct.field(STATS_PARSED_FIELD).is_some() { + return Err(Error::generic( + "stats_parsed field already exists in Add schema", + )); + } + Ok(add_stats_parsed_to_add_schema(add_struct, stats_schema)) + }) +} + +/// Adds `stats_parsed` field after `stats` in the Add action schema. +fn add_stats_parsed_to_add_schema( + add_schema: &StructType, + stats_schema: &StructType, +) -> StructType { + let mut fields: Vec<StructField> = Vec::with_capacity(add_schema.num_fields() + 1); + + for field in add_schema.fields() { + fields.push(field.clone()); + if field.name == STATS_FIELD { + // Insert stats_parsed right after stats + fields.push(StructField::nullable( + STATS_PARSED_FIELD, + DataType::Struct(Box::new(stats_schema.clone())), + )); + } + } + + StructType::new_unchecked(fields) +} + +/// Builds the output schema based on configuration. +/// +/// The output schema determines which fields are included in the checkpoint: +/// - If `writeStatsAsJson=false`: `stats` field is excluded +/// - If `writeStatsAsStruct=true`: `stats_parsed` field is included +/// +/// # Errors +/// +/// Returns an error if the `add` field is not found or is not a struct type. +pub(crate) fn build_checkpoint_output_schema( + config: &StatsTransformConfig, + base_schema: &StructType, + stats_schema: &StructType, +) -> DeltaResult<SchemaRef> { + transform_add_schema(base_schema, |add_struct| { + Ok(build_add_output_schema(config, add_struct, stats_schema)) + }) +} + +fn build_add_output_schema( + config: &StatsTransformConfig, + add_schema: &StructType, + stats_schema: &StructType, +) -> StructType { + let capacity = add_schema.num_fields() + - if config.write_stats_as_json { 0 } else { 1 } // dropping stats? + + if config.write_stats_as_struct { 1 } else { 0 }; // adding stats_parsed?
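    // Worked example of the capacity arithmetic (illustrative only): if `add` arrives with N
    // fields (it contains `stats` but not yet `stats_parsed`, since this runs on the base schema):
    //   json=true,  struct=true  -> N + 1 (keep stats, add stats_parsed)
    //   json=true,  struct=false -> N     (keep stats)
    //   json=false, struct=true  -> N     (drop stats, add stats_parsed)
    //   json=false, struct=false -> N - 1 (drop stats)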
+ let mut fields: Vec<StructField> = Vec::with_capacity(capacity); + + for field in add_schema.fields() { + if field.name == STATS_FIELD { + // Include stats if writing as JSON + if config.write_stats_as_json { + fields.push(field.clone()); + } + // Add stats_parsed after stats position if writing as struct + if config.write_stats_as_struct { + fields.push(StructField::nullable( + STATS_PARSED_FIELD, + DataType::Struct(Box::new(stats_schema.clone())), + )); + } + } else { + fields.push(field.clone()); + } + } + + StructType::new_unchecked(fields) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_config_defaults() { + // Default: writeStatsAsJson=true, writeStatsAsStruct=true + let props = TableProperties::default(); + let config = StatsTransformConfig::from_table_properties(&props); + assert!(config.write_stats_as_json); + assert!(config.write_stats_as_struct); + } + + #[test] + fn test_config_with_struct_enabled() { + let props = TableProperties { + checkpoint_write_stats_as_struct: Some(true), + ..Default::default() + }; + let config = StatsTransformConfig::from_table_properties(&props); + assert!(config.write_stats_as_json); + assert!(config.write_stats_as_struct); + } + + #[test] + fn test_build_transform_with_json_only() { + // writeStatsAsJson=true, writeStatsAsStruct=false + // Should produce a transform expression that replaces stats with COALESCE and drops stats_parsed + let config = StatsTransformConfig { + write_stats_as_json: true, + write_stats_as_struct: false, + }; + let stats_schema = Arc::new(StructType::new_unchecked([])); + let transform_expr = build_stats_transform(&config, stats_schema); + + // Verify we get a Transform expression + let Expression::Transform(_) = transform_expr.as_ref() else { + panic!("Expected Transform expression"); + }; + } + + #[test] + fn test_build_transform_drops_both_when_false() { + // writeStatsAsJson=false, writeStatsAsStruct=false + // Should produce a transform expression that drops both stats and stats_parsed + let config = StatsTransformConfig { + write_stats_as_json: false, + write_stats_as_struct: false, + }; + let stats_schema = Arc::new(StructType::new_unchecked([])); + let transform_expr = build_stats_transform(&config, stats_schema); + + // Verify we get a Transform expression + let Expression::Transform(_) = transform_expr.as_ref() else { + panic!("Expected Transform expression"); + }; + } + + #[test] + fn test_build_transform_with_both_enabled() { + // writeStatsAsJson=true, writeStatsAsStruct=true + // Should produce a transform expression that populates both stats and stats_parsed + let config = StatsTransformConfig { + write_stats_as_json: true, + write_stats_as_struct: true, + }; + let stats_schema = Arc::new(StructType::new_unchecked([])); + let transform_expr = build_stats_transform(&config, stats_schema); + + // Verify we get a Transform expression + let Expression::Transform(_) = transform_expr.as_ref() else { + panic!("Expected Transform expression"); + }; + } + + #[test] + fn test_build_transform_struct_only() { + // writeStatsAsJson=false, writeStatsAsStruct=true + // Should produce a transform expression that drops stats and populates stats_parsed + let config = StatsTransformConfig { + write_stats_as_json: false, + write_stats_as_struct: true, + }; + let stats_schema = Arc::new(StructType::new_unchecked([])); + let transform_expr = build_stats_transform(&config, stats_schema); + + // Verify we get a Transform expression + let Expression::Transform(_) = transform_expr.as_ref() else {
panic!("Expected Transform expression"); + }; + } + + #[test] + fn test_add_stats_parsed_to_add_schema() { + let add_schema = StructType::new_unchecked([ + StructField::not_null("path", DataType::STRING), + StructField::nullable("stats", DataType::STRING), + StructField::nullable("tags", DataType::STRING), + ]); + + let stats_schema = + StructType::new_unchecked([StructField::nullable("numRecords", DataType::LONG)]); + + let result = add_stats_parsed_to_add_schema(&add_schema, &stats_schema); + + // Should have 4 fields: path, stats, stats_parsed, tags + assert_eq!(result.fields().count(), 4); + + let field_names: Vec<&str> = result.fields().map(|f| f.name.as_str()).collect(); + assert_eq!(field_names, vec!["path", "stats", "stats_parsed", "tags"]); + } + + #[test] + fn test_build_add_output_schema_json_only() { + let config = StatsTransformConfig { + write_stats_as_json: true, + write_stats_as_struct: false, + }; + + let add_schema = StructType::new_unchecked([ + StructField::not_null("path", DataType::STRING), + StructField::nullable("stats", DataType::STRING), + ]); + + let stats_schema = StructType::new_unchecked([]); + + let result = build_add_output_schema(&config, &add_schema, &stats_schema); + + // Should have path and stats, no stats_parsed + let field_names: Vec<&str> = result.fields().map(|f| f.name.as_str()).collect(); + assert_eq!(field_names, vec!["path", "stats"]); + } + + #[test] + fn test_build_add_output_schema_struct_only() { + let config = StatsTransformConfig { + write_stats_as_json: false, + write_stats_as_struct: true, + }; + + let add_schema = StructType::new_unchecked([ + StructField::not_null("path", DataType::STRING), + StructField::nullable("stats", DataType::STRING), + ]); + + let stats_schema = + StructType::new_unchecked([StructField::nullable("numRecords", DataType::LONG)]); + + let result = build_add_output_schema(&config, &add_schema, &stats_schema); + + // Should have path and stats_parsed (stats dropped) + let field_names: Vec<&str> = result.fields().map(|f| f.name.as_str()).collect(); + assert_eq!(field_names, vec!["path", "stats_parsed"]); + } + + #[test] + fn test_build_add_output_schema_both() { + let config = StatsTransformConfig { + write_stats_as_json: true, + write_stats_as_struct: true, + }; + + let add_schema = StructType::new_unchecked([ + StructField::not_null("path", DataType::STRING), + StructField::nullable("stats", DataType::STRING), + ]); + + let stats_schema = + StructType::new_unchecked([StructField::nullable("numRecords", DataType::LONG)]); + + let result = build_add_output_schema(&config, &add_schema, &stats_schema); + + // Should have path, stats, and stats_parsed + let field_names: Vec<&str> = result.fields().map(|f| f.name.as_str()).collect(); + assert_eq!(field_names, vec!["path", "stats", "stats_parsed"]); + } +} diff --git a/kernel/src/checkpoint/tests.rs b/kernel/src/checkpoint/tests.rs index 3e0aac9b7c..89d74b1200 100644 --- a/kernel/src/checkpoint/tests.rs +++ b/kernel/src/checkpoint/tests.rs @@ -4,14 +4,13 @@ use crate::action_reconciliation::{ deleted_file_retention_timestamp_with_time, DEFAULT_RETENTION_SECS, }; use crate::actions::{Add, Metadata, Protocol, Remove}; -use crate::arrow::array::{ArrayRef, StructArray}; use crate::arrow::datatypes::{DataType, Schema}; use crate::arrow::{ array::{create_array, RecordBatch}, datatypes::Field, }; -use crate::checkpoint::create_last_checkpoint_data; -use crate::engine::arrow_data::ArrowEngineData; +use crate::checkpoint::{create_last_checkpoint_data, CheckpointDataIterator}; +use 
crate::engine::arrow_data::{ArrowEngineData, EngineDataArrowExt}; use crate::engine::default::executor::tokio::TokioMultiThreadExecutor; use crate::engine::default::DefaultEngineBuilder; use crate::log_replay::HasSelectionVector; @@ -59,11 +58,13 @@ fn test_deleted_file_retention_timestamp() -> DeltaResult<()> { #[tokio::test] async fn test_create_checkpoint_metadata_batch() -> DeltaResult<()> { + use crate::checkpoint::CHECKPOINT_ACTIONS_SCHEMA_V2; + let (store, _) = new_in_memory_store(); let engine = DefaultEngineBuilder::new(store.clone()).build(); // 1st commit (version 0) - metadata and protocol actions - // Protocol action does not include the v2Checkpoint reader/writer feature. + // Protocol action includes the v2Checkpoint reader/writer feature. write_commit_to_store( &store, vec![ @@ -78,32 +79,35 @@ async fn test_create_checkpoint_metadata_batch() -> DeltaResult<()> { let snapshot = Snapshot::builder_for(table_root).build(&engine)?; let writer = snapshot.create_checkpoint_writer()?; - let checkpoint_batch = writer.create_checkpoint_metadata_batch(&engine)?; + // Use V2 schema for the checkpoint metadata batch + let checkpoint_batch = + writer.create_checkpoint_metadata_batch(&engine, &CHECKPOINT_ACTIONS_SCHEMA_V2)?; assert!(checkpoint_batch.filtered_data.has_selected_rows()); - // Verify the underlying EngineData contains the expected CheckpointMetadata action + // Verify the underlying EngineData contains the expected fields let (underlying_data, _) = checkpoint_batch.filtered_data.into_parts(); let arrow_engine_data = ArrowEngineData::try_from_engine_data(underlying_data)?; let record_batch = arrow_engine_data.record_batch(); - // Build the expected RecordBatch - // Note: The schema is a struct with a single field "checkpointMetadata" of type struct - // containing a single field "version" of type long - let expected_schema = Arc::new(Schema::new(vec![Field::new( - "checkpointMetadata", - DataType::Struct(vec![Field::new("version", DataType::Int64, false)].into()), - true, - )])); - let expected = RecordBatch::try_new( - expected_schema, - vec![Arc::new(StructArray::from(vec![( - Arc::new(Field::new("version", DataType::Int64, false)), - create_array!(Int64, [0]) as ArrayRef, - )]))], - ) - .unwrap(); + // Verify the schema has the expected fields + let schema = record_batch.schema(); + assert!( + schema.field_with_name("checkpointMetadata").is_ok(), + "Schema should have checkpointMetadata field" + ); + assert!( + schema.field_with_name("add").is_ok(), + "Schema should have add field" + ); + assert!( + schema.field_with_name("remove").is_ok(), + "Schema should have remove field" + ); - assert_eq!(*record_batch, expected); + // Verify we have one row + assert_eq!(record_batch.num_rows(), 1); + + // Verify action counts assert_eq!(checkpoint_batch.actions_count, 1); assert_eq!(checkpoint_batch.add_actions_count, 0); @@ -215,7 +219,7 @@ fn create_metadata_action() -> Action { ) } -/// Create an Add action with the specified path +/// Create a simple Add action with the specified path (no stats) fn create_add_action(path: &str) -> Action { Action::Add(Add { path: path.into(), @@ -273,13 +277,18 @@ async fn test_v1_checkpoint_latest_version_by_default() -> DeltaResult<()> { let engine = DefaultEngineBuilder::new(store.clone()).build(); // 1st commit: adds `fake_path_1` - write_commit_to_store(&store, vec![create_add_action("fake_path_1")], 0).await?; + write_commit_to_store( + &store, + vec![create_add_action_with_stats("fake_path_1", 10)], + 0, + ) + .await?; // 2nd commit: adds 
`fake_path_2` & removes `fake_path_1` write_commit_to_store( &store, vec![ - create_add_action("fake_path_2"), + create_add_action_with_stats("fake_path_2", 20), create_remove_action("fake_path_1"), ], 1, @@ -305,7 +314,8 @@ async fn test_v1_checkpoint_latest_version_by_default() -> DeltaResult<()> { Url::parse("memory:///_delta_log/00000000000000000002.checkpoint.parquet")? ); - let mut data_iter = writer.checkpoint_data(&engine)?; + let result = writer.checkpoint_data(&engine)?; + let mut data_iter = result; // The first batch should be the metadata and protocol actions. let batch = data_iter.next().unwrap()?; assert_eq!(batch.selection_vector(), &[true, true]); @@ -357,8 +367,8 @@ async fn test_v1_checkpoint_specific_version() -> DeltaResult<()> { write_commit_to_store( &store, vec![ - create_add_action("file1.parquet"), - create_add_action("file2.parquet"), + create_add_action_with_stats("file1.parquet", 100), + create_add_action_with_stats("file2.parquet", 200), ], 1, ) @@ -377,7 +387,8 @@ async fn test_v1_checkpoint_specific_version() -> DeltaResult<()> { Url::parse("memory:///_delta_log/00000000000000000000.checkpoint.parquet")? ); - let mut data_iter = writer.checkpoint_data(&engine)?; + let result = writer.checkpoint_data(&engine)?; + let mut data_iter = result; // The first batch should be the metadata and protocol actions. let batch = data_iter.next().unwrap()?; assert_eq!(batch.selection_vector(), &[true, true]); @@ -455,7 +466,7 @@ async fn test_v2_checkpoint_supported_table() -> DeltaResult<()> { write_commit_to_store( &store, vec![ - create_add_action("fake_path_2"), + create_add_action_with_stats("fake_path_2", 50), create_remove_action("fake_path_1"), ], 0, @@ -484,7 +495,8 @@ async fn test_v2_checkpoint_supported_table() -> DeltaResult<()> { Url::parse("memory:///_delta_log/00000000000000000001.checkpoint.parquet")? ); - let mut data_iter = writer.checkpoint_data(&engine)?; + let result = writer.checkpoint_data(&engine)?; + let mut data_iter = result; // The first batch should be the metadata and protocol actions. let batch = data_iter.next().unwrap()?; assert_eq!(batch.selection_vector(), &[true, true]); @@ -520,6 +532,64 @@ async fn test_v2_checkpoint_supported_table() -> DeltaResult<()> { Ok(()) } +/// Test that V2 checkpoint batches all have the same schema. +/// +/// This verifies that the checkpoint metadata batch has the same schema as +/// regular action batches, allowing them to be written to the same Parquet file. 
+#[tokio::test] +async fn test_v2_checkpoint_unified_schema() -> DeltaResult<()> { + let (store, _) = new_in_memory_store(); + let engine = DefaultEngineBuilder::new(store.clone()).build(); + + // Create a V2 checkpoint enabled table + write_commit_to_store( + &store, + vec![ + create_v2_checkpoint_protocol_action(), + create_metadata_action(), + ], + 0, + ) + .await?; + + write_commit_to_store( + &store, + vec![create_add_action_with_stats("file1.parquet", 100)], + 1, + ) + .await?; + + let table_root = Url::parse("memory:///")?; + let snapshot = Snapshot::builder_for(table_root).build(&engine)?; + let writer = snapshot.create_checkpoint_writer()?; + let data_iter = writer.checkpoint_data(&engine)?; + + // Get the expected schema from the iterator + let expected_schema = data_iter.output_schema().clone(); + + // Verify all batches have the same schema + for batch_result in data_iter { + let batch = batch_result?; + let data = batch.apply_selection_vector()?; + let record_batch = data.try_into_record_batch()?; + let batch_schema = record_batch.schema(); + + assert_eq!( + batch_schema.fields().len(), + expected_schema.fields().count(), + "All batches should have the same number of fields" + ); + } + + // Verify the schema includes checkpointMetadata for V2 + assert!( + expected_schema.field("checkpointMetadata").is_some(), + "V2 checkpoint schema should include checkpointMetadata field" + ); + + Ok(()) +} + #[tokio::test] async fn test_no_checkpoint_staged_commits() -> DeltaResult<()> { let (store, _) = new_in_memory_store(); @@ -563,6 +633,43 @@ async fn test_no_checkpoint_staged_commits() -> DeltaResult<()> { Ok(()) } +/// Create a Metadata action with writeStatsAsStruct enabled +fn create_metadata_action_with_stats_struct() -> Action { + let mut config = HashMap::new(); + config.insert( + "delta.checkpoint.writeStatsAsStruct".to_string(), + "true".to_string(), + ); + Action::Metadata( + Metadata::try_new( + Some("test-table".into()), + None, + StructType::new_unchecked([ + StructField::nullable("id", KernelDataType::LONG), + StructField::nullable("name", KernelDataType::STRING), + ]), + vec![], + 0, + config, + ) + .unwrap(), + ) +} + +/// Create an Add action with JSON stats +fn create_add_action_with_stats(path: &str, num_records: i64) -> Action { + let stats = format!( + r#"{{"numRecords":{},"minValues":{{"id":1,"name":"alice"}},"maxValues":{{"id":100,"name":"zoe"}},"nullCount":{{"id":0,"name":5}}}}"#, + num_records + ); + Action::Add(Add { + path: path.into(), + data_change: true, + stats: Some(stats), + ..Default::default() + }) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_snapshot_checkpoint() -> DeltaResult<()> { let (store, _) = new_in_memory_store(); @@ -664,3 +771,342 @@ async fn test_snapshot_checkpoint() -> DeltaResult<()> { Ok(()) } + +/// Tests checkpoint_data with writeStatsAsStruct enabled. +/// Verifies that the output schema includes stats_parsed. 
+#[tokio::test] +async fn test_checkpoint_data_struct_enabled() -> DeltaResult<()> { + let (store, _) = new_in_memory_store(); + let engine = DefaultEngineBuilder::new(store.clone()).build(); + + // 1st commit: protocol + metadata with writeStatsAsStruct=true + write_commit_to_store( + &store, + vec![ + create_basic_protocol_action(), + create_metadata_action_with_stats_struct(), + ], + 0, + ) + .await?; + + // 2nd commit: add actions with JSON stats + write_commit_to_store( + &store, + vec![ + create_add_action_with_stats("file1.parquet", 100), + create_add_action_with_stats("file2.parquet", 200), + ], + 1, + ) + .await?; + + let table_root = Url::parse("memory:///")?; + let snapshot = Snapshot::builder_for(table_root).build(&engine)?; + let writer = snapshot.create_checkpoint_writer()?; + + // Call checkpoint_data + let result = writer.checkpoint_data(&engine)?; + + // Verify output schema includes stats_parsed in add action + let add_field = result + .output_schema() + .field("add") + .expect("output schema should have 'add' field"); + if let KernelDataType::Struct(add_struct) = add_field.data_type() { + assert!( + add_struct.field("stats_parsed").is_some(), + "Add action should have stats_parsed field in output schema" + ); + assert!( + add_struct.field("stats").is_some(), + "Add action should have stats field (writeStatsAsJson=true by default)" + ); + } else { + panic!("add field should be a struct"); + } + + // Consume the data iterator - transform is applied internally + let mut batch_count = 0; + for batch_result in result { + let _batch = batch_result?; + batch_count += 1; + } + assert!(batch_count > 0, "Should have at least one batch"); + + Ok(()) +} + +/// Tests checkpoint_data with default settings. +/// Defaults: writeStatsAsJson=true, writeStatsAsStruct=true +/// Verifies that the output schema includes both stats and stats_parsed. +#[tokio::test] +async fn test_checkpoint_data_default_settings() -> DeltaResult<()> { + let (store, _) = new_in_memory_store(); + let engine = DefaultEngineBuilder::new(store.clone()).build(); + + // 1st commit: protocol + metadata with default settings + write_commit_to_store( + &store, + vec![create_basic_protocol_action(), create_metadata_action()], + 0, + ) + .await?; + + // 2nd commit: add action with stats + write_commit_to_store( + &store, + vec![create_add_action_with_stats("file1.parquet", 100)], + 1, + ) + .await?; + + let table_root = Url::parse("memory:///")?; + let snapshot = Snapshot::builder_for(table_root).build(&engine)?; + let writer = snapshot.create_checkpoint_writer()?; + + // Call checkpoint_data + let result = writer.checkpoint_data(&engine)?; + + // Output schema should have both stats and stats_parsed (defaults: writeStatsAsJson=true, writeStatsAsStruct=true) + let add_field = result + .output_schema() + .field("add") + .expect("output schema should have 'add' field"); + if let KernelDataType::Struct(add_struct) = add_field.data_type() { + assert!( + add_struct.field("stats_parsed").is_some(), + "Add action should have stats_parsed (writeStatsAsStruct=true by default)" + ); + assert!( + add_struct.field("stats").is_some(), + "Add action should have stats field (writeStatsAsJson=true by default)" + ); + } else { + panic!("add field should be a struct"); + } + + Ok(()) +} + +/// Tests that checkpoint data can be iterated with stats transforms applied internally. 
+#[tokio::test] +async fn test_checkpoint_stats_iteration() -> DeltaResult<()> { + let (store, _) = new_in_memory_store(); + let engine = DefaultEngineBuilder::new(store.clone()).build(); + + // 1st commit: protocol + metadata with writeStatsAsStruct=true + write_commit_to_store( + &store, + vec![ + create_basic_protocol_action(), + create_metadata_action_with_stats_struct(), + ], + 0, + ) + .await?; + + // 2nd commit: add action with JSON stats + write_commit_to_store( + &store, + vec![create_add_action_with_stats("file1.parquet", 42)], + 1, + ) + .await?; + + let table_root = Url::parse("memory:///")?; + let snapshot = Snapshot::builder_for(table_root).build(&engine)?; + let writer = snapshot.create_checkpoint_writer()?; + + let result = writer.checkpoint_data(&engine)?; + + // Verify output schema has stats_parsed + let add_field = result + .output_schema() + .field("add") + .expect("output schema should have 'add' field"); + if let KernelDataType::Struct(add_struct) = add_field.data_type() { + assert!( + add_struct.field("stats_parsed").is_some(), + "Add action should have stats_parsed when writeStatsAsStruct=true" + ); + } + + // Consume the iterator to verify no errors during reading + // The transform is applied internally + let mut batch_count = 0; + for batch_result in result { + let _batch = batch_result?; + batch_count += 1; + } + assert!(batch_count > 0, "Should have at least one batch"); + + Ok(()) +} + +/// Helper to create metadata action with specific stats settings +fn create_metadata_with_stats_config( + write_stats_as_json: bool, + write_stats_as_struct: bool, +) -> Action { + let mut config = HashMap::new(); + config.insert( + "delta.checkpoint.writeStatsAsJson".to_string(), + write_stats_as_json.to_string(), + ); + config.insert( + "delta.checkpoint.writeStatsAsStruct".to_string(), + write_stats_as_struct.to_string(), + ); + Action::Metadata( + Metadata::try_new( + Some("test-table".into()), + None, + StructType::new_unchecked([ + StructField::nullable("id", KernelDataType::LONG), + StructField::nullable("name", KernelDataType::STRING), + ]), + vec![], + 0, + config, + ) + .unwrap(), + ) +} + +/// Verifies checkpoint schema has expected fields based on stats configuration. +fn verify_checkpoint_schema( + output_schema: &crate::schema::SchemaRef, + expect_stats: bool, + expect_stats_parsed: bool, +) -> DeltaResult<()> { + let add_field = output_schema + .field("add") + .expect("output schema should have 'add' field"); + + if let KernelDataType::Struct(add_struct) = add_field.data_type() { + let has_stats = add_struct.field("stats").is_some(); + let has_stats_parsed = add_struct.field("stats_parsed").is_some(); + + assert_eq!( + has_stats, expect_stats, + "stats field: expected={}, actual={}", + expect_stats, has_stats + ); + assert_eq!( + has_stats_parsed, expect_stats_parsed, + "stats_parsed field: expected={}, actual={}", + expect_stats_parsed, has_stats_parsed + ); + } else { + panic!("add field should be a struct"); + } + Ok(()) +} + +/// Tests all 16 combinations of writeStatsAsJson and writeStatsAsStruct settings. +/// +/// This test verifies: +/// 1. Checkpoint 1 schema matches the initial settings +/// 2. Checkpoint 2 schema matches the updated settings +/// 3. 
Stats can be recovered if they were preserved in checkpoint 1 +#[tokio::test] +async fn test_all_stats_config_combinations() -> DeltaResult<()> { + let test_cases: Vec<(bool, bool, bool, bool)> = vec![ + // (json1, struct1, json2, struct2) + (true, true, true, true), + (true, true, true, false), + (true, true, false, true), + (true, true, false, false), + (true, false, true, true), + (true, false, true, false), + (true, false, false, true), + (true, false, false, false), + (false, true, true, true), + (false, true, true, false), + (false, true, false, true), + (false, true, false, false), + (false, false, true, true), + (false, false, true, false), + (false, false, false, true), + (false, false, false, false), + ]; + + for (i, (json1, struct1, json2, struct2)) in test_cases.iter().enumerate() { + let (store, _) = new_in_memory_store(); + let engine = DefaultEngineBuilder::new(store.clone()).build(); + let table_root = Url::parse("memory:///")?; + + // Commit 0: protocol + metadata with initial settings + write_commit_to_store( + &store, + vec![ + create_basic_protocol_action(), + create_metadata_with_stats_config(*json1, *struct1), + ], + 0, + ) + .await?; + + // Commit 1: add action with stats + write_commit_to_store( + &store, + vec![create_add_action_with_stats("file1.parquet", 100)], + 1, + ) + .await?; + + // Create checkpoint 1 + let snapshot1 = Snapshot::builder_for(table_root.clone()).build(&engine)?; + let writer1 = snapshot1.create_checkpoint_writer()?; + let result1 = writer1.checkpoint_data(&engine)?; + + // Verify checkpoint 1 schema + verify_checkpoint_schema(result1.output_schema(), *json1, *struct1)?; + + // Consume checkpoint 1 data + for batch in result1 { + let _ = batch?; + } + + // Commit 2: update metadata with new settings + write_commit_to_store( + &store, + vec![create_metadata_with_stats_config(*json2, *struct2)], + 2, + ) + .await?; + + // Commit 3: add another file + write_commit_to_store( + &store, + vec![create_add_action_with_stats("file2.parquet", 200)], + 3, + ) + .await?; + + // Create checkpoint 2 + let snapshot2 = Snapshot::builder_for(table_root).build(&engine)?; + let writer2 = snapshot2.create_checkpoint_writer()?; + let result2 = writer2.checkpoint_data(&engine)?; + + // Verify checkpoint 2 schema + verify_checkpoint_schema(result2.output_schema(), *json2, *struct2)?; + + // Consume checkpoint 2 data (verifies transform doesn't error) + for batch in result2 { + let _ = batch?; + } + + println!( + "Case {}: json1={}, struct1={}, json2={}, struct2={} - PASS", + i + 1, + json1, + struct1, + json2, + struct2 + ); + } + + Ok(()) +} diff --git a/kernel/src/snapshot.rs b/kernel/src/snapshot.rs index 195b64f31d..1828c65b33 100644 --- a/kernel/src/snapshot.rs +++ b/kernel/src/snapshot.rs @@ -10,7 +10,7 @@ use crate::actions::domain_metadata::{ }; use crate::actions::set_transaction::SetTransactionScanner; use crate::actions::INTERNAL_DOMAIN_PREFIX; -use crate::checkpoint::CheckpointWriter; +use crate::checkpoint::{CheckpointDataIterator, CheckpointWriter}; use crate::committer::Committer; use crate::listed_log_files::{ListedLogFiles, ListedLogFilesBuilder}; use crate::log_segment::LogSegment;
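To make the stats reconciliation concrete, here is a small self-contained sketch (plain Rust, not kernel code) of the per-row behavior that the COALESCE expressions and output schema above express. The stats struct is reduced to a single `numRecords` field and the JSON parsing helper is illustrative only.

// Simplified model of one Add row's two stats columns.
struct AddStats {
    stats: Option<String>,     // JSON form, present in commits
    stats_parsed: Option<i64>, // struct form (reduced to numRecords), present in some checkpoints
}

// Mirrors: stats = COALESCE(stats, ToJson(stats_parsed)),
//          stats_parsed = COALESCE(stats_parsed, ParseJson(stats)),
// followed by dropping whichever field the table properties disable.
fn reconcile(row: &AddStats, write_json: bool, write_struct: bool) -> (Option<String>, Option<i64>) {
    let json = row
        .stats
        .clone()
        .or_else(|| row.stats_parsed.map(|n| format!(r#"{{"numRecords":{n}}}"#)));
    let parsed = row
        .stats_parsed
        .or_else(|| row.stats.as_deref().and_then(parse_num_records));
    (
        if write_json { json } else { None },
        if write_struct { parsed } else { None },
    )
}

// Naive JSON probe, just enough for the sketch.
fn parse_num_records(json: &str) -> Option<i64> {
    let key = "\"numRecords\":";
    let rest = &json[json.find(key)? + key.len()..];
    let end = rest
        .find(|c: char| !c.is_ascii_digit() && c != '-')
        .unwrap_or(rest.len());
    rest[..end].parse().ok()
}

fn main() {
    // A commit-sourced row carries only JSON stats; the transform back-fills stats_parsed.
    let row = AddStats { stats: Some(r#"{"numRecords":42}"#.into()), stats_parsed: None };
    assert_eq!(reconcile(&row, true, true), (Some(r#"{"numRecords":42}"#.into()), Some(42)));
    // With writeStatsAsJson=false, the JSON column is dropped but the struct form survives.
    assert_eq!(reconcile(&row, false, true), (None, Some(42)));
}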