diff --git a/kernel/src/table_changes/log_replay.rs b/kernel/src/table_changes/log_replay.rs index dbc28b1706..ab930397b1 100644 --- a/kernel/src/table_changes/log_replay.rs +++ b/kernel/src/table_changes/log_replay.rs @@ -12,7 +12,7 @@ use crate::actions::{ }; use crate::engine_data::{GetData, TypedGetData}; use crate::expressions::{column_name, ColumnName}; -use crate::path::ParsedLogPath; +use crate::path::{AsUrl, ParsedLogPath}; use crate::scan::data_skipping::DataSkippingFilter; use crate::scan::state::DvInfo; use crate::schema::{ @@ -253,8 +253,26 @@ impl LogReplayScanner { // same as an `add` action. remove_dvs.retain(|rm_path, _| add_paths.contains(rm_path)); } + + // If ICT is enabled, then set the timestamp to be the ICT; otherwise, default to the last_modified timestamp value + let mut timestamp = commit_file.location.last_modified; + if table_configuration.is_feature_enabled(&TableFeature::InCommitTimestamp) { + if let Ok(in_commit_timestamp) = commit_file.read_in_commit_timestamp(engine) { + timestamp = in_commit_timestamp; + } + } + + info!( + remove_dvs_size = remove_dvs.len(), + has_cdc_action = has_cdc_action, + file_path = %commit_file.location.as_url(), + version = commit_file.version, + timestamp = timestamp, + "Phase 1 of CDF query processing completed" + ); + Ok(LogReplayScanner { - timestamp: commit_file.location.last_modified, + timestamp, commit_file, has_cdc_action, remove_dvs, diff --git a/kernel/src/table_changes/log_replay/tests.rs b/kernel/src/table_changes/log_replay/tests.rs index 87b6a02659..8a9755b758 100644 --- a/kernel/src/table_changes/log_replay/tests.rs +++ b/kernel/src/table_changes/log_replay/tests.rs @@ -992,3 +992,73 @@ async fn print_table_configuration() { assert!(log_output.contains("\"delta.columnMapping.mode\": \"none\"")); assert!(log_output.contains("\"delta.enableDeletionVectors\": \"true\"")); } + +#[tokio::test] +async fn print_table_info_post_phase1() { + let logs = Arc::new(Mutex::new(Vec::new())); + let logs_clone = logs.clone(); + + let subscriber = tracing_subscriber::registry().with( + tracing_subscriber::fmt::layer() + .with_writer(move || LogWriter(logs_clone.clone())) + .with_ansi(false), + ); + + let _guard = tracing::subscriber::set_default(subscriber); + + let engine = Arc::new(SyncEngine::new()); + let mut mock_table = LocalMockTable::new(); + // This specific commit (with these actions) isn't necessary to test the tracing, we just need to have one commit with any actions + mock_table + .commit([ + Action::Metadata( + Metadata::try_new( + None, + None, + get_schema(), + vec![], + 0, + HashMap::from([ + ("delta.enableChangeDataFeed".to_string(), "true".to_string()), + ( + "delta.enableDeletionVectors".to_string(), + "true".to_string(), + ), + ("delta.columnMapping.mode".to_string(), "none".to_string()), + ]), + ) + .unwrap(), + ), + Action::Protocol( + Protocol::try_new( + 3, + 7, + Some([TableFeature::DeletionVectors]), + Some([TableFeature::DeletionVectors, TableFeature::ChangeDataFeed]), + ) + .unwrap(), + ), + ]) + .await; + + let commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None) + .unwrap() + .into_iter(); + + let table_root_url = url::Url::from_directory_path(mock_table.table_root()).unwrap(); + let table_config = get_default_table_config(&table_root_url); + + let _scan_batches: DeltaResult> = + table_changes_action_iter(engine, &table_config, commits, get_schema().into(), None) + .unwrap() + .try_collect(); + + let log_output = String::from_utf8(logs.lock().unwrap().clone()).unwrap(); + + assert!(log_output.contains("Phase 1 of CDF query processing completed")); + assert!(log_output.contains("remove_dvs_size=0")); + assert!(log_output.contains("has_cdc_action=false")); + assert!(log_output.contains("file_path=")); + assert!(log_output.contains("version=0")); + assert!(log_output.contains("timestamp=")); +}