5 changes: 3 additions & 2 deletions kernel/examples/checkpoint-table/src/main.rs
@@ -7,9 +7,10 @@ use futures::future::{BoxFuture, FutureExt};
use parquet::arrow::async_writer::{AsyncFileWriter, ParquetObjectWriter};
use parquet::arrow::AsyncArrowWriter;

use delta_kernel::checkpoint::CheckpointDataIterator;
use delta_kernel::engine::arrow_data::EngineDataArrowExt;
use delta_kernel::engine::default::DefaultEngineBuilder;
use delta_kernel::{ActionReconciliationIterator, DeltaResult, Error, FileMeta, Snapshot};
use delta_kernel::{DeltaResult, Error, FileMeta, Snapshot};

/// An example program that checkpoints a table.
/// !!!WARNING!!!: This doesn't use put-if-absent, or a catalog based commit, so it is UNSAFE.
@@ -44,7 +45,7 @@ async fn main() -> ExitCode {

async fn write_data<W: AsyncFileWriter>(
first_batch: &RecordBatch,
batch_iter: &mut ActionReconciliationIterator,
batch_iter: &mut impl CheckpointDataIterator,
parquet_writer: &mut AsyncArrowWriter<W>,
) -> DeltaResult<()> {
parquet_writer.write(first_batch).await?;
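Since `write_data` is now generic over the trait, callers no longer name the concrete iterator type. Below is a minimal, hypothetical sketch (not part of this PR) of another consumer written against the same bound; it assumes the selection vector returned by `FilteredEngineData::into_parts` is a `Vec<bool>`:

/// Hypothetical helper, not from this PR: counts the rows that would land in the
/// checkpoint. It compiles against any iterator satisfying the new trait bound,
/// which is the point of the signature change above.
fn count_selected_rows(batch_iter: &mut impl CheckpointDataIterator) -> DeltaResult<usize> {
    let mut rows = 0;
    for batch in batch_iter {
        // Each item is a DeltaResult<FilteredEngineData>; the selection vector
        // (assumed Vec<bool>) marks which rows belong in the checkpoint.
        let (_data, selection_vector) = batch?.into_parts();
        rows += selection_vector.iter().filter(|&&keep| keep).count();
    }
    Ok(rows)
}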
267 changes: 222 additions & 45 deletions kernel/src/checkpoint/mod.rs
@@ -16,7 +16,7 @@
//! ## Architecture
//!
//! - [`CheckpointWriter`] - Core component that manages the checkpoint creation workflow
//! - [`ActionReconciliationIterator`] - Iterator over the checkpoint data to be written
//! - [`CheckpointDataIterator`] - Trait for iterators over checkpoint data to be written
//!
//! ## Usage
//!
@@ -33,14 +33,15 @@
//! # use std::sync::Arc;
//! # use delta_kernel::ActionReconciliationIterator;
//! # use delta_kernel::checkpoint::CheckpointWriter;
//! # use delta_kernel::checkpoint::CheckpointDataIterator;
//! # use delta_kernel::Engine;
//! # use delta_kernel::Snapshot;
//! # use delta_kernel::SnapshotRef;
//! # use delta_kernel::DeltaResult;
//! # use delta_kernel::Error;
//! # use delta_kernel::FileMeta;
//! # use url::Url;
//! fn write_checkpoint_file(path: Url, data: &ActionReconciliationIterator) -> DeltaResult<FileMeta> {
//! fn write_checkpoint_file(path: Url, data: &impl CheckpointDataIterator) -> DeltaResult<FileMeta> {
//! todo!() /* engine-specific logic to write data to object storage*/
//! }
//!
@@ -112,6 +113,13 @@ use crate::{DeltaResult, Engine, EngineData, Error, EvaluationHandlerExtension,

use url::Url;

mod stats_transform;

use stats_transform::{
build_checkpoint_output_schema, build_checkpoint_read_schema_with_stats, build_stats_transform,
StatsTransformConfig,
};

#[cfg(test)]
mod tests;

@@ -129,8 +137,8 @@ static LAST_CHECKPOINT_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
.into()
});

/// Schema for extracting relevant actions from log files for checkpoint creation
static CHECKPOINT_ACTIONS_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
/// Schema for V1 checkpoints (without checkpointMetadata action)
static CHECKPOINT_ACTIONS_SCHEMA_V1: LazyLock<SchemaRef> = LazyLock::new(|| {
Arc::new(StructType::new_unchecked([
StructField::nullable(ADD_NAME, Add::to_schema()),
StructField::nullable(REMOVE_NAME, Remove::to_schema()),
@@ -141,16 +149,118 @@ static CHECKPOINT_ACTIONS_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
]))
});

// Schema of the [`CheckpointMetadata`] action that is included in V2 checkpoints
// We cannot use `CheckpointMetadata::to_schema()` as it would include the 'tags' field which
// we're not supporting yet due to the lack of map support TODO(#880).
static CHECKPOINT_METADATA_ACTION_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
Arc::new(StructType::new_unchecked([StructField::nullable(
/// Schema for the checkpointMetadata field in V2 checkpoints.
/// We cannot use `CheckpointMetadata::to_schema()` as it would include the 'tags' field which
/// we're not supporting yet due to the lack of map support TODO(#880).
fn checkpoint_metadata_field() -> StructField {
StructField::nullable(
CHECKPOINT_METADATA_NAME,
DataType::struct_type_unchecked([StructField::not_null("version", DataType::LONG)]),
)]))
)
}

/// Schema for V2 checkpoints (includes checkpointMetadata action)
static CHECKPOINT_ACTIONS_SCHEMA_V2: LazyLock<SchemaRef> = LazyLock::new(|| {
Arc::new(StructType::new_unchecked([
StructField::nullable(ADD_NAME, Add::to_schema()),
StructField::nullable(REMOVE_NAME, Remove::to_schema()),
StructField::nullable(METADATA_NAME, Metadata::to_schema()),
StructField::nullable(PROTOCOL_NAME, Protocol::to_schema()),
StructField::nullable(SET_TRANSACTION_NAME, SetTransaction::to_schema()),
StructField::nullable(SIDECAR_NAME, Sidecar::to_schema()),
checkpoint_metadata_field(),
]))
});

/// Trait for iterators that yield checkpoint data batches.
///
/// This trait abstracts over checkpoint data iterators, allowing the concrete implementation
/// to change without breaking the public API. Implementations yield [`FilteredEngineData`]
/// batches that should be written to the checkpoint file.
///
/// # Yielded Data
///
/// All batches conform to [`output_schema()`][Self::output_schema], which is determined by:
/// - **V1 checkpoints**: Schema includes add, remove, metadata, protocol, txn, sidecar
/// - **V2 checkpoints**: Same as V1 plus checkpointMetadata
///
/// The `add.stats` and `add.stats_parsed` fields are included or excluded based on table
/// properties (`delta.checkpoint.writeStatsAsJson` and `delta.checkpoint.writeStatsAsStruct`).
///
/// For V2 checkpoints, the final batch contains the checkpoint metadata action with all other
/// action fields set to null.
pub trait CheckpointDataIterator: Iterator<Item = DeltaResult<FilteredEngineData>> {
/// Returns the schema for writing checkpoint data.
///
/// All batches from this iterator conform to this schema. The schema reflects:
/// - V1 vs V2 checkpoint format (V2 includes `checkpointMetadata` field)
/// - Stats configuration (`stats` and/or `stats_parsed` fields)
fn output_schema(&self) -> &SchemaRef;

/// Returns the shared iterator state for tracking counts and exhaustion.
///
/// This state should be passed to [`CheckpointWriter::finalize`] after the iterator
/// has been fully consumed.
fn state(&self) -> Arc<ActionReconciliationIteratorState>;
}
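To make the trait contract concrete, here is a minimal consumer sketch. It uses only names from this module; the `write_batch` body is engine-specific and left as `todo!()`, following the convention of the usage example above:

fn write_batch(schema: &SchemaRef, batch: FilteredEngineData) -> DeltaResult<()> {
    todo!() /* engine-specific logic to write one batch to the checkpoint file */
}

fn drain_checkpoint(
    mut data: impl CheckpointDataIterator,
) -> DeltaResult<Arc<ActionReconciliationIteratorState>> {
    // The schema is fixed up front; every batch the iterator yields conforms to it.
    let schema = data.output_schema().clone();
    for batch in data.by_ref() {
        write_batch(&schema, batch?)?;
    }
    // The shared state is only meaningful once the iterator is exhausted; it is
    // what `CheckpointWriter::finalize` uses to validate full consumption.
    Ok(data.state())
}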

/// Iterator that applies stats transforms to checkpoint data batches.
///
/// This is the concrete implementation of [`CheckpointDataIterator`] that wraps an
/// [`ActionReconciliationIterator`] and applies an expression evaluator to each batch
/// to populate stats fields.
///
/// All batches (including the checkpoint metadata batch for V2 checkpoints) share the
/// same schema and go through the same transform pipeline. The stats transform only
/// operates on the `add` field, so other fields (including `checkpointMetadata`) pass
/// through unchanged.
pub struct TransformingCheckpointIterator {
inner: ActionReconciliationIterator,
evaluator: Arc<dyn crate::ExpressionEvaluator>,
/// Schema for writing checkpoint data (includes/excludes stats fields based on config)
output_schema: SchemaRef,
}

impl TransformingCheckpointIterator {
/// Creates a new transforming iterator.
pub(crate) fn new(
inner: ActionReconciliationIterator,
evaluator: Arc<dyn crate::ExpressionEvaluator>,
output_schema: SchemaRef,
) -> Self {
Self {
inner,
evaluator,
output_schema,
}
}
}

impl CheckpointDataIterator for TransformingCheckpointIterator {
fn output_schema(&self) -> &SchemaRef {
&self.output_schema
}

fn state(&self) -> Arc<ActionReconciliationIteratorState> {
self.inner.state()
}
}

impl Iterator for TransformingCheckpointIterator {
type Item = DeltaResult<FilteredEngineData>;

fn next(&mut self) -> Option<Self::Item> {
let batch = self.inner.next()?;

// Apply the transform to the batch
Some(batch.and_then(|filtered_data| {
let (engine_data, selection_vector) = filtered_data.into_parts();
let transformed = self.evaluator.evaluate(engine_data.as_ref())?;
FilteredEngineData::try_new(transformed, selection_vector)
}))
}
}

/// Orchestrates the process of creating a checkpoint for a table.
///
/// The [`CheckpointWriter`] is the entry point for generating checkpoint data for a Delta table.
@@ -216,50 +326,91 @@ impl CheckpointWriter {
}
/// Returns the checkpoint data to be written to the checkpoint file.
///
/// This method reads the actions from the log segment and processes them
/// to create the checkpoint data.
/// This method reads actions from the log segment, processes them for checkpoint creation,
/// and applies stats transforms based on table properties:
/// - `delta.checkpoint.writeStatsAsJson` (default: true)
/// - `delta.checkpoint.writeStatsAsStruct` (default: false)
///
/// # Parameters
/// - `engine`: Implementation of [`Engine`] APIs.
/// The returned iterator (implementing [`CheckpointDataIterator`]) yields batches with stats
/// transforms already applied. Use [`CheckpointDataIterator::output_schema`] to get the
/// schema for writing the checkpoint file.
///
/// # Returns: [`ActionReconciliationIterator`] containing the checkpoint data
// This method is the core of the checkpoint generation process. It:
// 1. Determines whether to write a V1 or V2 checkpoint based on the table's
// `v2Checkpoints` feature support
// 2. Reads actions from the log segment using the checkpoint read schema
// 3. Filters and deduplicates actions for the checkpoint
// 4. Chains the checkpoint metadata action if writing a V2 spec checkpoint
// (i.e., if `v2Checkpoints` feature is supported by table)
// 5. Generates the appropriate checkpoint path
pub fn checkpoint_data(
&self,
engine: &dyn Engine,
) -> DeltaResult<ActionReconciliationIterator> {
/// # Engine Usage
///
/// ```ignore
/// let mut checkpoint_data = writer.checkpoint_data(&engine)?;
/// let output_schema = checkpoint_data.output_schema().clone();
/// while let Some(batch) = checkpoint_data.next() {
/// let data = batch?.apply_selection_vector()?;
/// parquet_writer.write(&data, &output_schema).await?;
/// }
/// writer.finalize(&engine, &metadata, checkpoint_data)?;
/// ```
pub fn checkpoint_data(&self, engine: &dyn Engine) -> DeltaResult<impl CheckpointDataIterator> {
let config = StatsTransformConfig::from_table_properties(self.snapshot.table_properties());

// Get stats schema from table configuration.
// This already excludes partition columns and applies column mapping.
let stats_schema = self
.snapshot
.table_configuration()
.expected_stats_schema()?;

// Select schema based on V2 checkpoint support
let is_v2_checkpoints_supported = self
.snapshot
.table_configuration()
.is_feature_supported(&TableFeature::V2Checkpoint);

let actions = self.snapshot.log_segment().read_actions(
engine,
CHECKPOINT_ACTIONS_SCHEMA.clone(),
None,
)?;
let base_schema = if is_v2_checkpoints_supported {
&CHECKPOINT_ACTIONS_SCHEMA_V2
} else {
&CHECKPOINT_ACTIONS_SCHEMA_V1
};

// Read schema includes stats_parsed so COALESCE expressions can operate on it.
// For commits, stats_parsed will be read as nulls (column doesn't exist in source).
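// (Illustrative, assumed expression shape: the transform can then compute e.g.
//  stats_parsed := COALESCE(add.stats_parsed, <struct parsed from add.stats>),
//  which covers rows from a prior checkpoint and from JSON-only commits alike.)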
let read_schema = build_checkpoint_read_schema_with_stats(base_schema, &stats_schema)?;

// Create iterator over actions for checkpoint data
// Read actions from log segment
let actions =
self.snapshot
.log_segment()
.read_actions(engine, read_schema.clone(), None)?;

// Process actions through reconciliation
let checkpoint_data = ActionReconciliationProcessor::new(
self.deleted_file_retention_timestamp()?,
self.get_transaction_expiration_timestamp()?,
)
.process_actions_iter(actions);

let checkpoint_metadata =
is_v2_checkpoints_supported.then(|| self.create_checkpoint_metadata_batch(engine));
// Build output schema based on stats config (determines which fields are included)
let output_schema = build_checkpoint_output_schema(&config, base_schema, &stats_schema)?;

// Build transform expression and create expression evaluator
let transform_expr = build_stats_transform(&config, stats_schema);
let evaluator = engine.evaluation_handler().new_expression_evaluator(
read_schema.clone(),
transform_expr,
output_schema.clone().into(),
)?;

// For V2 checkpoints, chain the checkpoint metadata batch to the action stream.
// The checkpoint metadata batch uses the read schema (with stats_parsed), so it can
// go through the same stats transform pipeline.
let checkpoint_metadata = is_v2_checkpoints_supported
.then(|| self.create_checkpoint_metadata_batch(engine, &read_schema));

// Wrap the iterator to track action counts
Ok(ActionReconciliationIterator::new(Box::new(
checkpoint_data.chain(checkpoint_metadata),
)))
// Create action reconciliation iterator, chaining checkpoint metadata for V2
let inner =
ActionReconciliationIterator::new(Box::new(checkpoint_data.chain(checkpoint_metadata)));

Ok(TransformingCheckpointIterator::new(
inner,
evaluator,
output_schema,
))
}
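For clarity, the field selection driven by the two table properties can be sketched as a standalone helper (hypothetical; the flag semantics and the `stats`/`stats_parsed` field names come from the doc comment above):

/// Hypothetical sketch of the choice `build_checkpoint_output_schema` makes for
/// the `add` action's stats fields, assuming each property maps to one field:
fn selected_stats_fields(write_stats_as_json: bool, write_stats_as_struct: bool) -> Vec<&'static str> {
    let mut fields = Vec::new();
    if write_stats_as_json {
        fields.push("stats"); // JSON-serialized statistics (default: true, kept)
    }
    if write_stats_as_struct {
        fields.push("stats_parsed"); // statistics as a typed struct (default: false, dropped)
    }
    fields
}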

/// Finalizes checkpoint creation by saving metadata about the checkpoint.
@@ -328,23 +479,49 @@ impl CheckpointWriter {
///
/// # Implementation Details
///
/// The function creates a single-row [`EngineData`] batch containing only the
/// version field of the [`CheckpointMetadata`] action. Future implementations will
/// include the additional metadata field `tags` when map support is added.
/// The function creates a single-row [`EngineData`] batch using the full V2 checkpoint
/// schema, with all action fields (add, remove, etc.) set to null except for the
/// `checkpointMetadata` field. This ensures the checkpoint metadata batch has the same
/// schema as other action batches, allowing them to be written to the same Parquet file.
///
/// # Returns:
/// An [`ActionReconciliationBatch`] containing the single-row [`EngineData`] batch along with
/// an accompanying selection vector with a single `true` value, indicating that the action in the
/// batch should be included in the checkpoint.
/// Creates the checkpoint metadata batch with the given schema.
///
/// The schema must be the read schema (with stats_parsed) so the batch can go through
/// the same stats transform pipeline as regular action batches.
fn create_checkpoint_metadata_batch(
&self,
engine: &dyn Engine,
schema: &SchemaRef,
) -> DeltaResult<ActionReconciliationBatch> {
let checkpoint_metadata_batch = engine.evaluation_handler().create_one(
CHECKPOINT_METADATA_ACTION_SCHEMA.clone(),
&[Scalar::from(self.version)],
use crate::expressions::{Expression, StructData, Transform};

// Start with an all-null row
let null_row = engine.evaluation_handler().null_row(schema.clone())?;

// Build the checkpointMetadata struct value
let checkpoint_metadata_value = Scalar::Struct(StructData::try_new(
vec![StructField::not_null("version", DataType::LONG)],
vec![Scalar::from(self.version)],
)?);

// Use a Transform to set just the checkpointMetadata field, keeping others null
let transform = Transform::new_top_level().with_replaced_field(
CHECKPOINT_METADATA_NAME,
Arc::new(Expression::literal(checkpoint_metadata_value)),
);

let evaluator = engine.evaluation_handler().new_expression_evaluator(
schema.clone(),
Arc::new(Expression::transform(transform)),
schema.clone().into(),
)?;

let checkpoint_metadata_batch = evaluator.evaluate(null_row.as_ref())?;

let filtered_data = FilteredEngineData::with_all_rows_selected(checkpoint_metadata_batch);

Ok(ActionReconciliationBatch {