Skip to content

Commit 03a8e93

Browse files
committed
write-integration
1 parent 4b77bb4 commit 03a8e93

File tree

5 files changed

+703
-82
lines changed

5 files changed

+703
-82
lines changed

kernel/examples/checkpoint-table/src/main.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,10 @@ use futures::future::{BoxFuture, FutureExt};
77
use parquet::arrow::async_writer::{AsyncFileWriter, ParquetObjectWriter};
88
use parquet::arrow::AsyncArrowWriter;
99

10+
use delta_kernel::checkpoint::CheckpointDataIterator;
1011
use delta_kernel::engine::arrow_data::EngineDataArrowExt;
1112
use delta_kernel::engine::default::DefaultEngineBuilder;
12-
use delta_kernel::{ActionReconciliationIterator, DeltaResult, Error, FileMeta, Snapshot};
13+
use delta_kernel::{DeltaResult, Error, FileMeta, Snapshot};
1314

1415
/// An example program that checkpoints a table.
1516
/// !!!WARNING!!!: This doesn't use put-if-absent, or a catalog based commit, so it is UNSAFE.
@@ -44,7 +45,7 @@ async fn main() -> ExitCode {
4445

4546
async fn write_data<W: AsyncFileWriter>(
4647
first_batch: &RecordBatch,
47-
batch_iter: &mut ActionReconciliationIterator,
48+
batch_iter: &mut impl CheckpointDataIterator,
4849
parquet_writer: &mut AsyncArrowWriter<W>,
4950
) -> DeltaResult<()> {
5051
parquet_writer.write(first_batch).await?;

kernel/src/checkpoint/mod.rs

Lines changed: 220 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
//! ## Architecture
1717
//!
1818
//! - [`CheckpointWriter`] - Core component that manages the checkpoint creation workflow
19-
//! - [`ActionReconciliationIterator`] - Iterator over the checkpoint data to be written
19+
//! - [`CheckpointDataIterator`] - Trait for iterators over checkpoint data to be written
2020
//!
2121
//! ## Usage
2222
//!
@@ -33,14 +33,15 @@
3333
//! # use std::sync::Arc;
3434
//! # use delta_kernel::ActionReconciliationIterator;
3535
//! # use delta_kernel::checkpoint::CheckpointWriter;
36+
//! # use delta_kernel::checkpoint::CheckpointDataIterator;
3637
//! # use delta_kernel::Engine;
3738
//! # use delta_kernel::Snapshot;
3839
//! # use delta_kernel::SnapshotRef;
3940
//! # use delta_kernel::DeltaResult;
4041
//! # use delta_kernel::Error;
4142
//! # use delta_kernel::FileMeta;
4243
//! # use url::Url;
43-
//! fn write_checkpoint_file(path: Url, data: &ActionReconciliationIterator) -> DeltaResult<FileMeta> {
44+
//! fn write_checkpoint_file(path: Url, data: &impl CheckpointDataIterator) -> DeltaResult<FileMeta> {
4445
//! todo!() /* engine-specific logic to write data to object storage*/
4546
//! }
4647
//!
@@ -114,6 +115,11 @@ use url::Url;
114115

115116
mod stats_transform;
116117

118+
use stats_transform::{
119+
build_checkpoint_output_schema, build_checkpoint_read_schema_with_stats, build_stats_transform,
120+
StatsTransformConfig,
121+
};
122+
117123
#[cfg(test)]
118124
mod tests;
119125

@@ -131,8 +137,8 @@ static LAST_CHECKPOINT_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
131137
.into()
132138
});
133139

134-
/// Schema for extracting relevant actions from log files for checkpoint creation
135-
static CHECKPOINT_ACTIONS_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
140+
/// Schema for V1 checkpoints (without checkpointMetadata action)
141+
static CHECKPOINT_ACTIONS_SCHEMA_V1: LazyLock<SchemaRef> = LazyLock::new(|| {
136142
Arc::new(StructType::new_unchecked([
137143
StructField::nullable(ADD_NAME, Add::to_schema()),
138144
StructField::nullable(REMOVE_NAME, Remove::to_schema()),
@@ -143,16 +149,118 @@ static CHECKPOINT_ACTIONS_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
143149
]))
144150
});
145151

146-
// Schema of the [`CheckpointMetadata`] action that is included in V2 checkpoints
147-
// We cannot use `CheckpointMetadata::to_schema()` as it would include the 'tags' field which
148-
// we're not supporting yet due to the lack of map support TODO(#880).
149-
static CHECKPOINT_METADATA_ACTION_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
150-
Arc::new(StructType::new_unchecked([StructField::nullable(
152+
/// Schema for the checkpointMetadata field in V2 checkpoints.
153+
/// We cannot use `CheckpointMetadata::to_schema()` as it would include the 'tags' field which
154+
/// we're not supporting yet due to the lack of map support TODO(#880).
155+
fn checkpoint_metadata_field() -> StructField {
156+
StructField::nullable(
151157
CHECKPOINT_METADATA_NAME,
152158
DataType::struct_type_unchecked([StructField::not_null("version", DataType::LONG)]),
153-
)]))
159+
)
160+
}
161+
162+
/// Schema for V2 checkpoints (includes checkpointMetadata action)
163+
static CHECKPOINT_ACTIONS_SCHEMA_V2: LazyLock<SchemaRef> = LazyLock::new(|| {
164+
Arc::new(StructType::new_unchecked([
165+
StructField::nullable(ADD_NAME, Add::to_schema()),
166+
StructField::nullable(REMOVE_NAME, Remove::to_schema()),
167+
StructField::nullable(METADATA_NAME, Metadata::to_schema()),
168+
StructField::nullable(PROTOCOL_NAME, Protocol::to_schema()),
169+
StructField::nullable(SET_TRANSACTION_NAME, SetTransaction::to_schema()),
170+
StructField::nullable(SIDECAR_NAME, Sidecar::to_schema()),
171+
checkpoint_metadata_field(),
172+
]))
154173
});
155174

175+
/// Trait for iterators that yield checkpoint data batches.
176+
///
177+
/// This trait abstracts over checkpoint data iterators, allowing the concrete implementation
178+
/// to change without breaking the public API. Implementations yield [`FilteredEngineData`]
179+
/// batches that should be written to the checkpoint file.
180+
///
181+
/// # Yielded Data
182+
///
183+
/// All batches conform to [`output_schema()`][Self::output_schema], which is determined by:
184+
/// - **V1 checkpoints**: Schema includes add, remove, metadata, protocol, txn, sidecar
185+
/// - **V2 checkpoints**: Same as V1 plus checkpointMetadata
186+
///
187+
/// The `add.stats` and `add.stats_parsed` fields are included or excluded based on table
188+
/// properties (`delta.checkpoint.writeStatsAsJson` and `delta.checkpoint.writeStatsAsStruct`).
189+
///
190+
/// For V2 checkpoints, the final batch contains the checkpoint metadata action with all other
191+
/// action fields set to null.
192+
pub trait CheckpointDataIterator: Iterator<Item = DeltaResult<FilteredEngineData>> {
193+
/// Returns the schema for writing checkpoint data.
194+
///
195+
/// All batches from this iterator conform to this schema. The schema reflects:
196+
/// - V1 vs V2 checkpoint format (V2 includes `checkpointMetadata` field)
197+
/// - Stats configuration (`stats` and/or `stats_parsed` fields)
198+
fn output_schema(&self) -> &SchemaRef;
199+
200+
/// Returns the shared iterator state for tracking counts and exhaustion.
201+
///
202+
/// This state should be passed to [`CheckpointWriter::finalize`] after the iterator
203+
/// has been fully consumed.
204+
fn state(&self) -> Arc<ActionReconciliationIteratorState>;
205+
}
206+
207+
/// Iterator that applies stats transforms to checkpoint data batches.
208+
///
209+
/// This is the concrete implementation of [`CheckpointDataIterator`] that wraps an
210+
/// [`ActionReconciliationIterator`] and applies an expression evaluator to each batch
211+
/// to populate stats fields.
212+
///
213+
/// All batches (including the checkpoint metadata batch for V2 checkpoints) share the
214+
/// same schema and go through the same transform pipeline. The stats transform only
215+
/// operates on the `add` field, so other fields (including `checkpointMetadata`) pass
216+
/// through unchanged.
217+
pub struct TransformingCheckpointIterator {
218+
inner: ActionReconciliationIterator,
219+
evaluator: Arc<dyn crate::ExpressionEvaluator>,
220+
/// Schema for writing checkpoint data (includes/excludes stats fields based on config)
221+
output_schema: SchemaRef,
222+
}
223+
224+
impl TransformingCheckpointIterator {
225+
/// Creates a new transforming iterator.
226+
pub(crate) fn new(
227+
inner: ActionReconciliationIterator,
228+
evaluator: Arc<dyn crate::ExpressionEvaluator>,
229+
output_schema: SchemaRef,
230+
) -> Self {
231+
Self {
232+
inner,
233+
evaluator,
234+
output_schema,
235+
}
236+
}
237+
}
238+
239+
impl CheckpointDataIterator for TransformingCheckpointIterator {
240+
fn output_schema(&self) -> &SchemaRef {
241+
&self.output_schema
242+
}
243+
244+
fn state(&self) -> Arc<ActionReconciliationIteratorState> {
245+
self.inner.state()
246+
}
247+
}
248+
249+
impl Iterator for TransformingCheckpointIterator {
250+
type Item = DeltaResult<FilteredEngineData>;
251+
252+
fn next(&mut self) -> Option<Self::Item> {
253+
let batch = self.inner.next()?;
254+
255+
// Apply the transform to the batch
256+
Some(batch.and_then(|filtered_data| {
257+
let (engine_data, selection_vector) = filtered_data.into_parts();
258+
let transformed = self.evaluator.evaluate(engine_data.as_ref())?;
259+
FilteredEngineData::try_new(transformed, selection_vector)
260+
}))
261+
}
262+
}
263+
156264
/// Orchestrates the process of creating a checkpoint for a table.
157265
///
158266
/// The [`CheckpointWriter`] is the entry point for generating checkpoint data for a Delta table.
@@ -218,50 +326,91 @@ impl CheckpointWriter {
218326
}
219327
/// Returns the checkpoint data to be written to the checkpoint file.
220328
///
221-
/// This method reads the actions from the log segment and processes them
222-
/// to create the checkpoint data.
329+
/// This method reads actions from the log segment, processes them for checkpoint creation,
330+
/// and applies stats transforms based on table properties:
331+
/// - `delta.checkpoint.writeStatsAsJson` (default: true)
332+
/// - `delta.checkpoint.writeStatsAsStruct` (default: false)
223333
///
224-
/// # Parameters
225-
/// - `engine`: Implementation of [`Engine`] APIs.
334+
/// The returned iterator (implementing [`CheckpointDataIterator`]) yields batches with stats
335+
/// transforms already applied. Use [`CheckpointDataIterator::output_schema`] to get the
336+
/// schema for writing the checkpoint file.
226337
///
227-
/// # Returns: [`ActionReconciliationIterator`] containing the checkpoint data
228-
// This method is the core of the checkpoint generation process. It:
229-
// 1. Determines whether to write a V1 or V2 checkpoint based on the table's
230-
// `v2Checkpoints` feature support
231-
// 2. Reads actions from the log segment using the checkpoint read schema
232-
// 3. Filters and deduplicates actions for the checkpoint
233-
// 4. Chains the checkpoint metadata action if writing a V2 spec checkpoint
234-
// (i.e., if `v2Checkpoints` feature is supported by table)
235-
// 5. Generates the appropriate checkpoint path
236-
pub fn checkpoint_data(
237-
&self,
238-
engine: &dyn Engine,
239-
) -> DeltaResult<ActionReconciliationIterator> {
338+
/// # Engine Usage
339+
///
340+
/// ```ignore
341+
/// let mut checkpoint_data = writer.checkpoint_data(&engine)?;
342+
/// let output_schema = checkpoint_data.output_schema().clone();
343+
/// while let Some(batch) = checkpoint_data.next() {
344+
/// let data = batch?.apply_selection_vector()?;
345+
/// parquet_writer.write(&data, &output_schema).await?;
346+
/// }
347+
/// writer.finalize(&engine, &metadata, checkpoint_data)?;
348+
/// ```
349+
pub fn checkpoint_data(&self, engine: &dyn Engine) -> DeltaResult<impl CheckpointDataIterator> {
350+
let config = StatsTransformConfig::from_table_properties(self.snapshot.table_properties());
351+
352+
// Get stats schema from table configuration.
353+
// This already excludes partition columns and applies column mapping.
354+
let stats_schema = self
355+
.snapshot
356+
.table_configuration()
357+
.expected_stats_schema()?;
358+
359+
// Select schema based on V2 checkpoint support
240360
let is_v2_checkpoints_supported = self
241361
.snapshot
242362
.table_configuration()
243363
.is_feature_supported(&TableFeature::V2Checkpoint);
244364

245-
let actions = self.snapshot.log_segment().read_actions(
246-
engine,
247-
CHECKPOINT_ACTIONS_SCHEMA.clone(),
248-
None,
249-
)?;
365+
let base_schema = if is_v2_checkpoints_supported {
366+
&CHECKPOINT_ACTIONS_SCHEMA_V2
367+
} else {
368+
&CHECKPOINT_ACTIONS_SCHEMA_V1
369+
};
370+
371+
// Read schema includes stats_parsed so COALESCE expressions can operate on it.
372+
// For commits, stats_parsed will be read as nulls (column doesn't exist in source).
373+
let read_schema = build_checkpoint_read_schema_with_stats(base_schema, &stats_schema)?;
250374

251-
// Create iterator over actions for checkpoint data
375+
// Read actions from log segment
376+
let actions =
377+
self.snapshot
378+
.log_segment()
379+
.read_actions(engine, read_schema.clone(), None)?;
380+
381+
// Process actions through reconciliation
252382
let checkpoint_data = ActionReconciliationProcessor::new(
253383
self.deleted_file_retention_timestamp()?,
254384
self.get_transaction_expiration_timestamp()?,
255385
)
256386
.process_actions_iter(actions);
257387

258-
let checkpoint_metadata =
259-
is_v2_checkpoints_supported.then(|| self.create_checkpoint_metadata_batch(engine));
388+
// Build output schema based on stats config (determines which fields are included)
389+
let output_schema = build_checkpoint_output_schema(&config, base_schema, &stats_schema)?;
390+
391+
// Build transform expression and create expression evaluator
392+
let transform_expr = build_stats_transform(&config, stats_schema);
393+
let evaluator = engine.evaluation_handler().new_expression_evaluator(
394+
read_schema.clone(),
395+
transform_expr,
396+
output_schema.clone().into(),
397+
)?;
398+
399+
// For V2 checkpoints, chain the checkpoint metadata batch to the action stream.
400+
// The checkpoint metadata batch uses the read schema (with stats_parsed), so it can
401+
// go through the same stats transform pipeline.
402+
let checkpoint_metadata = is_v2_checkpoints_supported
403+
.then(|| self.create_checkpoint_metadata_batch(engine, &read_schema));
260404

261-
// Wrap the iterator to track action counts
262-
Ok(ActionReconciliationIterator::new(Box::new(
263-
checkpoint_data.chain(checkpoint_metadata),
264-
)))
405+
// Create action reconciliation iterator, chaining checkpoint metadata for V2
406+
let inner =
407+
ActionReconciliationIterator::new(Box::new(checkpoint_data.chain(checkpoint_metadata)));
408+
409+
Ok(TransformingCheckpointIterator::new(
410+
inner,
411+
evaluator,
412+
output_schema,
413+
))
265414
}
266415

267416
/// Finalizes checkpoint creation by saving metadata about the checkpoint.
@@ -330,23 +479,49 @@ impl CheckpointWriter {
330479
///
331480
/// # Implementation Details
332481
///
333-
/// The function creates a single-row [`EngineData`] batch containing only the
334-
/// version field of the [`CheckpointMetadata`] action. Future implementations will
335-
/// include the additional metadata field `tags` when map support is added.
482+
/// The function creates a single-row [`EngineData`] batch using the full V2 checkpoint
483+
/// schema, with all action fields (add, remove, etc.) set to null except for the
484+
/// `checkpointMetadata` field. This ensures the checkpoint metadata batch has the same
485+
/// schema as other action batches, allowing them to be written to the same Parquet file.
336486
///
337487
/// # Returns:
338488
/// A [`ActionReconciliationBatch`] batch including the single-row [`EngineData`] batch along with
339489
/// an accompanying selection vector with a single `true` value, indicating the action in
340490
/// batch should be included in the checkpoint.
491+
/// Creates the checkpoint metadata batch with the given schema.
492+
///
493+
/// The schema must be the read schema (with stats_parsed) so the batch can go through
494+
/// the same stats transform pipeline as regular action batches.
341495
fn create_checkpoint_metadata_batch(
342496
&self,
343497
engine: &dyn Engine,
498+
schema: &SchemaRef,
344499
) -> DeltaResult<ActionReconciliationBatch> {
345-
let checkpoint_metadata_batch = engine.evaluation_handler().create_one(
346-
CHECKPOINT_METADATA_ACTION_SCHEMA.clone(),
347-
&[Scalar::from(self.version)],
500+
use crate::expressions::{Expression, StructData, Transform};
501+
502+
// Start with an all-null row
503+
let null_row = engine.evaluation_handler().null_row(schema.clone())?;
504+
505+
// Build the checkpointMetadata struct value
506+
let checkpoint_metadata_value = Scalar::Struct(StructData::try_new(
507+
vec![StructField::not_null("version", DataType::LONG)],
508+
vec![Scalar::from(self.version)],
509+
)?);
510+
511+
// Use a Transform to set just the checkpointMetadata field, keeping others null
512+
let transform = Transform::new_top_level().with_replaced_field(
513+
CHECKPOINT_METADATA_NAME,
514+
Arc::new(Expression::literal(checkpoint_metadata_value)),
515+
);
516+
517+
let evaluator = engine.evaluation_handler().new_expression_evaluator(
518+
schema.clone(),
519+
Arc::new(Expression::transform(transform)),
520+
schema.clone().into(),
348521
)?;
349522

523+
let checkpoint_metadata_batch = evaluator.evaluate(null_row.as_ref())?;
524+
350525
let filtered_data = FilteredEngineData::with_all_rows_selected(checkpoint_metadata_batch);
351526

352527
Ok(ActionReconciliationBatch {

kernel/src/checkpoint/stats_transform.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
#![allow(dead_code)] // TODO: Remove when integrated in checkpoint_data()
2-
//! Transforms for populating `stats_parsed` and `stats` fields on the `Add` action in checkpoint data.
1+
//! Transforms for populating stats_parsed and stats fields in checkpoint data.
32
//!
43
//! This module ensures that Add actions in checkpoints have the correct statistics format
54
//! based on the table configuration. Statistics can be stored in two formats as fields on

0 commit comments

Comments
 (0)