Skip to content

Commit 9df9500

Browse files
committed
write-integration
1 parent eab701e commit 9df9500

File tree

4 files changed

+701
-88
lines changed

4 files changed

+701
-88
lines changed

kernel/examples/checkpoint-table/src/main.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,10 @@ use futures::future::{BoxFuture, FutureExt};
77
use parquet::arrow::async_writer::{AsyncFileWriter, ParquetObjectWriter};
88
use parquet::arrow::AsyncArrowWriter;
99

10+
use delta_kernel::checkpoint::CheckpointDataIterator;
1011
use delta_kernel::engine::arrow_data::EngineDataArrowExt;
1112
use delta_kernel::engine::default::DefaultEngineBuilder;
12-
use delta_kernel::{ActionReconciliationIterator, DeltaResult, Error, FileMeta, Snapshot};
13+
use delta_kernel::{DeltaResult, Error, FileMeta, Snapshot};
1314

1415
/// An example program that checkpoints a table.
1516
/// !!!WARNING!!!: This doesn't use put-if-absent, or a catalog based commit, so it is UNSAFE.
@@ -44,7 +45,7 @@ async fn main() -> ExitCode {
4445

4546
async fn write_data<W: AsyncFileWriter>(
4647
first_batch: &RecordBatch,
47-
batch_iter: &mut ActionReconciliationIterator,
48+
batch_iter: &mut impl CheckpointDataIterator,
4849
parquet_writer: &mut AsyncArrowWriter<W>,
4950
) -> DeltaResult<()> {
5051
parquet_writer.write(first_batch).await?;

kernel/src/checkpoint/mod.rs

Lines changed: 222 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
//! ## Architecture
1717
//!
1818
//! - [`CheckpointWriter`] - Core component that manages the checkpoint creation workflow
19-
//! - [`ActionReconciliationIterator`] - Iterator over the checkpoint data to be written
19+
//! - [`CheckpointDataIterator`] - Trait for iterators over checkpoint data to be written
2020
//!
2121
//! ## Usage
2222
//!
@@ -33,14 +33,15 @@
3333
//! # use std::sync::Arc;
3434
//! # use delta_kernel::ActionReconciliationIterator;
3535
//! # use delta_kernel::checkpoint::CheckpointWriter;
36+
//! # use delta_kernel::checkpoint::CheckpointDataIterator;
3637
//! # use delta_kernel::Engine;
3738
//! # use delta_kernel::Snapshot;
3839
//! # use delta_kernel::SnapshotRef;
3940
//! # use delta_kernel::DeltaResult;
4041
//! # use delta_kernel::Error;
4142
//! # use delta_kernel::FileMeta;
4243
//! # use url::Url;
43-
//! fn write_checkpoint_file(path: Url, data: &ActionReconciliationIterator) -> DeltaResult<FileMeta> {
44+
//! fn write_checkpoint_file(path: Url, data: &impl CheckpointDataIterator) -> DeltaResult<FileMeta> {
4445
//! todo!() /* engine-specific logic to write data to object storage*/
4546
//! }
4647
//!
@@ -114,6 +115,11 @@ use url::Url;
114115

115116
mod stats_transform;
116117

118+
use stats_transform::{
119+
build_checkpoint_output_schema, build_checkpoint_read_schema_with_stats, build_stats_transform,
120+
StatsTransformConfig,
121+
};
122+
117123
#[cfg(test)]
118124
mod tests;
119125

@@ -131,8 +137,8 @@ static LAST_CHECKPOINT_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
131137
.into()
132138
});
133139

134-
/// Schema for extracting relevant actions from log files for checkpoint creation
135-
static CHECKPOINT_ACTIONS_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
140+
/// Schema for V1 checkpoints (without checkpointMetadata action)
141+
static CHECKPOINT_ACTIONS_SCHEMA_V1: LazyLock<SchemaRef> = LazyLock::new(|| {
136142
Arc::new(StructType::new_unchecked([
137143
StructField::nullable(ADD_NAME, Add::to_schema()),
138144
StructField::nullable(REMOVE_NAME, Remove::to_schema()),
@@ -143,16 +149,120 @@ static CHECKPOINT_ACTIONS_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
143149
]))
144150
});
145151

146-
// Schema of the [`CheckpointMetadata`] action that is included in V2 checkpoints
147-
// We cannot use `CheckpointMetadata::to_schema()` as it would include the 'tags' field which
148-
// we're not supporting yet due to the lack of map support TODO(#880).
149-
static CHECKPOINT_METADATA_ACTION_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
150-
Arc::new(StructType::new_unchecked([StructField::nullable(
152+
/// Schema for the checkpointMetadata field in V2 checkpoints.
153+
/// We cannot use `CheckpointMetadata::to_schema()` as it would include the 'tags' field which
154+
/// we're not supporting yet due to the lack of map support TODO(#880).
155+
fn checkpoint_metadata_field() -> StructField {
156+
StructField::nullable(
151157
CHECKPOINT_METADATA_NAME,
152158
DataType::struct_type_unchecked([StructField::not_null("version", DataType::LONG)]),
153-
)]))
159+
)
160+
}
161+
162+
/// Schema for V2 checkpoints (includes checkpointMetadata action)
163+
static CHECKPOINT_ACTIONS_SCHEMA_V2: LazyLock<SchemaRef> = LazyLock::new(|| {
164+
Arc::new(StructType::new_unchecked([
165+
StructField::nullable(ADD_NAME, Add::to_schema()),
166+
StructField::nullable(REMOVE_NAME, Remove::to_schema()),
167+
StructField::nullable(METADATA_NAME, Metadata::to_schema()),
168+
StructField::nullable(PROTOCOL_NAME, Protocol::to_schema()),
169+
StructField::nullable(SET_TRANSACTION_NAME, SetTransaction::to_schema()),
170+
StructField::nullable(SIDECAR_NAME, Sidecar::to_schema()),
171+
checkpoint_metadata_field(),
172+
]))
154173
});
155174

175+
/// Trait for iterators that yield checkpoint data batches.
176+
///
177+
/// This trait abstracts over checkpoint data iterators, allowing the concrete implementation
178+
/// to change without breaking the public API. Implementations yield [`FilteredEngineData`]
179+
/// batches that should be written to the checkpoint file.
180+
///
181+
/// # Yielded Data
182+
///
183+
/// All batches conform to [`output_schema()`][Self::output_schema], which is determined by:
184+
/// - **V1 checkpoints**: Schema based on [`CHECKPOINT_ACTIONS_SCHEMA_V1`] (add, remove, metadata,
185+
/// protocol, txn, sidecar)
186+
/// - **V2 checkpoints**: Schema based on [`CHECKPOINT_ACTIONS_SCHEMA_V2`] (same as V1 plus
187+
/// checkpointMetadata)
188+
///
189+
/// The `add.stats` and `add.stats_parsed` fields are included or excluded based on table
190+
/// properties (`delta.checkpoint.writeStatsAsJson` and `delta.checkpoint.writeStatsAsStruct`).
191+
///
192+
/// For V2 checkpoints, the final batch contains the checkpoint metadata action with all other
193+
/// action fields set to null.
194+
pub trait CheckpointDataIterator: Iterator<Item = DeltaResult<FilteredEngineData>> {
195+
/// Returns the schema for writing checkpoint data.
196+
///
197+
/// All batches from this iterator conform to this schema. The schema reflects:
198+
/// - V1 vs V2 checkpoint format (V2 includes `checkpointMetadata` field)
199+
/// - Stats configuration (`stats` and/or `stats_parsed` fields)
200+
fn output_schema(&self) -> &SchemaRef;
201+
202+
/// Returns the shared iterator state for tracking counts and exhaustion.
203+
///
204+
/// This state should be passed to [`CheckpointWriter::finalize`] after the iterator
205+
/// has been fully consumed.
206+
fn state(&self) -> Arc<ActionReconciliationIteratorState>;
207+
}
208+
209+
/// Iterator that applies stats transforms to checkpoint data batches.
210+
///
211+
/// This is the concrete implementation of [`CheckpointDataIterator`] that wraps an
212+
/// [`ActionReconciliationIterator`] and applies an expression evaluator to each batch
213+
/// to populate stats fields.
214+
///
215+
/// All batches (including the checkpoint metadata batch for V2 checkpoints) share the
216+
/// same schema and go through the same transform pipeline. The stats transform only
217+
/// operates on the `add` field, so other fields (including `checkpointMetadata`) pass
218+
/// through unchanged.
219+
pub struct TransformingCheckpointIterator {
220+
inner: ActionReconciliationIterator,
221+
evaluator: Arc<dyn crate::ExpressionEvaluator>,
222+
/// Schema for writing checkpoint data (includes/excludes stats fields based on config)
223+
output_schema: SchemaRef,
224+
}
225+
226+
impl TransformingCheckpointIterator {
227+
/// Creates a new transforming iterator.
228+
pub(crate) fn new(
229+
inner: ActionReconciliationIterator,
230+
evaluator: Arc<dyn crate::ExpressionEvaluator>,
231+
output_schema: SchemaRef,
232+
) -> Self {
233+
Self {
234+
inner,
235+
evaluator,
236+
output_schema,
237+
}
238+
}
239+
}
240+
241+
impl CheckpointDataIterator for TransformingCheckpointIterator {
242+
fn output_schema(&self) -> &SchemaRef {
243+
&self.output_schema
244+
}
245+
246+
fn state(&self) -> Arc<ActionReconciliationIteratorState> {
247+
self.inner.state()
248+
}
249+
}
250+
251+
impl Iterator for TransformingCheckpointIterator {
252+
type Item = DeltaResult<FilteredEngineData>;
253+
254+
fn next(&mut self) -> Option<Self::Item> {
255+
let batch = self.inner.next()?;
256+
257+
// Apply the transform to the batch
258+
Some(batch.and_then(|filtered_data| {
259+
let (engine_data, selection_vector) = filtered_data.into_parts();
260+
let transformed = self.evaluator.evaluate(engine_data.as_ref())?;
261+
FilteredEngineData::try_new(transformed, selection_vector)
262+
}))
263+
}
264+
}
265+
156266
/// Orchestrates the process of creating a checkpoint for a table.
157267
///
158268
/// The [`CheckpointWriter`] is the entry point for generating checkpoint data for a Delta table.
@@ -218,50 +328,91 @@ impl CheckpointWriter {
218328
}
219329
/// Returns the checkpoint data to be written to the checkpoint file.
220330
///
221-
/// This method reads the actions from the log segment and processes them
222-
/// to create the checkpoint data.
331+
/// This method reads actions from the log segment, processes them for checkpoint creation,
332+
/// and applies stats transforms based on table properties:
333+
/// - `delta.checkpoint.writeStatsAsJson` (default: true)
334+
/// - `delta.checkpoint.writeStatsAsStruct` (default: false)
223335
///
224-
/// # Parameters
225-
/// - `engine`: Implementation of [`Engine`] APIs.
336+
/// The returned iterator (implementing [`CheckpointDataIterator`]) yields batches with stats
337+
/// transforms already applied. Use [`CheckpointDataIterator::output_schema`] to get the
338+
/// schema for writing the checkpoint file.
226339
///
227-
/// # Returns: [`ActionReconciliationIterator`] containing the checkpoint data
228-
// This method is the core of the checkpoint generation process. It:
229-
// 1. Determines whether to write a V1 or V2 checkpoint based on the table's
230-
// `v2Checkpoints` feature support
231-
// 2. Reads actions from the log segment using the checkpoint read schema
232-
// 3. Filters and deduplicates actions for the checkpoint
233-
// 4. Chains the checkpoint metadata action if writing a V2 spec checkpoint
234-
// (i.e., if `v2Checkpoints` feature is supported by table)
235-
// 5. Generates the appropriate checkpoint path
236-
pub fn checkpoint_data(
237-
&self,
238-
engine: &dyn Engine,
239-
) -> DeltaResult<ActionReconciliationIterator> {
340+
/// # Engine Usage
341+
///
342+
/// ```ignore
343+
/// let mut checkpoint_data = writer.checkpoint_data(&engine)?;
344+
/// let output_schema = checkpoint_data.output_schema().clone();
345+
/// while let Some(batch) = checkpoint_data.next() {
346+
/// let data = batch?.apply_selection_vector()?;
347+
/// parquet_writer.write(&data, &output_schema).await?;
348+
/// }
349+
/// writer.finalize(&engine, &metadata, checkpoint_data)?;
350+
/// ```
351+
pub fn checkpoint_data(&self, engine: &dyn Engine) -> DeltaResult<impl CheckpointDataIterator> {
352+
let config = StatsTransformConfig::from_table_properties(self.snapshot.table_properties());
353+
354+
// Get stats schema from table configuration.
355+
// This already excludes partition columns and applies column mapping.
356+
let stats_schema = self
357+
.snapshot
358+
.table_configuration()
359+
.expected_stats_schema()?;
360+
361+
// Select schema based on V2 checkpoint support
240362
let is_v2_checkpoints_supported = self
241363
.snapshot
242364
.table_configuration()
243365
.is_feature_supported(&TableFeature::V2Checkpoint);
244366

245-
let actions = self.snapshot.log_segment().read_actions(
246-
engine,
247-
CHECKPOINT_ACTIONS_SCHEMA.clone(),
248-
None,
249-
)?;
367+
let base_schema = if is_v2_checkpoints_supported {
368+
&CHECKPOINT_ACTIONS_SCHEMA_V2
369+
} else {
370+
&CHECKPOINT_ACTIONS_SCHEMA_V1
371+
};
372+
373+
// Read schema includes stats_parsed so COALESCE expressions can operate on it.
374+
// For commits, stats_parsed will be read as nulls (column doesn't exist in source).
375+
let read_schema = build_checkpoint_read_schema_with_stats(base_schema, &stats_schema)?;
250376

251-
// Create iterator over actions for checkpoint data
377+
// Read actions from log segment
378+
let actions =
379+
self.snapshot
380+
.log_segment()
381+
.read_actions(engine, read_schema.clone(), None)?;
382+
383+
// Process actions through reconciliation
252384
let checkpoint_data = ActionReconciliationProcessor::new(
253385
self.deleted_file_retention_timestamp()?,
254386
self.get_transaction_expiration_timestamp()?,
255387
)
256388
.process_actions_iter(actions);
257389

258-
let checkpoint_metadata =
259-
is_v2_checkpoints_supported.then(|| self.create_checkpoint_metadata_batch(engine));
390+
// Build output schema based on stats config (determines which fields are included)
391+
let output_schema = build_checkpoint_output_schema(&config, base_schema, &stats_schema)?;
392+
393+
// Build transform expression and create expression evaluator
394+
let transform_expr = build_stats_transform(&config, stats_schema);
395+
let evaluator = engine.evaluation_handler().new_expression_evaluator(
396+
read_schema.clone(),
397+
transform_expr,
398+
output_schema.clone().into(),
399+
)?;
400+
401+
// For V2 checkpoints, chain the checkpoint metadata batch to the action stream.
402+
// The checkpoint metadata batch uses the read schema (with stats_parsed), so it can
403+
// go through the same stats transform pipeline.
404+
let checkpoint_metadata = is_v2_checkpoints_supported
405+
.then(|| self.create_checkpoint_metadata_batch(engine, &read_schema));
260406

261-
// Wrap the iterator to track action counts
262-
Ok(ActionReconciliationIterator::new(Box::new(
263-
checkpoint_data.chain(checkpoint_metadata),
264-
)))
407+
// Create action reconciliation iterator, chaining checkpoint metadata for V2
408+
let inner =
409+
ActionReconciliationIterator::new(Box::new(checkpoint_data.chain(checkpoint_metadata)));
410+
411+
Ok(TransformingCheckpointIterator::new(
412+
inner,
413+
evaluator,
414+
output_schema,
415+
))
265416
}
266417

267418
/// Finalizes checkpoint creation by saving metadata about the checkpoint.
@@ -330,23 +481,49 @@ impl CheckpointWriter {
330481
///
331482
/// # Implementation Details
332483
///
333-
/// The function creates a single-row [`EngineData`] batch containing only the
334-
/// version field of the [`CheckpointMetadata`] action. Future implementations will
335-
/// include the additional metadata field `tags` when map support is added.
484+
/// The function creates a single-row [`EngineData`] batch using the full V2 checkpoint
485+
/// schema, with all action fields (add, remove, etc.) set to null except for the
486+
/// `checkpointMetadata` field. This ensures the checkpoint metadata batch has the same
487+
/// schema as other action batches, allowing them to be written to the same Parquet file.
336488
///
337489
/// # Returns:
338490
/// A [`ActionReconciliationBatch`] batch including the single-row [`EngineData`] batch along with
339491
/// an accompanying selection vector with a single `true` value, indicating the action in
340492
/// batch should be included in the checkpoint.
493+
/// Creates the checkpoint metadata batch with the given schema.
494+
///
495+
/// The schema must be the read schema (with stats_parsed) so the batch can go through
496+
/// the same stats transform pipeline as regular action batches.
341497
fn create_checkpoint_metadata_batch(
342498
&self,
343499
engine: &dyn Engine,
500+
schema: &SchemaRef,
344501
) -> DeltaResult<ActionReconciliationBatch> {
345-
let checkpoint_metadata_batch = engine.evaluation_handler().create_one(
346-
CHECKPOINT_METADATA_ACTION_SCHEMA.clone(),
347-
&[Scalar::from(self.version)],
502+
use crate::expressions::{Expression, StructData, Transform};
503+
504+
// Start with an all-null row
505+
let null_row = engine.evaluation_handler().null_row(schema.clone())?;
506+
507+
// Build the checkpointMetadata struct value
508+
let checkpoint_metadata_value = Scalar::Struct(StructData::try_new(
509+
vec![StructField::not_null("version", DataType::LONG)],
510+
vec![Scalar::from(self.version)],
511+
)?);
512+
513+
// Use a Transform to set just the checkpointMetadata field, keeping others null
514+
let transform = Transform::new_top_level().with_replaced_field(
515+
CHECKPOINT_METADATA_NAME,
516+
Arc::new(Expression::literal(checkpoint_metadata_value)),
517+
);
518+
519+
let evaluator = engine.evaluation_handler().new_expression_evaluator(
520+
schema.clone(),
521+
Arc::new(Expression::transform(transform)),
522+
schema.clone().into(),
348523
)?;
349524

525+
let checkpoint_metadata_batch = evaluator.evaluate(null_row.as_ref())?;
526+
350527
let filtered_data = FilteredEngineData::with_all_rows_selected(checkpoint_metadata_batch);
351528

352529
Ok(ActionReconciliationBatch {

kernel/src/checkpoint/stats_transform.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
#![allow(dead_code)] // TODO: Remove when integrated in checkpoint_data()
21
//! Transforms for populating stats_parsed and stats fields in checkpoint data.
32
//!
43
//! When writing checkpoints, statistics can be stored in two formats:

0 commit comments

Comments
 (0)