Skip to content
Open
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
936f6f2
tracing changes
lorenarosati Jan 20, 2026
51c1b11
Merge branch 'main' into table-config-tracing
lorenarosati Jan 20, 2026
b311f15
added testing
lorenarosati Jan 20, 2026
8a6ca27
Merge branch 'table-config-tracing' of github.com:lorenarosati/delta-…
lorenarosati Jan 20, 2026
05cfdbe
changes
lorenarosati Jan 21, 2026
563130a
changed everything to delta serialization format
lorenarosati Jan 21, 2026
4842e50
Merge branch 'main' into table-config-tracing
lorenarosati Jan 21, 2026
143749c
formatting
lorenarosati Jan 21, 2026
244d62a
Merge branch 'table-config-tracing' of github.com:lorenarosati/delta-…
lorenarosati Jan 21, 2026
fd341be
use serial_test crate so traced tests dont run at same time as others
lorenarosati Jan 21, 2026
162e710
removed serial
lorenarosati Jan 21, 2026
30a7f84
replaced tracing-test crate with using tracing-subscriber with a test…
lorenarosati Jan 21, 2026
cc2446c
fixed custom writer
lorenarosati Jan 21, 2026
fcf241f
removed subscriber from wrong test
lorenarosati Jan 21, 2026
da7e2be
moved file writer
lorenarosati Jan 21, 2026
b29b946
fixing code cov
lorenarosati Jan 21, 2026
ba191f3
Merge branch 'main' into table-config-tracing
lorenarosati Jan 21, 2026
5b46ce5
fix
lorenarosati Jan 21, 2026
0193410
Merge branch 'table-config-tracing' of github.com:lorenarosati/delta-…
lorenarosati Jan 21, 2026
e28f03f
fix
lorenarosati Jan 21, 2026
afa0c2f
removed comment
lorenarosati Jan 21, 2026
87fae6a
Merge branch 'main' into table-config-tracing
lorenarosati Jan 21, 2026
5a7a8b3
Merge branch 'main' into table-config-tracing
lorenarosati Jan 21, 2026
a938d4d
moved logwriter to test-utils
lorenarosati Jan 21, 2026
bf5efe5
Merge branch 'main' into table-config-tracing
lorenarosati Jan 22, 2026
7323bf0
added tracing statemnets
lorenarosati Jan 22, 2026
c4867c2
Merge branch 'main' into cdf-tracing
lorenarosati Jan 23, 2026
3b07816
Merge branch 'main' into cdf-tracing
lorenarosati Jan 23, 2026
6251ec3
quick fix w timestamp variable
lorenarosati Jan 23, 2026
0b3a3d4
Merge branch 'cdf-tracing' of github.com:lorenarosati/delta-kernel-rs…
lorenarosati Jan 23, 2026
a0ecd22
added logging test set up
lorenarosati Jan 23, 2026
abd5371
added test for cdc action=true
lorenarosati Jan 23, 2026
4aa9449
added test for deletion vector
lorenarosati Jan 23, 2026
25d8f33
dv test
lorenarosati Jan 23, 2026
652f717
added id to trace
lorenarosati Jan 23, 2026
76536ad
Merge branch 'main' into cdf-tracing
lorenarosati Jan 23, 2026
b99c00d
Merge branch 'main' into cdf-tracing
lorenarosati Jan 23, 2026
80b924d
added default to pass test
lorenarosati Jan 23, 2026
e3f1c7e
quick fix
lorenarosati Jan 23, 2026
800365d
Merge branch 'main' into cdf-tracing
lorenarosati Jan 24, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions kernel/src/table_changes/log_replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::actions::{
};
use crate::engine_data::{GetData, TypedGetData};
use crate::expressions::{column_name, ColumnName};
use crate::path::ParsedLogPath;
use crate::path::{AsUrl, ParsedLogPath};
use crate::scan::data_skipping::DataSkippingFilter;
use crate::scan::state::DvInfo;
use crate::schema::{
Expand Down Expand Up @@ -253,8 +253,21 @@ impl LogReplayScanner {
// same as an `add` action.
remove_dvs.retain(|rm_path, _| add_paths.contains(rm_path));
}

let timestamp = commit_file.location.last_modified;

info!(
version = commit_file.version,
id = table_configuration.metadata().id(),
remove_dvs_size = remove_dvs.len(),
has_cdc_action = has_cdc_action,
file_path = %commit_file.location.as_url(),
timestamp = timestamp,
"Phase 1 of CDF query processing completed"
);

Ok(LogReplayScanner {
timestamp: commit_file.location.last_modified,
timestamp,
commit_file,
has_cdc_action,
remove_dvs,
Expand Down
198 changes: 184 additions & 14 deletions kernel/src/table_changes/log_replay/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,12 @@ use crate::table_features::{ColumnMappingMode, TableFeature};
use crate::utils::test_utils::{assert_result_error_with_message, Action, LocalMockTable};
use crate::Predicate;
use crate::{DeltaResult, Engine, Error, Version};
use test_utils::LogWriter;
use test_utils::LoggingTest;

use itertools::Itertools;
use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use std::sync::Mutex;
use tracing_subscriber::layer::SubscriberExt;

fn get_schema() -> StructType {
StructType::new_unchecked([
Expand Down Expand Up @@ -921,16 +919,7 @@ async fn file_meta_timestamp() {

#[tokio::test]
async fn print_table_configuration() {
let logs = Arc::new(Mutex::new(Vec::new()));
let logs_clone = logs.clone();

let subscriber = tracing_subscriber::registry().with(
tracing_subscriber::fmt::layer()
.with_writer(move || LogWriter(logs_clone.clone()))
.with_ansi(false),
);

let _guard = tracing::subscriber::set_default(subscriber);
let tracing_guard = LoggingTest::new();

let engine = Arc::new(SyncEngine::new());
let mut mock_table = LocalMockTable::new();
Expand Down Expand Up @@ -978,7 +967,7 @@ async fn print_table_configuration() {
.unwrap()
.try_collect();

let log_output = String::from_utf8(logs.lock().unwrap().clone()).unwrap();
let log_output = tracing_guard.logs();

assert!(log_output.contains("Table configuration updated during CDF query"));
assert!(log_output.contains("version=0"));
Expand All @@ -992,3 +981,184 @@ async fn print_table_configuration() {
assert!(log_output.contains("\"delta.columnMapping.mode\": \"none\""));
assert!(log_output.contains("\"delta.enableDeletionVectors\": \"true\""));
}

#[tokio::test]
async fn print_table_info_post_phase1() {
let tracing_guard = LoggingTest::new();

let engine = Arc::new(SyncEngine::new());
let mut mock_table = LocalMockTable::new();
// This specific commit (with these actions) isn't necessary to test the tracing for this test, we just need to have one commit with any actions
mock_table
.commit([
Action::Metadata(
Metadata::try_new(
None,
None,
get_schema(),
vec![],
0,
HashMap::from([
("delta.enableChangeDataFeed".to_string(), "true".to_string()),
(
"delta.enableDeletionVectors".to_string(),
"true".to_string(),
),
("delta.columnMapping.mode".to_string(), "none".to_string()),
]),
)
.unwrap(),
),
Action::Protocol(
Protocol::try_new(
3,
7,
Some([TableFeature::DeletionVectors]),
Some([TableFeature::DeletionVectors, TableFeature::ChangeDataFeed]),
)
.unwrap(),
),
])
.await;

let commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None)
.unwrap()
.into_iter();

let table_root_url = url::Url::from_directory_path(mock_table.table_root()).unwrap();
let table_config = get_default_table_config(&table_root_url);

let _scan_batches: DeltaResult<Vec<_>> =
table_changes_action_iter(engine, &table_config, commits, get_schema().into(), None)
.unwrap()
.try_collect();

let log_output = tracing_guard.logs();
Comment on lines +1024 to +1036
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added LoggingTest to reduce some duplication of the logging setup as suggested, but also noticed that the tests are still quite similar in structure. I thought about extracting out basically this logic to another function, but I figure that other tests for tracing might have slightly different structures and this might not be necessary if it'll only be used for these four tests that have been added. Let me know if you think I should add this though!

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah it's not the best. A lot of it is from the poor formatting of mock_table's stuff 😔 Since the tests in that module are generally similar, I think this is fine.


assert!(log_output.contains("Phase 1 of CDF query processing completed"));
assert!(log_output.contains("id="));
assert!(log_output.contains("remove_dvs_size=0"));
assert!(log_output.contains("has_cdc_action=false"));
assert!(log_output.contains("file_path="));
assert!(log_output.contains("version=0"));
assert!(log_output.contains("timestamp="));
}

#[tokio::test]
async fn print_table_info_post_phase1_has_cdc() {
let tracing_guard = LoggingTest::new();

let engine = Arc::new(SyncEngine::new());
let mut mock_table = LocalMockTable::new();

mock_table
.commit([
Action::Add(Add {
path: "fake_path_1".into(),
data_change: true,
..Default::default()
}),
Action::Cdc(Cdc {
path: "fake_path_2".into(),
..Default::default()
}),
])
.await;

let commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None)
.unwrap()
.into_iter();

let table_root_url = url::Url::from_directory_path(mock_table.table_root()).unwrap();
let table_config = get_default_table_config(&table_root_url);

let _scan_batches: DeltaResult<Vec<_>> =
table_changes_action_iter(engine, &table_config, commits, get_schema().into(), None)
.unwrap()
.try_collect();

let log_output = tracing_guard.logs();

assert!(log_output.contains("Phase 1 of CDF query processing completed"));
assert!(log_output.contains("id="));
assert!(log_output.contains("remove_dvs_size=0"));
assert!(log_output.contains("has_cdc_action=true"));
assert!(log_output.contains("file_path="));
assert!(log_output.contains("version=0"));
assert!(log_output.contains("timestamp="));
}

#[tokio::test]
async fn print_table_info_post_phase1_has_dv() {
let tracing_guard = LoggingTest::new();

let engine = Arc::new(SyncEngine::new());
let mut mock_table = LocalMockTable::new();

let deletion_vector1 = DeletionVectorDescriptor {
storage_type: DeletionVectorStorageType::PersistedRelative,
path_or_inline_dv: "vBn[lx{q8@P<9BNH/isA".to_string(),
offset: Some(1),
size_in_bytes: 36,
cardinality: 2,
};
let deletion_vector2 = DeletionVectorDescriptor {
storage_type: DeletionVectorStorageType::PersistedRelative,
path_or_inline_dv: "U5OWRz5k%CFT.Td}yCPW".to_string(),
offset: Some(1),
size_in_bytes: 38,
cardinality: 3,
};
// - fake_path_1 undergoes a restore. All rows are restored, so the deletion vector is removed.
// - All remaining rows of fake_path_2 are deleted
mock_table
.commit([
Action::Remove(Remove {
path: "fake_path_1".into(),
data_change: true,
deletion_vector: Some(deletion_vector1.clone()),
..Default::default()
}),
Action::Add(Add {
path: "fake_path_1".into(),
data_change: true,
..Default::default()
}),
Action::Remove(Remove {
path: "fake_path_2".into(),
data_change: true,
deletion_vector: Some(deletion_vector2.clone()),
..Default::default()
}),
])
.await;

let commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None)
.unwrap()
.into_iter();

let table_root_url = url::Url::from_directory_path(mock_table.table_root()).unwrap();
let table_config = get_default_table_config(&table_root_url);
let _scan_batches: DeltaResult<Vec<_>> =
table_changes_action_iter(engine, &table_config, commits, get_schema().into(), None)
.unwrap()
.try_collect();

let log_output = tracing_guard.logs();

let expected_remove_dvs: Arc<HashMap<String, DvInfo>> = HashMap::from([(
"fake_path_1".to_string(),
DvInfo {
deletion_vector: Some(deletion_vector1.clone()),
},
)])
.into();

assert!(log_output.contains("Phase 1 of CDF query processing completed"));
assert!(log_output.contains("id="));
assert!(log_output.contains(&format!("remove_dvs_size={}", expected_remove_dvs.len())));
assert!(log_output.contains("has_cdc_action=false"));
assert!(log_output.contains("file_path="));
assert!(log_output.contains("version=0"));
assert!(log_output.contains("timestamp="));
}
6 changes: 6 additions & 0 deletions test-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,9 @@ tempfile = "3"
url = "2.5.4"
uuid = { version = "1", features = ["v4"] }
zstd = "0.13"
# only for structured logging
tracing = { version = "0.1", features = ["log"] }
tracing-subscriber = { version = "0.3", default-features = false, features = [
"env-filter",
"fmt",
] }
31 changes: 31 additions & 0 deletions test-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ use object_store::memory::InMemory;
use object_store::{path::Path, ObjectStore};
use serde_json::{json, to_vec};
use std::sync::Mutex;
use tracing::subscriber::DefaultGuard;
use tracing_subscriber::layer::SubscriberExt;
use url::Url;

/// unpack the test data from {test_parent_dir}/{test_name}.tar.zst into a temp dir, and return the
Expand Down Expand Up @@ -573,6 +575,7 @@ pub fn create_add_files_metadata(
Ok(Box::new(ArrowEngineData::new(batch)))
}

// Writer that captures log output into a shared buffer for test assertions
pub struct LogWriter(pub Arc<Mutex<Vec<u8>>>);

impl std::io::Write for LogWriter {
Expand All @@ -583,3 +586,31 @@ impl std::io::Write for LogWriter {
self.0.lock().unwrap().flush()
}
}

// Test helper that sets up tracing to capture log output
// The guard keeps the tracing subscriber active for the lifetime of the struct
pub struct LoggingTest {
logs: Arc<Mutex<Vec<u8>>>,
_guard: DefaultGuard,
}

impl Default for LoggingTest {
fn default() -> Self {
Self::new()
}
}

impl LoggingTest {
pub fn new() -> Self {
let logs = Arc::new(Mutex::new(Vec::new()));
let logs_clone = logs.clone();
let _guard = tracing::subscriber::set_default(tracing_subscriber::registry().with(
tracing_subscriber::fmt::layer().with_writer(move || LogWriter(logs_clone.clone())),
));
Self { logs, _guard }
}

pub fn logs(&self) -> String {
String::from_utf8(self.logs.lock().unwrap().clone()).unwrap()
}
}
Loading