Skip to content

Commit f767ced

Browse files
committed
done
1 parent af4261b commit f767ced

File tree

4 files changed

+713
-89
lines changed

4 files changed

+713
-89
lines changed

kernel/examples/checkpoint-table/src/main.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,10 @@ use futures::future::{BoxFuture, FutureExt};
77
use parquet::arrow::async_writer::{AsyncFileWriter, ParquetObjectWriter};
88
use parquet::arrow::AsyncArrowWriter;
99

10+
use delta_kernel::checkpoint::CheckpointDataIterator;
1011
use delta_kernel::engine::arrow_data::EngineDataArrowExt;
1112
use delta_kernel::engine::default::DefaultEngineBuilder;
12-
use delta_kernel::{ActionReconciliationIterator, DeltaResult, Error, FileMeta, Snapshot};
13+
use delta_kernel::{DeltaResult, Error, FileMeta, Snapshot};
1314

1415
/// An example program that checkpoints a table.
1516
/// !!!WARNING!!!: This doesn't use put-if-absent or a catalog-based commit, so it is UNSAFE.
@@ -44,7 +45,7 @@ async fn main() -> ExitCode {
4445

4546
async fn write_data<W: AsyncFileWriter>(
4647
first_batch: &RecordBatch,
47-
batch_iter: &mut ActionReconciliationIterator,
48+
batch_iter: &mut impl CheckpointDataIterator,
4849
parquet_writer: &mut AsyncArrowWriter<W>,
4950
) -> DeltaResult<()> {
5051
parquet_writer.write(first_batch).await?;

kernel/src/checkpoint/mod.rs

Lines changed: 234 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
//! ## Architecture
1717
//!
1818
//! - [`CheckpointWriter`] - Core component that manages the checkpoint creation workflow
19-
//! - [`ActionReconciliationIterator`] - Iterator over the checkpoint data to be written
19+
//! - [`CheckpointDataIterator`] - Trait for iterators over checkpoint data to be written
2020
//!
2121
//! ## Usage
2222
//!
@@ -33,14 +33,15 @@
3333
//! # use std::sync::Arc;
3434
//! # use delta_kernel::ActionReconciliationIterator;
3535
//! # use delta_kernel::checkpoint::CheckpointWriter;
36+
//! # use delta_kernel::checkpoint::CheckpointDataIterator;
3637
//! # use delta_kernel::Engine;
3738
//! # use delta_kernel::Snapshot;
3839
//! # use delta_kernel::SnapshotRef;
3940
//! # use delta_kernel::DeltaResult;
4041
//! # use delta_kernel::Error;
4142
//! # use delta_kernel::FileMeta;
4243
//! # use url::Url;
43-
//! fn write_checkpoint_file(path: Url, data: &ActionReconciliationIterator) -> DeltaResult<FileMeta> {
44+
//! fn write_checkpoint_file(path: Url, data: &impl CheckpointDataIterator) -> DeltaResult<FileMeta> {
4445
//! todo!() /* engine-specific logic to write data to object storage*/
4546
//! }
4647
//!
@@ -109,6 +110,11 @@ use url::Url;
109110

110111
mod stats_transform;
111112

113+
use stats_transform::{
114+
build_checkpoint_output_schema, build_checkpoint_read_schema_with_stats, build_stats_transform,
115+
StatsTransformConfig,
116+
};
117+
112118
#[cfg(test)]
113119
mod tests;
114120

@@ -126,8 +132,8 @@ static LAST_CHECKPOINT_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
126132
.into()
127133
});
128134

129-
/// Schema for extracting relevant actions from log files for checkpoint creation
130-
static CHECKPOINT_ACTIONS_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
135+
/// Schema for V1 checkpoints (without checkpointMetadata action)
136+
static CHECKPOINT_ACTIONS_SCHEMA_V1: LazyLock<SchemaRef> = LazyLock::new(|| {
131137
Arc::new(StructType::new_unchecked([
132138
StructField::nullable(ADD_NAME, Add::to_schema()),
133139
StructField::nullable(REMOVE_NAME, Remove::to_schema()),
@@ -138,16 +144,131 @@ static CHECKPOINT_ACTIONS_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
138144
]))
139145
});
140146

141-
// Schema of the [`CheckpointMetadata`] action that is included in V2 checkpoints
142-
// We cannot use `CheckpointMetadata::to_schema()` as it would include the 'tags' field which
143-
// we're not supporting yet due to the lack of map support TODO(#880).
144-
static CHECKPOINT_METADATA_ACTION_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
145-
Arc::new(StructType::new_unchecked([StructField::nullable(
147+
/// Schema for the checkpointMetadata field in V2 checkpoints.
148+
/// We cannot use `CheckpointMetadata::to_schema()` as it would include the 'tags' field which
149+
/// we're not supporting yet due to the lack of map support TODO(#880).
150+
fn checkpoint_metadata_field() -> StructField {
151+
StructField::nullable(
146152
CHECKPOINT_METADATA_NAME,
147153
DataType::struct_type_unchecked([StructField::not_null("version", DataType::LONG)]),
148-
)]))
154+
)
155+
}
156+
157+
/// Schema for V2 checkpoints (includes checkpointMetadata action)
158+
static CHECKPOINT_ACTIONS_SCHEMA_V2: LazyLock<SchemaRef> = LazyLock::new(|| {
159+
Arc::new(StructType::new_unchecked([
160+
StructField::nullable(ADD_NAME, Add::to_schema()),
161+
StructField::nullable(REMOVE_NAME, Remove::to_schema()),
162+
StructField::nullable(METADATA_NAME, Metadata::to_schema()),
163+
StructField::nullable(PROTOCOL_NAME, Protocol::to_schema()),
164+
StructField::nullable(SET_TRANSACTION_NAME, SetTransaction::to_schema()),
165+
StructField::nullable(SIDECAR_NAME, Sidecar::to_schema()),
166+
checkpoint_metadata_field(),
167+
]))
149168
});
150169

170+
/// Trait for iterators that yield checkpoint data batches.
171+
///
172+
/// This trait abstracts over checkpoint data iterators, allowing the concrete implementation
173+
/// to change without breaking the public API. Implementations yield [`FilteredEngineData`]
174+
/// batches that should be written to the checkpoint file.
175+
///
176+
/// # Yielded Data
177+
///
178+
/// All batches conform to [`output_schema()`][Self::output_schema], which is determined by:
179+
/// - **V1 checkpoints**: Schema based on [`CHECKPOINT_ACTIONS_SCHEMA_V1`] (add, remove, metadata,
180+
/// protocol, txn, sidecar)
181+
/// - **V2 checkpoints**: Schema based on [`CHECKPOINT_ACTIONS_SCHEMA_V2`] (same as V1 plus
182+
/// checkpointMetadata)
183+
///
184+
/// The `add.stats` and `add.stats_parsed` fields are included or excluded based on table
185+
/// properties (`delta.checkpoint.writeStatsAsJson` and `delta.checkpoint.writeStatsAsStruct`).
186+
///
187+
/// For V2 checkpoints, the final batch contains the checkpoint metadata action with all other
188+
/// action fields set to null.
189+
pub trait CheckpointDataIterator: Iterator<Item = DeltaResult<FilteredEngineData>> {
190+
/// Returns the schema for writing checkpoint data.
191+
///
192+
/// All batches from this iterator conform to this schema. The schema reflects:
193+
/// - V1 vs V2 checkpoint format (V2 includes `checkpointMetadata` field)
194+
/// - Stats configuration (`stats` and/or `stats_parsed` fields)
195+
fn output_schema(&self) -> &SchemaRef;
196+
197+
/// Returns true if the iterator has been fully consumed.
198+
fn is_exhausted(&self) -> bool;
199+
200+
/// Returns the total count of actions processed.
201+
fn actions_count(&self) -> i64;
202+
203+
/// Returns the count of add actions processed.
204+
fn add_actions_count(&self) -> i64;
205+
}
206+
207+
/// Iterator that applies stats transforms to checkpoint data batches.
208+
///
209+
/// This is the concrete implementation of [`CheckpointDataIterator`] that wraps an
210+
/// [`ActionReconciliationIterator`] and applies an expression evaluator to each batch
211+
/// to populate stats fields.
212+
///
213+
/// All batches (including the checkpoint metadata batch for V2 checkpoints) share the
214+
/// same schema and go through the same transform pipeline. The stats transform only
215+
/// operates on the `add` field, so other fields (including `checkpointMetadata`) pass
216+
/// through unchanged.
217+
pub struct TransformingCheckpointIterator {
218+
inner: ActionReconciliationIterator,
219+
evaluator: Arc<dyn crate::ExpressionEvaluator>,
220+
/// Schema for writing checkpoint data (includes/excludes stats fields based on config)
221+
output_schema: SchemaRef,
222+
}
223+
224+
impl TransformingCheckpointIterator {
225+
/// Creates a new transforming iterator.
226+
pub(crate) fn new(
227+
inner: ActionReconciliationIterator,
228+
evaluator: Arc<dyn crate::ExpressionEvaluator>,
229+
output_schema: SchemaRef,
230+
) -> Self {
231+
Self {
232+
inner,
233+
evaluator,
234+
output_schema,
235+
}
236+
}
237+
}
238+
239+
impl CheckpointDataIterator for TransformingCheckpointIterator {
240+
fn output_schema(&self) -> &SchemaRef {
241+
&self.output_schema
242+
}
243+
244+
fn is_exhausted(&self) -> bool {
245+
self.inner.is_exhausted()
246+
}
247+
248+
fn actions_count(&self) -> i64 {
249+
self.inner.actions_count()
250+
}
251+
252+
fn add_actions_count(&self) -> i64 {
253+
self.inner.add_actions_count()
254+
}
255+
}
256+
257+
impl Iterator for TransformingCheckpointIterator {
258+
type Item = DeltaResult<FilteredEngineData>;
259+
260+
fn next(&mut self) -> Option<Self::Item> {
261+
let batch = self.inner.next()?;
262+
263+
// Apply the transform to the batch
264+
Some(batch.and_then(|filtered_data| {
265+
let (engine_data, selection_vector) = filtered_data.into_parts();
266+
let transformed = self.evaluator.evaluate(engine_data.as_ref())?;
267+
FilteredEngineData::try_new(transformed, selection_vector)
268+
}))
269+
}
270+
}
271+
151272
/// Orchestrates the process of creating a checkpoint for a table.
152273
///
153274
/// The [`CheckpointWriter`] is the entry point for generating checkpoint data for a Delta table.
@@ -213,50 +334,91 @@ impl CheckpointWriter {
213334
}
214335
/// Returns the checkpoint data to be written to the checkpoint file.
215336
///
216-
/// This method reads the actions from the log segment and processes them
217-
/// to create the checkpoint data.
337+
/// This method reads actions from the log segment, processes them for checkpoint creation,
338+
/// and applies stats transforms based on table properties:
339+
/// - `delta.checkpoint.writeStatsAsJson` (default: true)
340+
/// - `delta.checkpoint.writeStatsAsStruct` (default: false)
218341
///
219-
/// # Parameters
220-
/// - `engine`: Implementation of [`Engine`] APIs.
342+
/// The returned iterator (implementing [`CheckpointDataIterator`]) yields batches with stats
343+
/// transforms already applied. Use [`CheckpointDataIterator::output_schema`] to get the
344+
/// schema for writing the checkpoint file.
221345
///
222-
/// # Returns: [`ActionReconciliationIterator`] containing the checkpoint data
223-
// This method is the core of the checkpoint generation process. It:
224-
// 1. Determines whether to write a V1 or V2 checkpoint based on the table's
225-
// `v2Checkpoints` feature support
226-
// 2. Reads actions from the log segment using the checkpoint read schema
227-
// 3. Filters and deduplicates actions for the checkpoint
228-
// 4. Chains the checkpoint metadata action if writing a V2 spec checkpoint
229-
// (i.e., if `v2Checkpoints` feature is supported by table)
230-
// 5. Generates the appropriate checkpoint path
231-
pub fn checkpoint_data(
232-
&self,
233-
engine: &dyn Engine,
234-
) -> DeltaResult<ActionReconciliationIterator> {
346+
/// # Engine Usage
347+
///
348+
/// ```ignore
349+
/// let mut checkpoint_data = writer.checkpoint_data(&engine)?;
350+
/// let output_schema = checkpoint_data.output_schema().clone();
351+
/// while let Some(batch) = checkpoint_data.next() {
352+
/// let data = batch?.apply_selection_vector()?;
353+
/// parquet_writer.write(&data, &output_schema).await?;
354+
/// }
355+
/// writer.finalize(&engine, &metadata, checkpoint_data)?;
356+
/// ```
357+
pub fn checkpoint_data(&self, engine: &dyn Engine) -> DeltaResult<impl CheckpointDataIterator> {
358+
let config = StatsTransformConfig::from_table_properties(self.snapshot.table_properties());
359+
360+
// Get stats schema from table configuration.
361+
// This already excludes partition columns and applies column mapping.
362+
let stats_schema = self
363+
.snapshot
364+
.table_configuration()
365+
.expected_stats_schema()?;
366+
367+
// Select schema based on V2 checkpoint support
235368
let is_v2_checkpoints_supported = self
236369
.snapshot
237370
.table_configuration()
238371
.is_feature_supported(&TableFeature::V2Checkpoint);
239372

240-
let actions = self.snapshot.log_segment().read_actions(
241-
engine,
242-
CHECKPOINT_ACTIONS_SCHEMA.clone(),
243-
None,
244-
)?;
373+
let base_schema = if is_v2_checkpoints_supported {
374+
&CHECKPOINT_ACTIONS_SCHEMA_V2
375+
} else {
376+
&CHECKPOINT_ACTIONS_SCHEMA_V1
377+
};
378+
379+
// Read schema includes stats_parsed so COALESCE expressions can operate on it.
380+
// For commits, stats_parsed will be read as nulls (column doesn't exist in source).
381+
let read_schema = build_checkpoint_read_schema_with_stats(base_schema, &stats_schema);
245382

246-
// Create iterator over actions for checkpoint data
383+
// Read actions from log segment
384+
let actions =
385+
self.snapshot
386+
.log_segment()
387+
.read_actions(engine, read_schema.clone(), None)?;
388+
389+
// Process actions through reconciliation
247390
let checkpoint_data = ActionReconciliationProcessor::new(
248391
self.deleted_file_retention_timestamp()?,
249392
self.get_transaction_expiration_timestamp()?,
250393
)
251394
.process_actions_iter(actions);
252395

253-
let checkpoint_metadata =
254-
is_v2_checkpoints_supported.then(|| self.create_checkpoint_metadata_batch(engine));
396+
// Build output schema based on stats config (determines which fields are included)
397+
let output_schema = build_checkpoint_output_schema(&config, base_schema, &stats_schema);
398+
399+
// Build transform expression and create expression evaluator
400+
let transform_expr = build_stats_transform(&config, stats_schema);
401+
let evaluator = engine.evaluation_handler().new_expression_evaluator(
402+
read_schema.clone(),
403+
transform_expr,
404+
output_schema.clone().into(),
405+
)?;
406+
407+
// For V2 checkpoints, chain the checkpoint metadata batch to the action stream.
408+
// The checkpoint metadata batch uses the read schema (with stats_parsed), so it can
409+
// go through the same stats transform pipeline.
410+
let checkpoint_metadata = is_v2_checkpoints_supported
411+
.then(|| self.create_checkpoint_metadata_batch(engine, &read_schema));
255412

256-
// Wrap the iterator to track action counts
257-
Ok(ActionReconciliationIterator::new(Box::new(
258-
checkpoint_data.chain(checkpoint_metadata),
259-
)))
413+
// Create action reconciliation iterator, chaining checkpoint metadata for V2
414+
let inner =
415+
ActionReconciliationIterator::new(Box::new(checkpoint_data.chain(checkpoint_metadata)));
416+
417+
Ok(TransformingCheckpointIterator::new(
418+
inner,
419+
evaluator,
420+
output_schema,
421+
))
260422
}
261423

262424
/// Finalizes checkpoint creation by saving metadata about the checkpoint.
@@ -280,7 +442,7 @@ impl CheckpointWriter {
280442
self,
281443
engine: &dyn Engine,
282444
metadata: &FileMeta,
283-
checkpoint_data: ActionReconciliationIterator,
445+
checkpoint_data: impl CheckpointDataIterator,
284446
) -> DeltaResult<()> {
285447
// Ensure the checkpoint data iterator is fully exhausted
286448
if !checkpoint_data.is_exhausted() {
@@ -325,23 +487,49 @@ impl CheckpointWriter {
325487
///
326488
/// # Implementation Details
327489
///
328-
/// The function creates a single-row [`EngineData`] batch containing only the
329-
/// version field of the [`CheckpointMetadata`] action. Future implementations will
330-
/// include the additional metadata field `tags` when map support is added.
490+
/// The function creates a single-row [`EngineData`] batch using the full V2 checkpoint
491+
/// schema, with all action fields (add, remove, etc.) set to null except for the
492+
/// `checkpointMetadata` field. This ensures the checkpoint metadata batch has the same
493+
/// schema as other action batches, allowing them to be written to the same Parquet file.
331494
///
332495
/// # Returns:
333496
/// An [`ActionReconciliationBatch`] including the single-row [`EngineData`] batch along with
334497
/// an accompanying selection vector with a single `true` value, indicating that the action in the
335498
/// batch should be included in the checkpoint.
499+
/// Creates the checkpoint metadata batch with the given schema.
500+
///
501+
/// The schema must be the read schema (with stats_parsed) so the batch can go through
502+
/// the same stats transform pipeline as regular action batches.
336503
fn create_checkpoint_metadata_batch(
337504
&self,
338505
engine: &dyn Engine,
506+
schema: &SchemaRef,
339507
) -> DeltaResult<ActionReconciliationBatch> {
340-
let checkpoint_metadata_batch = engine.evaluation_handler().create_one(
341-
CHECKPOINT_METADATA_ACTION_SCHEMA.clone(),
342-
&[Scalar::from(self.version)],
508+
use crate::expressions::{Expression, StructData, Transform};
509+
510+
// Start with an all-null row
511+
let null_row = engine.evaluation_handler().null_row(schema.clone())?;
512+
513+
// Build the checkpointMetadata struct value
514+
let checkpoint_metadata_value = Scalar::Struct(StructData::try_new(
515+
vec![StructField::not_null("version", DataType::LONG)],
516+
vec![Scalar::from(self.version)],
517+
)?);
518+
519+
// Use a Transform to set just the checkpointMetadata field, keeping others null
520+
let transform = Transform::new_top_level().with_replaced_field(
521+
CHECKPOINT_METADATA_NAME,
522+
Arc::new(Expression::literal(checkpoint_metadata_value)),
523+
);
524+
525+
let evaluator = engine.evaluation_handler().new_expression_evaluator(
526+
schema.clone(),
527+
Arc::new(Expression::transform(transform)),
528+
schema.clone().into(),
343529
)?;
344530

531+
let checkpoint_metadata_batch = evaluator.evaluate(null_row.as_ref())?;
532+
345533
let filtered_data = FilteredEngineData::with_all_rows_selected(checkpoint_metadata_batch);
346534

347535
Ok(ActionReconciliationBatch {

kernel/src/checkpoint/stats_transform.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
#![allow(dead_code)] // TODO: Remove when integrated in checkpoint_data()
21
//! Transforms for populating stats_parsed and stats fields in checkpoint data.
32
//!
43
//! When writing checkpoints, statistics can be stored in two formats:

0 commit comments

Comments
 (0)