diff --git a/crates/core/src/kernel/snapshot/log_segment.rs b/crates/core/src/kernel/snapshot/log_segment.rs index ffd95045dc..050d3b0603 100644 --- a/crates/core/src/kernel/snapshot/log_segment.rs +++ b/crates/core/src/kernel/snapshot/log_segment.rs @@ -829,6 +829,10 @@ pub(super) mod tests { .await .unwrap(); + assert_eq!(commit.metrics.num_retries, 0); + assert_eq!(commit.metrics.num_log_files_cleaned_up, 0); + assert_eq!(commit.metrics.new_checkpoint_created, false); + let batches = LogSegment::try_new( &Path::default(), Some(commit.version), diff --git a/crates/core/src/operations/transaction/mod.rs b/crates/core/src/operations/transaction/mod.rs index c9045f6e4d..3d461540a7 100644 --- a/crates/core/src/operations/transaction/mod.rs +++ b/crates/core/src/operations/transaction/mod.rs @@ -86,7 +86,9 @@ use serde_json::Value; use tracing::*; use uuid::Uuid; +pub use self::conflict_checker::CommitConflictError; use self::conflict_checker::{TransactionInfo, WinningCommitSummary}; +pub use self::protocol::INSTANCE as PROTOCOL; use crate::checkpoints::{cleanup_expired_logs_for, create_checkpoint_for}; use crate::errors::DeltaTableError; use crate::kernel::{Action, CommitInfo, EagerSnapshot, Metadata, Protocol, Transaction}; @@ -97,9 +99,7 @@ use crate::table::config::TableConfig; use crate::table::state::DeltaTableState; use crate::{crate_version, DeltaResult}; use delta_kernel::table_features::{ReaderFeatures, WriterFeatures}; - -pub use self::conflict_checker::CommitConflictError; -pub use self::protocol::INSTANCE as PROTOCOL; +use serde::{Deserialize, Serialize}; use super::CustomExecuteHandler; @@ -113,6 +113,36 @@ mod state; const DELTA_LOG_FOLDER: &str = "_delta_log"; pub(crate) const DEFAULT_RETRIES: usize = 15; +#[derive(Default, Debug, PartialEq, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct CommitMetrics { + /// Number of retries before a successful commit + pub num_retries: u64, +} + +#[derive(Default, Debug, PartialEq, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct PostCommitMetrics { + /// Whether a new checkpoint was created as part of this commit + pub new_checkpoint_created: bool, + + /// Number of log files cleaned up + pub num_log_files_cleaned_up: u64, +} + +#[derive(Default, Debug, PartialEq, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct Metrics { + /// Number of retries before a successful commit + pub num_retries: u64, + + /// Whether a new checkpoint was created as part of this commit + pub new_checkpoint_created: bool, + + /// Number of log files cleaned up + pub num_log_files_cleaned_up: u64, +} + /// Error raised while commititng transaction #[derive(thiserror::Error, Debug)] pub enum TransactionError { @@ -598,6 +628,7 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> { log_store: this.log_store, table_data: this.table_data, custom_execute_handler: this.post_commit_hook_handler, + metrics: CommitMetrics { num_retries: 0 }, }); } @@ -675,6 +706,9 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> { log_store: this.log_store, table_data: this.table_data, custom_execute_handler: this.post_commit_hook_handler, + metrics: CommitMetrics { + num_retries: attempt_number as u64 - 1, + }, }); } Err(TransactionError::VersionAlreadyExists(version)) => { @@ -708,11 +742,12 @@ pub struct PostCommit<'a> { log_store: LogStoreRef, table_data: Option<&'a dyn TableReference>, custom_execute_handler: Option>, + metrics: CommitMetrics, } impl PostCommit<'_> { /// Runs the post commit activities - async fn run_post_commit_hook(&self) -> DeltaResult { + async fn run_post_commit_hook(&self) -> DeltaResult<(DeltaTableState, PostCommitMetrics)> { if let Some(table) = self.table_data { let post_commit_operation_id = Uuid::new_v4(); let mut snapshot = table.eager_snapshot().clone(); @@ -745,26 +780,30 @@ impl PostCommit<'_> { .await? } + let mut new_checkpoint_created = false; if self.create_checkpoint { // Execute create checkpoint hook - self.create_checkpoint( - &state, - &self.log_store, - self.version, - post_commit_operation_id, - ) - .await?; + new_checkpoint_created = self + .create_checkpoint( + &state, + &self.log_store, + self.version, + post_commit_operation_id, + ) + .await?; } + + let mut num_log_files_cleaned_up: u64 = 0; if cleanup_logs { // Execute clean up logs hook - cleanup_expired_logs_for( + num_log_files_cleaned_up = cleanup_expired_logs_for( self.version, self.log_store.as_ref(), Utc::now().timestamp_millis() - state.table_config().log_retention_duration().as_millis() as i64, Some(post_commit_operation_id), ) - .await?; + .await? as u64; } // Run arbitrary after_post_commit_hook code @@ -777,7 +816,13 @@ impl PostCommit<'_> { ) .await? } - Ok(state) + Ok(( + state, + PostCommitMetrics { + new_checkpoint_created, + num_log_files_cleaned_up, + }, + )) } else { let state = DeltaTableState::try_new( &Path::default(), @@ -786,7 +831,13 @@ impl PostCommit<'_> { Some(self.version), ) .await?; - Ok(state) + Ok(( + state, + PostCommitMetrics { + new_checkpoint_created: false, + num_log_files_cleaned_up: 0, + }, + )) } } async fn create_checkpoint( @@ -795,18 +846,20 @@ impl PostCommit<'_> { log_store: &LogStoreRef, version: i64, operation_id: Uuid, - ) -> DeltaResult<()> { + ) -> DeltaResult { if !table_state.load_config().require_files { warn!("Checkpoint creation in post_commit_hook has been skipped due to table being initialized without files."); - return Ok(()); + return Ok(false); } let checkpoint_interval = table_state.config().checkpoint_interval() as i64; if ((version + 1) % checkpoint_interval) == 0 { create_checkpoint_for(version, table_state, log_store.as_ref(), Some(operation_id)) - .await? + .await?; + Ok(true) + } else { + Ok(false) } - Ok(()) } } @@ -817,6 +870,9 @@ pub struct FinalizedCommit { /// Version of the finalized commit pub version: i64, + + /// Metrics associated with the commit operation + pub metrics: Metrics, } impl FinalizedCommit { @@ -839,9 +895,14 @@ impl<'a> std::future::IntoFuture for PostCommit<'a> { Box::pin(async move { match this.run_post_commit_hook().await { - Ok(snapshot) => Ok(FinalizedCommit { + Ok((snapshot, post_commit_metrics)) => Ok(FinalizedCommit { snapshot, version: this.version, + metrics: Metrics { + num_retries: this.metrics.num_retries, + new_checkpoint_created: post_commit_metrics.new_checkpoint_created, + num_log_files_cleaned_up: post_commit_metrics.num_log_files_cleaned_up, + }, }), Err(err) => Err(err), } diff --git a/crates/core/src/protocol/checkpoints.rs b/crates/core/src/protocol/checkpoints.rs index df5b7b2012..fc6f97aee2 100644 --- a/crates/core/src/protocol/checkpoints.rs +++ b/crates/core/src/protocol/checkpoints.rs @@ -654,7 +654,7 @@ mod tests { query_id: "test".into(), epoch_id, }; - let v = CommitBuilder::default() + let finalized_commit = CommitBuilder::default() .with_actions(actions) .build( table.state.as_ref().map(|f| f as &dyn TableReference), @@ -662,10 +662,25 @@ mod tests { operation, ) .await - .unwrap() - .version(); + .unwrap(); - assert_eq!(1, v, "Expected the commit to create table version 1"); + assert_eq!( + 1, + finalized_commit.version(), + "Expected the commit to create table version 1" + ); + assert_eq!( + 0, finalized_commit.metrics.num_retries, + "Expected no retries" + ); + assert_eq!( + 0, finalized_commit.metrics.num_log_files_cleaned_up, + "Expected no log files cleaned up" + ); + assert_eq!( + false, finalized_commit.metrics.new_checkpoint_created, + "Expected checkpoint created." + ); table.load().await.expect("Failed to reload table"); assert_eq!( table.version(),