diff --git a/ffi/src/error.rs b/ffi/src/error.rs index 7d3b232549..8728bcac08 100644 --- a/ffi/src/error.rs +++ b/ffi/src/error.rs @@ -53,6 +53,7 @@ pub enum KernelError { ChangeDataFeedIncompatibleSchema, InvalidCheckpoint, LiteralExpressionTransformError, + CheckpointWriteError, } impl From for KernelError { @@ -61,6 +62,7 @@ impl From for KernelError { // NOTE: By definition, no kernel Error maps to FFIError #[cfg(any(feature = "default-engine", feature = "sync-engine"))] Error::Arrow(_) => KernelError::ArrowError, + Error::CheckpointWrite(_) => KernelError::CheckpointWriteError, Error::EngineDataType(_) => KernelError::EngineDataTypeError, Error::Extract(..) => KernelError::ExtractError, Error::Generic(_) => KernelError::GenericError, diff --git a/kernel/src/checkpoint/log_replay.rs b/kernel/src/checkpoint/log_replay.rs index e80d8ce56d..19d20c82fc 100644 --- a/kernel/src/checkpoint/log_replay.rs +++ b/kernel/src/checkpoint/log_replay.rs @@ -25,42 +25,31 @@ //! the overall process. For each batch of log actions, it: //! 1. Creates a visitor with the current deduplication state //! 2. Applies the visitor to filter actions in the batch -//! 3. Updates counters and state for cross-batch deduplication -//! 4. Produces a [`FilteredEngineData`] result which includes a selection vector indicating which -//! actions should be included in the checkpoint file +//! 3. Tracks state for deduplication across batches +//! 4. Produces a [`CheckpointBatch`] result which includes both the filtered data and counts of +//! actions selected for the checkpoint file //! //! 
[`CheckpointMetadata`]: crate::actions::CheckpointMetadata use crate::engine_data::{FilteredEngineData, GetData, RowVisitor, TypedGetData as _}; -use crate::log_replay::{FileActionDeduplicator, FileActionKey, LogReplayProcessor}; +use crate::log_replay::{ + FileActionDeduplicator, FileActionKey, HasSelectionVector, LogReplayProcessor, +}; use crate::scan::data_skipping::DataSkippingFilter; use crate::schema::{column_name, ColumnName, ColumnNamesAndTypes, DataType}; use crate::utils::require; use crate::{DeltaResult, EngineData, Error}; use std::collections::HashSet; -use std::sync::atomic::{AtomicI64, Ordering}; -use std::sync::{Arc, LazyLock}; +use std::sync::LazyLock; /// The [`CheckpointLogReplayProcessor`] is an implementation of the [`LogReplayProcessor`] /// trait that filters log segment actions for inclusion in a V1 spec checkpoint file. This /// processor is leveraged when creating a single-file V2 checkpoint as the V2 spec schema is /// a superset of the V1 spec schema, with the addition of a [`CheckpointMetadata`] action. -/// -/// It processes each action batch via the `process_actions_batch` method, using the -/// [`CheckpointVisitor`] to build an accompanying selection vector indicating which actions -/// should be included in the checkpoint. -#[allow(unused)] // TODO(seb): Remove once checkpoint api is implemented pub(crate) struct CheckpointLogReplayProcessor { /// Tracks file actions that have been seen during log replay to avoid duplicates. /// Contains (data file path, dv_unique_id) pairs as `FileActionKey` instances. seen_file_keys: HashSet, - // Arc provides shared mutability for our counters, allowing both the - // iterator to update the values during processing and the caller to observe the final - // counts afterward. The counters are i64 to match the `_last_checkpoint` file schema. - // Tracks the total number of actions included in the checkpoint file. 
- actions_count: Arc, - // Tracks the total number of add actions included in the checkpoint file. - add_actions_count: Arc, /// Indicates whether a protocol action has been seen in the log. seen_protocol: bool, /// Indicates whether a metadata action has been seen in the log. @@ -71,11 +60,31 @@ pub(crate) struct CheckpointLogReplayProcessor { minimum_file_retention_timestamp: i64, } +/// This struct is the output of the [`CheckpointLogReplayProcessor`]. +/// +/// It contains the filtered batch of actions to be included in the checkpoint, +/// along with statistics about the number of actions filtered for inclusion. +pub(crate) struct CheckpointBatch { + /// The filtered batch of actions to be included in the checkpoint. + pub(crate) filtered_data: FilteredEngineData, + /// The number of actions in the batch filtered for inclusion in the checkpoint. + pub(crate) actions_count: i64, + /// The number of add actions in the batch filtered for inclusion in the checkpoint. + pub(crate) add_actions_count: i64, +} + +impl HasSelectionVector for CheckpointBatch { + fn has_selected_rows(&self) -> bool { + self.filtered_data.has_selected_rows() + } +} + impl LogReplayProcessor for CheckpointLogReplayProcessor { - type Output = FilteredEngineData; + type Output = CheckpointBatch; /// Processes a batch of actions read from the log during reverse chronological replay - /// and returns a filtered batch ([`FilteredEngineData`]) to be included in the checkpoint. + /// and returns a [`CheckpointBatch`], which contains the filtered actions to be + /// included in the checkpoint file, along with statistics about the included actions. /// /// This method delegates the filtering logic to the [`CheckpointVisitor`], which implements /// the deduplication rules described in the module documentation. 
The method tracks @@ -100,23 +109,19 @@ impl LogReplayProcessor for CheckpointLogReplayProcessor { ); visitor.visit_rows_of(batch.as_ref())?; - // Update the total actions and add actions counters. Relaxed ordering is sufficient - // here as we only care about the total count when writing the _last_checkpoint file. - // (the ordering is not important for correctness) - self.actions_count.fetch_add( - visitor.file_actions_count + visitor.non_file_actions_count, - Ordering::Relaxed, - ); - self.add_actions_count - .fetch_add(visitor.add_actions_count, Ordering::Relaxed); - // Update protocol and metadata seen flags self.seen_protocol = visitor.seen_protocol; self.seen_metadata = visitor.seen_metadata; - Ok(FilteredEngineData { + let filtered_data = FilteredEngineData { data: batch, selection_vector: visitor.selection_vector, + }; + + Ok(CheckpointBatch { + filtered_data, + actions_count: visitor.non_file_actions_count + visitor.file_actions_count, + add_actions_count: visitor.add_actions_count, }) } @@ -127,16 +132,9 @@ impl LogReplayProcessor for CheckpointLogReplayProcessor { } impl CheckpointLogReplayProcessor { - #[allow(unused)] // TODO(seb): Remove once checkpoint api is implemented - pub(crate) fn new( - actions_count: Arc, - add_actions_count: Arc, - minimum_file_retention_timestamp: i64, - ) -> Self { + pub(crate) fn new(minimum_file_retention_timestamp: i64) -> Self { Self { seen_file_keys: Default::default(), - actions_count, - add_actions_count, seen_protocol: false, seen_metadata: false, seen_txns: Default::default(), @@ -463,10 +461,13 @@ impl RowVisitor for CheckpointVisitor<'_> { #[cfg(test)] mod tests { + use std::collections::HashSet; + use super::*; use crate::arrow::array::StringArray; use crate::utils::test_utils::{action_batch, parse_json_batch}; - use std::collections::HashSet; + + use itertools::Itertools; /// Helper function to create test batches from JSON strings fn create_batch(json_strings: Vec<&str>) -> DeltaResult<(Box, bool)> { @@ 
-478,23 +479,18 @@ mod tests { fn run_checkpoint_test( input_batches: Vec<(Box, bool)>, ) -> DeltaResult<(Vec, i64, i64)> { - let actions_count = Arc::new(AtomicI64::new(0)); - let add_actions_count = Arc::new(AtomicI64::new(0)); - let results: Vec<_> = CheckpointLogReplayProcessor::new( - actions_count.clone(), - add_actions_count.clone(), - 0, // minimum_file_retention_timestamp - ) - .process_actions_iter(input_batches.into_iter().map(Ok)) - .collect::>>()?; - - Ok(( - results, - actions_count.load(Ordering::Relaxed), - add_actions_count.load(Ordering::Relaxed), - )) + let processed_batches: Vec<_> = CheckpointLogReplayProcessor::new(0) + .process_actions_iter(input_batches.into_iter().map(Ok)) + .try_collect()?; + let total_count: i64 = processed_batches.iter().map(|b| b.actions_count).sum(); + let add_count: i64 = processed_batches.iter().map(|b| b.add_actions_count).sum(); + let filtered_data = processed_batches + .into_iter() + .map(|b| b.filtered_data) + .collect(); + + Ok((filtered_data, total_count, add_count)) } - #[test] fn test_checkpoint_visitor() -> DeltaResult<()> { let data = action_batch(); diff --git a/kernel/src/checkpoint/mod.rs b/kernel/src/checkpoint/mod.rs index 490e2cf993..e9040550aa 100644 --- a/kernel/src/checkpoint/mod.rs +++ b/kernel/src/checkpoint/mod.rs @@ -1,8 +1,393 @@ -//! # Delta Kernel Checkpoint API +//! This module implements the API for writing single-file checkpoints. //! -//! This module implements the API for writing checkpoints in delta tables. -//! Checkpoints provide a compact summary of the table state, enabling faster recovery by -//! avoiding full log replay. This API supports three checkpoint types: +//! The entry-points for this API are: +//! 1. [`Snapshot::checkpoint`] +//! 2. [`Table::checkpoint`] //! -//! TODO!(seb): Include docs when implemented -pub(crate) mod log_replay; +//! ## Checkpoint Types and Selection Logic +//! This API supports two checkpoint types, selected based on table features: +//! +//! 
| Table Feature | Resulting Checkpoint Type | Description | +//! |------------------|-------------------------------|-----------------------------------------------------------------------------| +//! | No v2Checkpoints | Single-file Classic-named V1 | Follows V1 specification without [`CheckpointMetadata`] action | +//! | v2Checkpoints | Single-file Classic-named V2 | Follows V2 specification with [`CheckpointMetadata`] action while maintaining backward compatibility via classic naming | +//! +//! For more information on the V1/V2 specifications, see the following protocol section: +//! +//! +//! ## Architecture +//! +//! - [`CheckpointWriter`] - Core component that manages the checkpoint creation workflow +//! - [`CheckpointDataIterator`] - Iterator over the checkpoint data to be written +//! +//! ## Usage +//! +//! The following steps outline the process of creating a checkpoint: +//! +//! 1. Create a [`CheckpointWriter`] using [`Snapshot::checkpoint`] or [`Table::checkpoint`] +//! 2. Get the checkpoint path from [`CheckpointWriter::checkpoint_path`] +//! 3. Get the checkpoint data from [`CheckpointWriter::checkpoint_data`] +//! 4. Write the data to the path in object storage (engine-specific) +//! 5. Collect metadata ([`FileMeta`]) from the write operation +//! 6. Pass the metadata and exhausted data iterator to `CheckpointWriter::finalize` +//! +//! ```no_run +//! # use std::sync::Arc; +//! # use delta_kernel::checkpoint::CheckpointDataIterator; +//! # use delta_kernel::checkpoint::CheckpointWriter; +//! # use delta_kernel::Engine; +//! # use delta_kernel::table::Table; +//! # use delta_kernel::DeltaResult; +//! # use delta_kernel::Error; +//! # use delta_kernel::FileMeta; +//! # use delta_kernel::snapshot::Snapshot; +//! # use url::Url; +//! fn write_checkpoint_file(path: Url, data: &CheckpointDataIterator) -> DeltaResult { +//! todo!() /* engine-specific logic to write data to object storage*/ +//! } +//! +//! 
let engine: &dyn Engine = todo!(); /* create engine instance */ +//! +//! // Create a table instance for the table you want to checkpoint +//! let table = Table::try_from_uri("./tests/data/app-txn-no-checkpoint")?; +//! +//! // Create a checkpoint writer from a version of the table (e.g., version 1) +//! // Alternatively, if you have a snapshot, you can use `Snapshot::checkpoint()` +//! let mut writer = table.checkpoint(engine, Some(1))?; +//! +//! // Get the checkpoint path and data +//! let checkpoint_path = writer.checkpoint_path()?; +//! let checkpoint_data = writer.checkpoint_data(engine)?; +//! +//! // Write the checkpoint data to the object store and collect metadata +//! let metadata: FileMeta = write_checkpoint_file(checkpoint_path, &checkpoint_data)?; +//! +//! /* IMPORTANT: All data must be written before finalizing the checkpoint */ +//! +//! // writer.finalize(&engine, &metadata, checkpoint_data)?; +//! +//! # Ok::<_, Error>(()) +//! ``` +//! +//! ## Warning +//! Multi-part (V1) checkpoints are DEPRECATED and UNSAFE. +//! +//! ## Note +//! We currently do not plan to support UUID-named V2 checkpoints, since S3's put-if-absent +//! semantics remove the need for UUIDs to ensure uniqueness. Supporting only classic-named +//! checkpoints avoids added complexity, such as coordinating naming decisions between kernel and +//! engine, and handling coexistence with legacy V1 checkpoints. If a compelling use case arises +//! in the future, we can revisit this decision. +//! +//! [`CheckpointMetadata`]: crate::actions::CheckpointMetadata +//! [`LastCheckpointHint`]: crate::snapshot::LastCheckpointHint +//! [`Table::checkpoint`]: crate::table::Table::checkpoint +// Future extensions: +// - TODO(#837): Multi-file V2 checkpoints are not supported yet. The API is designed to be extensible for future +// multi-file support, but the current implementation only supports single-file checkpoints. 
+use std::sync::{Arc, LazyLock}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +use crate::actions::CHECKPOINT_METADATA_NAME; +use crate::actions::{ + schemas::GetStructField, Add, Metadata, Protocol, Remove, SetTransaction, Sidecar, ADD_NAME, + METADATA_NAME, PROTOCOL_NAME, REMOVE_NAME, SET_TRANSACTION_NAME, SIDECAR_NAME, +}; +use crate::engine_data::FilteredEngineData; +use crate::expressions::Scalar; +use crate::log_replay::LogReplayProcessor; +use crate::path::ParsedLogPath; +use crate::schema::{DataType, SchemaRef, StructField, StructType}; +use crate::snapshot::Snapshot; +use crate::{DeltaResult, Engine, Error, EvaluationHandlerExtension, FileMeta}; +use log_replay::{CheckpointBatch, CheckpointLogReplayProcessor}; + +use url::Url; + +mod log_replay; +#[cfg(test)] +mod tests; + +const SECONDS_PER_MINUTE: u64 = 60; +const MINUTES_PER_HOUR: u64 = 60; +const HOURS_PER_DAY: u64 = 24; +/// The default retention period for deleted files in seconds. +/// This is set to 7 days, which is the default in delta-spark. +const DEFAULT_RETENTION_SECS: u64 = 7 * HOURS_PER_DAY * MINUTES_PER_HOUR * SECONDS_PER_MINUTE; + +/// Schema for extracting relevant actions from log files for checkpoint creation +static CHECKPOINT_ACTIONS_SCHEMA: LazyLock = LazyLock::new(|| { + Arc::new(StructType::new([ + Option::::get_struct_field(ADD_NAME), + Option::::get_struct_field(REMOVE_NAME), + Option::::get_struct_field(METADATA_NAME), + Option::::get_struct_field(PROTOCOL_NAME), + Option::::get_struct_field(SET_TRANSACTION_NAME), + Option::::get_struct_field(SIDECAR_NAME), + ])) +}); + +// Schema of the [`CheckpointMetadata`] action that is included in V2 checkpoints +// We cannot use `CheckpointMetadata::to_schema()` as it would include the 'tags' field which +// we're not supporting yet due to the lack of map support TODO(#880). 
+static CHECKPOINT_METADATA_ACTION_SCHEMA: LazyLock = LazyLock::new(|| { + Arc::new(StructType::new([StructField::nullable( + CHECKPOINT_METADATA_NAME, + DataType::struct_type([StructField::not_null("version", DataType::LONG)]), + )])) +}); + +/// An iterator over the checkpoint data to be written to the file. +/// +/// This iterator yields filtered checkpoint data batches ([`FilteredEngineData`]) and +/// tracks action statistics required for finalizing the checkpoint. +/// +/// # Warning +/// The [`CheckpointDataIterator`] must be fully consumed to ensure proper collection of statistics for +/// the checkpoint. Additionally, all yielded data must be written to the specified path before calling +/// `CheckpointWriter::finalize`. Failing to do so may result in data loss or corruption. +pub struct CheckpointDataIterator { + /// The nested iterator that yields checkpoint batches with action counts + checkpoint_batch_iterator: Box>>, + /// Running total of actions included in the checkpoint + actions_count: i64, + /// Running total of add actions included in the checkpoint + add_actions_count: i64, +} + +impl Iterator for CheckpointDataIterator { + type Item = DeltaResult; + + /// Advances the iterator and returns the next value. + /// + /// This implementation transforms the `CheckpointBatch` items from the nested iterator into + /// [`FilteredEngineData`] items for the engine to write, while accumulating action counts from + /// each batch. The [`CheckpointDataIterator`] is passed back to the kernel on call to + /// `CheckpointWriter::finalize` for counts to be read and written to the `_last_checkpoint` file + fn next(&mut self) -> Option { + Some(self.checkpoint_batch_iterator.next()?.map(|batch| { + self.actions_count += batch.actions_count; + self.add_actions_count += batch.add_actions_count; + batch.filtered_data + })) + } +} + +/// Orchestrates the process of creating a checkpoint for a table. 
+/// +/// The [`CheckpointWriter`] is the entry point for generating checkpoint data for a Delta table. +/// It automatically selects the appropriate checkpoint format (V1/V2) based on whether the table +/// supports the `v2Checkpoints` reader/writer feature. +/// +/// # Warning +/// The checkpoint data must be fully written to storage before calling `CheckpointWriter::finalize()`. +/// Failing to do so may result in data loss or corruption. +/// +/// # See Also +/// See the [module-level documentation](self) for the complete checkpoint workflow +pub struct CheckpointWriter { + /// Reference to the snapshot (i.e. version) of the table being checkpointed + pub(crate) snapshot: Arc, +} + +impl CheckpointWriter { + /// Returns the URL where the checkpoint file should be written. + /// + /// This method generates the checkpoint path based on the table's root and the version + /// of the underlying snapshot being checkpointed. The resulting path follows the classic + /// Delta checkpoint naming convention (where the version is zero-padded to 20 digits): + /// + /// `/.checkpoint.parquet` + /// + /// For example, if the table root is `s3://bucket/path` and the version is `10`, + /// the checkpoint path will be: `s3://bucket/path/00000000000000000010.checkpoint.parquet` + pub fn checkpoint_path(&self) -> DeltaResult { + ParsedLogPath::new_classic_parquet_checkpoint( + self.snapshot.table_root(), + self.snapshot.version(), + ) + .map(|parsed| parsed.location) + } + + /// Returns the checkpoint data to be written to the checkpoint file. + /// + /// This method reads the actions from the log segment and processes them + /// to create the checkpoint data. + /// + /// # Parameters + /// - `engine`: Implementation of [`Engine`] APIs. + /// + /// # Returns: [`CheckpointDataIterator`] containing the checkpoint data + // This method is the core of the checkpoint generation process. It: + // 1. 
Determines whether to write a V1 or V2 checkpoint based on the table's + // `v2Checkpoints` feature support + // 2. Reads actions from the log segment using the checkpoint read schema + // 3. Filters and deduplicates actions for the checkpoint + // 4. Chains the checkpoint metadata action if writing a V2 spec checkpoint + // (i.e., if `v2Checkpoints` feature is supported by table) + // 5. Wraps the resulting iterator in a `CheckpointDataIterator` to track action counts + pub fn checkpoint_data(&self, engine: &dyn Engine) -> DeltaResult { + let is_v2_checkpoints_supported = self + .snapshot + .table_configuration() + .is_v2_checkpoint_write_supported(); + + let actions = self.snapshot.log_segment().read_actions( + engine, + CHECKPOINT_ACTIONS_SCHEMA.clone(), + CHECKPOINT_ACTIONS_SCHEMA.clone(), + None, + )?; + + // Create iterator over actions for checkpoint data + let checkpoint_data = + CheckpointLogReplayProcessor::new(self.deleted_file_retention_timestamp()?) + .process_actions_iter(actions); + + let checkpoint_metadata = + is_v2_checkpoints_supported.then(|| self.create_checkpoint_metadata_batch(engine)); + + // Wrap the iterator in a CheckpointDataIterator to track action counts + Ok(CheckpointDataIterator { + checkpoint_batch_iterator: Box::new(checkpoint_data.chain(checkpoint_metadata)), + actions_count: 0, + add_actions_count: 0, + }) + } + + /// TODO(#850): Implement the finalize method + /// + /// Finalizes checkpoint creation after verifying all data is persisted. + /// + /// This method **must** be called only after: + /// 1. The checkpoint data iterator has been fully consumed + /// 2. All data has been successfully written to object storage + /// + /// # Parameters + /// - `engine`: Implementation of [`Engine`] APIs. 
+ /// - `metadata`: The metadata of the written checkpoint file + /// - `checkpoint_data`: The exhausted checkpoint data iterator (must be fully consumed) + /// + /// # Returns: [`variant@Ok`] if the checkpoint was successfully finalized + #[allow(unused)] + fn finalize( + self, + _engine: &dyn Engine, + _metadata: &FileMeta, + _checkpoint_data: CheckpointDataIterator, + ) -> DeltaResult<()> { + // Verify the iterator is exhausted (optional) + // Implementation will use checkpoint_data.actions_count and checkpoint_data.add_actions_count + + // TODO(#850): Implement the actual finalization logic + Err(Error::checkpoint_write( + "Checkpoint finalization is not yet implemented", + )) + } + + /// Creates the checkpoint metadata action for V2 checkpoints. + /// + /// This function generates the [`CheckpointMetadata`] action that must be included in the + /// V2 spec checkpoint file. This action contains metadata about the checkpoint, particularly + /// its version. + /// + /// # Implementation Details + /// + /// The function creates a single-row [`EngineData`] batch containing only the + /// version field of the [`CheckpointMetadata`] action. Future implementations will + /// include the additional metadata field `tags` when map support is added. + /// + /// # Returns: + /// A [`CheckpointBatch`] batch including the single-row [`EngineData`] batch along with + /// an accompanying selection vector with a single `true` value, indicating the action in + /// batch should be included in the checkpoint. 
+ fn create_checkpoint_metadata_batch( + &self, + engine: &dyn Engine, + ) -> DeltaResult { + let version: i64 = self.snapshot.version().try_into().map_err(|e| { + Error::CheckpointWrite(format!( + "Failed to convert checkpoint version from u64 {} to i64: {}", + self.snapshot.version(), + e + )) + })?; + + let checkpoint_metadata_batch = engine.evaluation_handler().create_one( + CHECKPOINT_METADATA_ACTION_SCHEMA.clone(), + &[Scalar::from(version)], + )?; + + let filtered_data = FilteredEngineData { + data: checkpoint_metadata_batch, + selection_vector: vec![true], // Include the action in the checkpoint + }; + + Ok(CheckpointBatch { + filtered_data, + actions_count: 1, + add_actions_count: 0, + }) + } + + /// This function determines the minimum timestamp before which deleted files + /// are eligible for permanent removal during VACUUM operations. It is used + /// during checkpointing to decide whether to include `remove` actions. + /// + /// If a deleted file's timestamp is newer than this threshold (based on the + /// table's `deleted_file_retention_duration`), the corresponding `remove` action + /// is included in the checkpoint, allowing VACUUM operations to later identify + /// and clean up those files. + /// + /// # Returns: + /// The cutoff timestamp in milliseconds since epoch, matching the remove action's + /// `deletion_timestamp` field format for comparison. + /// + /// # Note: The default retention period is 7 days, matching delta-spark's behavior. + fn deleted_file_retention_timestamp(&self) -> DeltaResult { + let retention_duration = self + .snapshot + .table_properties() + .deleted_file_retention_duration; + + deleted_file_retention_timestamp_with_time( + retention_duration, + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map_err(|e| Error::generic(format!("Failed to calculate system time: {}", e)))?, + ) + } +} + +/// Calculates the timestamp threshold for deleted file retention based on the provided duration. 
+/// This is factored out to allow testing with an injectable time and duration parameter. +/// +/// # Parameters +/// - `retention_duration`: The duration to retain deleted files. The table property +/// `deleted_file_retention_duration` is passed here. If `None`, defaults to 7 days. +/// - `now_duration`: The current time as a [`Duration`]. This allows for testing with +/// a specific time instead of using `SystemTime::now()`. +/// +/// # Returns: The timestamp in milliseconds since epoch +fn deleted_file_retention_timestamp_with_time( + retention_duration: Option, + now_duration: Duration, +) -> DeltaResult { + // Use provided retention duration or default (7 days) + let retention_duration = + retention_duration.unwrap_or_else(|| Duration::from_secs(DEFAULT_RETENTION_SECS)); + + // Convert to milliseconds for remove action deletion_timestamp comparison + let now_ms: i64 = now_duration + .as_millis() + .try_into() + .map_err(|_| Error::checkpoint_write("Current timestamp exceeds i64 millisecond range"))?; + + let retention_ms: i64 = retention_duration + .as_millis() + .try_into() + .map_err(|_| Error::checkpoint_write("Retention duration exceeds i64 millisecond range"))?; + + // Simple subtraction - will produce negative values if retention > now + Ok(now_ms - retention_ms) +} diff --git a/kernel/src/checkpoint/tests.rs b/kernel/src/checkpoint/tests.rs new file mode 100644 index 0000000000..e14adca399 --- /dev/null +++ b/kernel/src/checkpoint/tests.rs @@ -0,0 +1,362 @@ +use std::{sync::Arc, time::Duration}; + +use super::DEFAULT_RETENTION_SECS; +use crate::actions::{Add, Metadata, Protocol, Remove}; +use crate::arrow::array::{ArrayRef, StructArray}; +use crate::arrow::datatypes::{DataType, Schema}; +use crate::checkpoint::deleted_file_retention_timestamp_with_time; +use crate::engine::arrow_data::ArrowEngineData; +use crate::engine::default::{executor::tokio::TokioBackgroundExecutor, DefaultEngine}; +use crate::object_store::{memory::InMemory, path::Path, 
ObjectStore}; +use crate::utils::test_utils::Action; +use crate::DeltaResult; +use crate::Table; + +use arrow_55::{ + array::{create_array, RecordBatch}, + datatypes::Field, +}; + +use test_utils::delta_path_for_version; +use url::Url; + +#[test] +fn test_deleted_file_retention_timestamp() -> DeltaResult<()> { + const MILLIS_PER_SECOND: i64 = 1_000; + + let reference_time_secs = 10_000; + let reference_time = Duration::from_secs(reference_time_secs); + let reference_time_millis = reference_time.as_millis() as i64; + + // Retention scenarios: + // ( retention duration , expected_timestamp ) + let test_cases = [ + // None = Default retention (7 days) + ( + None, + reference_time_millis - (DEFAULT_RETENTION_SECS as i64 * MILLIS_PER_SECOND), + ), + // Zero retention + (Some(Duration::from_secs(0)), reference_time_millis), + // Custom retention (e.g., 2000 seconds) + ( + Some(Duration::from_secs(2_000)), + reference_time_millis - (2_000 * MILLIS_PER_SECOND), + ), + ]; + + for (retention, expected_timestamp) in test_cases { + let result = deleted_file_retention_timestamp_with_time(retention, reference_time)?; + assert_eq!(result, expected_timestamp); + } + + Ok(()) +} + +#[test] +fn test_create_checkpoint_metadata_batch() -> DeltaResult<()> { + let (store, _) = new_in_memory_store(); + let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + + // 1st commit (version 0) - metadata and protocol actions + // Protocol action includes the v2Checkpoint reader/writer feature. 
+ write_commit_to_store( + &store, + vec![ + create_v2_checkpoint_protocol_action(), + create_metadata_action(), + ], + 0, + )?; + + let table_root = Url::parse("memory:///")?; + let table = Table::new(table_root); + let snapshot = table.snapshot(&engine, None)?; + let writer = Arc::new(snapshot).checkpoint()?; + + let checkpoint_batch = writer.create_checkpoint_metadata_batch(&engine)?; + + // Check selection vector has one true value + assert_eq!(checkpoint_batch.filtered_data.selection_vector, vec![true]); + + // Verify the underlying EngineData contains the expected CheckpointMetadata action + let arrow_engine_data = + ArrowEngineData::try_from_engine_data(checkpoint_batch.filtered_data.data)?; + let record_batch = arrow_engine_data.record_batch(); + + // Build the expected RecordBatch + // Note: The schema is a struct with a single field "checkpointMetadata" of type struct + // containing a single field "version" of type long + let expected_schema = Arc::new(Schema::new(vec![Field::new( + "checkpointMetadata", + DataType::Struct(vec![Field::new("version", DataType::Int64, false)].into()), + true, + )])); + let expected = RecordBatch::try_new( + expected_schema, + vec![Arc::new(StructArray::from(vec![( + Arc::new(Field::new("version", DataType::Int64, false)), + create_array!(Int64, [0]) as ArrayRef, + )]))], + ) + .unwrap(); + + assert_eq!(*record_batch, expected); + assert_eq!(checkpoint_batch.actions_count, 1); + assert_eq!(checkpoint_batch.add_actions_count, 0); + + Ok(()) +} + +/// TODO(#855): Merge copies and move to `test_utils` +/// Create an in-memory store and return the store and the URL for the store's _delta_log directory. +fn new_in_memory_store() -> (Arc, Url) { + ( + Arc::new(InMemory::new()), + Url::parse("memory:///") + .unwrap() + .join("_delta_log/") + .unwrap(), + ) +} + +/// TODO(#855): Merge copies and move to `test_utils` +/// Writes all actions to a _delta_log json commit file in the store. 
+/// This function formats the provided filename into the _delta_log directory. +fn write_commit_to_store( + store: &Arc, + actions: Vec, + version: u64, +) -> DeltaResult<()> { + let json_lines: Vec = actions + .into_iter() + .map(|action| serde_json::to_string(&action).expect("action to string")) + .collect(); + let content = json_lines.join("\n"); + + let commit_path = format!("_delta_log/{}", delta_path_for_version(version, "json")); + + tokio::runtime::Runtime::new() + .expect("create tokio runtime") + .block_on(async { store.put(&Path::from(commit_path), content.into()).await })?; + + Ok(()) +} + +/// Create a Protocol action without v2Checkpoint feature support +fn create_basic_protocol_action() -> Action { + Action::Protocol( + Protocol::try_new(3, 7, Some(Vec::::new()), Some(Vec::::new())).unwrap(), + ) +} + +/// Create a Protocol action with v2Checkpoint feature support +fn create_v2_checkpoint_protocol_action() -> Action { + Action::Protocol( + Protocol::try_new(3, 7, Some(vec!["v2Checkpoint"]), Some(vec!["v2Checkpoint"])).unwrap(), + ) +} + +/// Create a Metadata action +fn create_metadata_action() -> Action { + Action::Metadata(Metadata { + id: "test-table".into(), + schema_string: "{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}".to_string(), + ..Default::default() + }) +} + +/// Create an Add action with the specified path +fn create_add_action(path: &str) -> Action { + Action::Add(Add { + path: path.into(), + data_change: true, + ..Default::default() + }) +} + +/// Create a Remove action with the specified path +/// +/// The remove action has deletion_timestamp set to i64::MAX to ensure the +/// remove action is not considered expired during testing. 
+fn create_remove_action(path: &str) -> Action { + Action::Remove(Remove { + path: path.into(), + data_change: true, + deletion_timestamp: Some(i64::MAX), // Ensure the remove action is not expired + ..Default::default() + }) +} + +/// Tests the `checkpoint()` API with: +/// - A table that does not support v2Checkpoint +/// - No version specified (latest version is used) +#[test] +fn test_v1_checkpoint_latest_version_by_default() -> DeltaResult<()> { + let (store, _) = new_in_memory_store(); + let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + + // 1st commit: adds `fake_path_1` + write_commit_to_store(&store, vec![create_add_action("fake_path_1")], 0)?; + + // 2nd commit: adds `fake_path_2` & removes `fake_path_1` + write_commit_to_store( + &store, + vec![ + create_add_action("fake_path_2"), + create_remove_action("fake_path_1"), + ], + 1, + )?; + + // 3rd commit: metadata & protocol actions + // Protocol action does not include the v2Checkpoint reader/writer feature. + write_commit_to_store( + &store, + vec![create_metadata_action(), create_basic_protocol_action()], + 2, + )?; + + let table_root = Url::parse("memory:///")?; + let table = Table::new(table_root); + let writer = table.checkpoint(&engine, None)?; + + // Verify the checkpoint file path is the latest version by default. + assert_eq!( + writer.checkpoint_path()?, + Url::parse("memory:///_delta_log/00000000000000000002.checkpoint.parquet")? + ); + + let mut data_iter = writer.checkpoint_data(&engine)?; + // The first batch should be the metadata and protocol actions. 
+ let batch = data_iter.next().unwrap()?; + assert_eq!(batch.selection_vector, [true, true]); + + // The second batch should include both the add action and the remove action + let batch = data_iter.next().unwrap()?; + assert_eq!(batch.selection_vector, [true, true]); + + // The third batch should not be included as the selection vector does not + // contain any true values, as the file added is removed in a following commit. + assert!(data_iter.next().is_none()); + + assert_eq!(data_iter.actions_count, 4); + assert_eq!(data_iter.add_actions_count, 1); + + // TODO(#850): Finalize and verify _last_checkpoint + Ok(()) +} + +/// Tests the `checkpoint()` API with: +/// - A table that does not support v2Checkpoint +/// - A specific version specified (version 0) +#[test] +fn test_v1_checkpoint_specific_version() -> DeltaResult<()> { + let (store, _) = new_in_memory_store(); + let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + + // 1st commit (version 0) - metadata and protocol actions + // Protocol action does not include the v2Checkpoint reader/writer feature. + write_commit_to_store( + &store, + vec![create_basic_protocol_action(), create_metadata_action()], + 0, + )?; + + // 2nd commit (version 1) - add actions + write_commit_to_store( + &store, + vec![ + create_add_action("file1.parquet"), + create_add_action("file2.parquet"), + ], + 1, + )?; + + let table_root = Url::parse("memory:///")?; + let table = Table::new(table_root); + // Specify version 0 for checkpoint + let writer = table.checkpoint(&engine, Some(0))?; + + // Verify the checkpoint file path is the specified version. + assert_eq!( + writer.checkpoint_path()?, + Url::parse("memory:///_delta_log/00000000000000000000.checkpoint.parquet")? + ); + + let mut data_iter = writer.checkpoint_data(&engine)?; + // The first batch should be the metadata and protocol actions. 
+ let batch = data_iter.next().unwrap()?; + assert_eq!(batch.selection_vector, [true, true]); + + // No more data should exist because we only requested version 0 + assert!(data_iter.next().is_none()); + + assert_eq!(data_iter.actions_count, 2); + assert_eq!(data_iter.add_actions_count, 0); + + // TODO(#850): Finalize and verify _last_checkpoint + Ok(()) +} + +/// Tests the `checkpoint()` API with: +/// - A table that supports v2Checkpoint +/// - No version specified (latest version is used) +#[test] +fn test_v2_checkpoint_supported_table() -> DeltaResult<()> { + let (store, _) = new_in_memory_store(); + let engine = DefaultEngine::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + + // 1st commit: adds `fake_path_2` & removes `fake_path_1` + write_commit_to_store( + &store, + vec![ + create_add_action("fake_path_2"), + create_remove_action("fake_path_1"), + ], + 0, + )?; + + // 2nd commit: metadata & protocol actions + // Protocol action includes the v2Checkpoint reader/writer feature. + write_commit_to_store( + &store, + vec![ + create_metadata_action(), + create_v2_checkpoint_protocol_action(), + ], + 1, + )?; + + let table_root = Url::parse("memory:///")?; + let table = Table::new(table_root); + let writer = table.checkpoint(&engine, None)?; + + // Verify the checkpoint file path is the latest version by default. + assert_eq!( + writer.checkpoint_path()?, + Url::parse("memory:///_delta_log/00000000000000000001.checkpoint.parquet")? + ); + + let mut data_iter = writer.checkpoint_data(&engine)?; + // The first batch should be the metadata and protocol actions. + let batch = data_iter.next().unwrap()?; + assert_eq!(batch.selection_vector, [true, true]); + + // The second batch should include both the add action and the remove action + let batch = data_iter.next().unwrap()?; + assert_eq!(batch.selection_vector, [true, true]); + + // The third batch should be the CheckpointMetadata action. 
+ let batch = data_iter.next().unwrap()?; + assert_eq!(batch.selection_vector, [true]); + + // No more data should exist + assert!(data_iter.next().is_none()); + + assert_eq!(data_iter.actions_count, 5); + assert_eq!(data_iter.add_actions_count, 1); + + // TODO(#850): Finalize and verify _last_checkpoint + Ok(()) +} diff --git a/kernel/src/error.rs b/kernel/src/error.rs index e1b7ed8e64..3a75514a66 100644 --- a/kernel/src/error.rs +++ b/kernel/src/error.rs @@ -36,6 +36,9 @@ pub enum Error { #[error(transparent)] Arrow(ArrowError), + #[error("Error writing checkpoint: {0}")] + CheckpointWrite(String), + /// User tried to convert engine data to the wrong type #[error("Invalid engine data type. Could not convert to {0}")] EngineDataType(String), @@ -210,6 +213,10 @@ pub enum Error { // Convenience constructors for Error types that take a String argument impl Error { + pub(crate) fn checkpoint_write(msg: impl ToString) -> Self { + Self::CheckpointWrite(msg.to_string()) + } + pub fn generic_err(source: impl Into>) -> Self { Self::GenericError { source: source.into(), diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs index f607dc31eb..b77ec564e1 100644 --- a/kernel/src/lib.rs +++ b/kernel/src/lib.rs @@ -74,7 +74,7 @@ use url::Url; use self::schema::{DataType, SchemaRef}; pub mod actions; -mod checkpoint; +pub mod checkpoint; pub mod engine_data; pub mod error; pub mod expressions; diff --git a/kernel/src/snapshot.rs b/kernel/src/snapshot.rs index 12331c8fc2..bea9b1c5fa 100644 --- a/kernel/src/snapshot.rs +++ b/kernel/src/snapshot.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use crate::actions::set_transaction::SetTransactionScanner; use crate::actions::{Metadata, Protocol}; +use crate::checkpoint::CheckpointWriter; use crate::log_segment::{self, LogSegment}; use crate::scan::ScanBuilder; use crate::schema::{Schema, SchemaRef}; @@ -18,7 +19,11 @@ use serde::{Deserialize, Serialize}; use tracing::{debug, warn}; use url::Url; -const LAST_CHECKPOINT_FILE_NAME: &str = 
"_last_checkpoint"; +/// Name of the _last_checkpoint file that provides metadata about the last checkpoint +/// created for the table. This file is used as a hint for the engine to quickly locate +/// the latest checkpoint without a full directory listing. +pub(crate) const LAST_CHECKPOINT_FILE_NAME: &str = "_last_checkpoint"; + // TODO expose methods for accessing the files of a table (with file pruning). /// In-memory representation of a specific snapshot of a Delta table. While a `DeltaTable` exists /// throughout time, `Snapshot`s represent a view of a table at a specific point in time; they @@ -244,6 +249,14 @@ impl Snapshot { }) } + /// Creates a [`CheckpointWriter`] for generating a checkpoint from this snapshot. + /// + /// See the [`crate::checkpoint`] module documentation for more details on checkpoint types + /// and the overall checkpoint process. + pub fn checkpoint(self: Arc) -> DeltaResult { + Ok(CheckpointWriter { snapshot: self }) + } + /// Log segment this snapshot uses #[internal_api] pub(crate) fn log_segment(&self) -> &LogSegment { diff --git a/kernel/src/table.rs b/kernel/src/table.rs index 97e1596d77..8da7490736 100644 --- a/kernel/src/table.rs +++ b/kernel/src/table.rs @@ -4,9 +4,11 @@ use std::borrow::Cow; use std::ops::Deref; use std::path::PathBuf; +use std::sync::Arc; use url::Url; +use crate::checkpoint::CheckpointWriter; use crate::snapshot::Snapshot; use crate::table_changes::TableChanges; use crate::transaction::Transaction; @@ -98,6 +100,24 @@ impl Table { ) } + /// Creates a [`CheckpointWriter`] for generating checkpoints at the specified table version. + /// + /// See the [`crate::checkpoint`] module documentation for more details on checkpoint types + /// and the overall checkpoint process. + /// + /// # Parameters + /// - `engine`: Implementation of [`Engine`] apis. + /// - `version`: The version of the table to checkpoint. If [`None`], the latest version of the + /// table will be checkpointed. 
+ pub fn checkpoint( + &self, + engine: &dyn Engine, + version: impl Into>, + ) -> DeltaResult { + let snapshot = Arc::new(self.snapshot(engine, version.into())?); + Ok(CheckpointWriter { snapshot }) + } + /// Create a new write transaction for this table. pub fn new_transaction(&self, engine: &dyn Engine) -> DeltaResult { Transaction::try_new(self.snapshot(engine, None)?) diff --git a/kernel/src/table_configuration.rs b/kernel/src/table_configuration.rs index 1acdb870bf..524439af66 100644 --- a/kernel/src/table_configuration.rs +++ b/kernel/src/table_configuration.rs @@ -263,6 +263,21 @@ impl TableConfiguration { version => (2..=6).contains(&version), } } + + /// Returns `true` if V2 checkpoint is supported on this table. To support V2 checkpoint, + /// a table must support reader version 3, writer version 7, and the v2Checkpoint feature in + /// both the protocol's readerFeatures and writerFeatures. + /// + /// See the V2 checkpoint table feature section of the Delta protocol specification. + pub(crate) fn is_v2_checkpoint_write_supported(&self) -> bool { + let read_supported = self + .protocol() + .has_reader_feature(&ReaderFeature::V2Checkpoint); + let write_supported = self + .protocol() + .has_writer_feature(&WriterFeature::V2Checkpoint); + read_supported && write_supported + } } #[cfg(test)]