feat: add CheckpointLogReplayProcessor in new checkpoints mod #744
Changes from 120 commits
```diff
@@ -26,16 +26,150 @@
 //! 1. Creates a visitor with the current deduplication state
 //! 2. Applies the visitor to filter actions in the batch
 //! 3. Updates counters and state for cross-batch deduplication
-//! 4. Produces a [`CheckpointData`] result which includes a selection vector indicating which
+//! 4. Produces a [`FilteredEngineData`] result which includes a selection vector indicating which
 //!    actions should be included in the checkpoint file
```
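To make the selection-vector contract concrete: the vector is parallel to the rows of `data`, and only rows flagged `true` belong in the checkpoint. A minimal consumer sketch, not part of this diff — `write_action` is a hypothetical per-row sink supplied by the caller:

```rust
// Sketch only: the `data` / `selection_vector` fields match the
// `FilteredEngineData` constructed later in this diff; `write_action`
// is a hypothetical per-row sink, not a kernel API.
fn write_selected_rows(
    filtered: FilteredEngineData,
    write_action: &mut dyn FnMut(&dyn EngineData, usize) -> DeltaResult<()>,
) -> DeltaResult<()> {
    for (row_idx, selected) in filtered.selection_vector.iter().enumerate() {
        if *selected {
            // Selected rows _must_ be written to the checkpoint file.
            write_action(filtered.data.as_ref(), row_idx)?;
        }
        // Non-selected rows _must_ be ignored.
    }
    Ok(())
}
```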
```diff
-use std::collections::HashSet;
-use std::sync::LazyLock;
-use crate::engine_data::{GetData, RowVisitor, TypedGetData as _};
-use crate::log_replay::{FileActionDeduplicator, FileActionKey};
+use crate::engine_data::{FilteredEngineData, GetData, RowVisitor, TypedGetData as _};
+use crate::log_replay::{FileActionDeduplicator, FileActionKey, LogReplayProcessor};
 use crate::scan::data_skipping::DataSkippingFilter;
 use crate::schema::{column_name, ColumnName, ColumnNamesAndTypes, DataType};
 use crate::utils::require;
-use crate::{DeltaResult, Error};
+use crate::{DeltaResult, EngineData, Error};
+use std::collections::HashSet;
+use std::sync::atomic::{AtomicI64, Ordering};
+use std::sync::{Arc, LazyLock};
+
+/// The [`CheckpointLogReplayProcessor`] is an implementation of the [`LogReplayProcessor`]
+/// trait that filters log segment actions for inclusion in a V1 spec checkpoint file.
+///
+/// It processes each action batch via the `process_actions_batch` method, using the
+/// [`CheckpointVisitor`] to build an accompanying selection vector indicating which actions
+/// should be included in the checkpoint.
+pub(crate) struct CheckpointLogReplayProcessor {
+    /// Tracks file actions that have been seen during log replay to avoid duplicates.
+    /// Contains (data file path, dv_unique_id) pairs as `FileActionKey` instances.
+    seen_file_keys: HashSet<FileActionKey>,
+    // Arc<AtomicI64> provides shared mutability for our counters, allowing both the
+    // iterator to update the values during processing and the caller to observe the final
+    // counts afterward. The counters are i64 to match the `_last_checkpoint` file schema.
+    // Tracks the total number of actions included in the checkpoint file.
+    total_actions: Arc<AtomicI64>,
+    // Tracks the total number of add actions included in the checkpoint file.
+    add_actions_count: Arc<AtomicI64>,
+    /// Indicates whether a protocol action has been seen in the log.
+    seen_protocol: bool,
+    /// Indicates whether a metadata action has been seen in the log.
+    seen_metadata: bool,
+    /// Set of transaction app IDs that have been processed to avoid duplicates.
+    seen_txns: HashSet<String>,
+    /// Minimum timestamp for file retention, used for filtering expired tombstones.
+    minimum_file_retention_timestamp: i64,
+}
```
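The `Arc<AtomicI64>` comment above is the crux of the counter design, and the pattern is easy to demonstrate in isolation. A self-contained sketch using only `std` (no kernel types; the batch counts are made-up values):

```rust
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;

fn main() {
    // The caller keeps one handle to the counter...
    let total_actions = Arc::new(AtomicI64::new(0));
    // ...and hands a clone to the processing pipeline.
    let pipeline_handle = Arc::clone(&total_actions);

    // The pipeline bumps the counter as each batch is processed. Relaxed
    // ordering suffices: the value is only read after iteration finishes,
    // so no ordering with other memory operations is required.
    for actions_in_batch in [3_i64, 1, 2] {
        pipeline_handle.fetch_add(actions_in_batch, Ordering::Relaxed);
    }

    // The caller observes the final count, e.g. when writing `_last_checkpoint`.
    assert_eq!(total_actions.load(Ordering::Relaxed), 6);
}
```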
```diff
+
+impl LogReplayProcessor for CheckpointLogReplayProcessor {
+    type Output = FilteredEngineData;
+
+    /// This function is applied to each batch of actions read from the log during
+    /// log replay in reverse chronological order (from most recent to least recent),
+    /// and performs the necessary filtering and deduplication to produce the minimal
+    /// set of actions to be written to the checkpoint file.
+    ///
+    /// # Filtering Rules
+    ///
+    /// 1. Only the most recent protocol and metadata actions are included
+    /// 2. For each app ID, only the most recent transaction action is included
+    /// 3. Add and remove actions are deduplicated based on path and unique ID
+    /// 4. Remove tombstones older than `minimum_file_retention_timestamp` are excluded
+    /// 5. Sidecar, commitInfo, and CDC actions are excluded
+    fn process_actions_batch(
+        &mut self,
+        batch: Box<dyn EngineData>,
+        is_log_batch: bool,
+    ) -> DeltaResult<Self::Output> {
+        let selection_vector = vec![true; batch.len()];
+        assert_eq!(
+            selection_vector.len(),
+            batch.len(),
+            "Initial selection vector length does not match actions length"
+        );
+
+        // Create the checkpoint visitor to process actions and update selection vector
+        let mut visitor = CheckpointVisitor::new(
+            &mut self.seen_file_keys,
+            is_log_batch,
+            selection_vector,
+            self.minimum_file_retention_timestamp,
+            self.seen_protocol,
+            self.seen_metadata,
+            &mut self.seen_txns,
+        );
+        visitor.visit_rows_of(batch.as_ref())?;
```

> **Collaborator** (on the `CheckpointVisitor::new` call): related to my comment in the other PR: does it work if we keep a checkpoint visitor within the
>
> **Author:** Discussed in other thread #738 (comment)

```diff
+
+        // Update the total actions and add actions counters. Relaxed ordering is sufficient
+        // here as we only care about the total count when writing the _last_checkpoint file.
+        // (the ordering is not important for correctness)
+        self.total_actions.fetch_add(
+            visitor.file_actions_count + visitor.non_file_actions_count,
+            Ordering::Relaxed,
+        );
+        self.add_actions_count
+            .fetch_add(visitor.add_actions_count, Ordering::Relaxed);
+
+        // Update protocol and metadata seen flags
+        self.seen_protocol = visitor.seen_protocol;
+        self.seen_metadata = visitor.seen_metadata;
+
+        Ok(FilteredEngineData {
+            data: batch,
+            selection_vector: visitor.selection_vector,
+        })
+    }
+
+    /// Data skipping is not applicable for checkpoint log replay.
+    fn data_skipping_filter(&self) -> Option<&DataSkippingFilter> {
+        None
+    }
+}
```
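The deduplication rules above hinge on the `(path, dv_unique_id)` key recorded in `seen_file_keys`. A toy model of that behavior — the `Key` struct below is a hypothetical stand-in for the kernel's `FileActionKey`, which this diff only imports:

```rust
use std::collections::HashSet;

// Hypothetical stand-in for `FileActionKey`: equality and hashing over
// (path, dv_unique_id), so one path can carry several distinct keys.
#[derive(PartialEq, Eq, Hash)]
struct Key {
    path: String,
    dv_unique_id: Option<String>,
}

fn key(path: &str, dv: Option<&str>) -> Key {
    Key {
        path: path.to_string(),
        dv_unique_id: dv.map(str::to_string),
    }
}

fn main() {
    let mut seen: HashSet<Key> = HashSet::new();

    // First sighting of (file1, dv1): selected for the checkpoint.
    assert!(seen.insert(key("file1", Some("dv1"))));
    // Same path but a different deletion vector: a new key, also selected.
    assert!(seen.insert(key("file1", Some("dv2"))));
    // Exact duplicate of (file1, dv1) in an older batch: skipped.
    assert!(!seen.insert(key("file1", Some("dv1"))));
}
```

This is also why the reworked `test_checkpoint_visitor_file_actions_with_deletion_vectors` below selects all three actions: each carries a distinct deletion vector, so each produces a distinct key.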
```diff
+
+impl CheckpointLogReplayProcessor {
+    pub(crate) fn new(
+        total_actions: Arc<AtomicI64>,
+        add_actions_count: Arc<AtomicI64>,
+        minimum_file_retention_timestamp: i64,
+    ) -> Self {
+        Self {
+            seen_file_keys: Default::default(),
+            total_actions,
+            add_actions_count,
+            seen_protocol: false,
+            seen_metadata: false,
+            seen_txns: Default::default(),
+            minimum_file_retention_timestamp,
+        }
+    }
+}
+
+/// Given an iterator of (engine_data, bool) tuples, returns an iterator of
+/// `(engine_data, selection_vec)`. Each row that is selected in the returned `engine_data` _must_
+/// be written to the V1 checkpoint file in order to capture the table version's complete state.
+/// Non-selected rows _must_ be ignored. The boolean flag tied to each actions batch indicates
+/// whether the batch is a commit batch (true) or a checkpoint batch (false).
+///
+/// Note: The 'action_iter' parameter is an iterator of (engine_data, bool) tuples that _must_ be
+/// sorted by the order of the actions in the log from most recent to least recent.
+#[allow(unused)] // TODO: Remove once API is implemented
+pub(crate) fn checkpoint_actions_iter(
+    action_iter: impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>>,
+    total_actions: Arc<AtomicI64>,
+    add_actions_count: Arc<AtomicI64>,
+    minimum_file_retention_timestamp: i64,
+) -> impl Iterator<Item = DeltaResult<FilteredEngineData>> {
+    CheckpointLogReplayProcessor::new(
+        total_actions,
+        add_actions_count,
+        minimum_file_retention_timestamp,
+    )
+    .process_actions_iter(action_iter)
+}
```
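A caller-side sketch, modeled on the `run_checkpoint_test` helper added in the tests below. `action_batches` is an assumed input (batches paired with their commit/checkpoint flag, ordered newest-first); the function name is illustrative only:

```rust
// Sketch: driving checkpoint_actions_iter end to end. `action_batches` and
// `collect_checkpoint_actions` are assumptions for illustration.
fn collect_checkpoint_actions(
    action_batches: Vec<(Box<dyn EngineData>, bool)>,
) -> DeltaResult<(Vec<FilteredEngineData>, i64, i64)> {
    let total_actions = Arc::new(AtomicI64::new(0));
    let add_actions_count = Arc::new(AtomicI64::new(0));

    let results: Vec<FilteredEngineData> = checkpoint_actions_iter(
        action_batches.into_iter().map(Ok),
        total_actions.clone(),
        add_actions_count.clone(),
        0, // minimum_file_retention_timestamp
    )
    .collect::<DeltaResult<_>>()?;

    // The counters are only meaningful once the iterator is fully consumed.
    Ok((
        results,
        total_actions.load(Ordering::Relaxed),
        add_actions_count.load(Ordering::Relaxed),
    ))
}
```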
```diff
 
 /// A visitor that filters actions for inclusion in a V1 spec checkpoint file.
 ///
@@ -137,13 +271,13 @@ impl CheckpointVisitor<'_> {
                 Self::REMOVE_DV_START_INDEX,
             ),
             selection_vector,
-            non_file_actions_count: 0,
             file_actions_count: 0,
             add_actions_count: 0,
             minimum_file_retention_timestamp,
             seen_protocol,
             seen_metadata,
             seen_txns,
+            non_file_actions_count: 0,
         }
     }
```
```diff
@@ -355,12 +489,38 @@ impl RowVisitor for CheckpointVisitor<'_> {
 #[cfg(test)]
 mod tests {
-    use std::collections::HashSet;
-
-    use super::*;
     use crate::arrow::array::StringArray;
     use crate::utils::test_utils::{action_batch, parse_json_batch};
+    use itertools::Itertools;
+    use std::collections::HashSet;
+
+    use super::*;
+    /// Helper function to create test batches from JSON strings
+    fn create_batch(json_strings: Vec<&str>) -> DeltaResult<(Box<dyn EngineData>, bool)> {
+        Ok((parse_json_batch(StringArray::from(json_strings)), true))
+    }
+
+    /// Helper function which applies the `checkpoint_actions_iter` function to a set of
+    /// input batches and returns the results.
+    fn run_checkpoint_test(
+        input_batches: Vec<(Box<dyn EngineData>, bool)>,
+    ) -> DeltaResult<(Vec<FilteredEngineData>, i64, i64)> {
+        let total_actions = Arc::new(AtomicI64::new(0));
+        let add_actions_count = Arc::new(AtomicI64::new(0));
+        let results: Vec<_> = checkpoint_actions_iter(
+            input_batches.into_iter().map(Ok),
+            total_actions.clone(),
+            add_actions_count.clone(),
+            0,
+        )
+        .try_collect()?;
+
+        Ok((
+            results,
+            total_actions.load(Ordering::Relaxed),
+            add_actions_count.load(Ordering::Relaxed),
+        ))
+    }
+
     #[test]
     fn test_checkpoint_visitor() -> DeltaResult<()> {
```
```diff
@@ -477,14 +637,14 @@ mod tests {
     }
 
     #[test]
-    fn test_checkpoint_visitor_conflicts_with_deletion_vectors() -> DeltaResult<()> {
+    fn test_checkpoint_visitor_file_actions_with_deletion_vectors() -> DeltaResult<()> {
         let json_strings: StringArray = vec![
             // Add action for file1 with deletion vector
-            r#"{"add":{"path":"file1","partitionValues":{},"size":635,"modificationTime":100,"dataChange":true,"deletionVector":{"storageType":"two","pathOrInlineDv":"vBn[lx{q8@P<9BNH/isA","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
-            // Remove action for file1 with a different deletion vector
-            r#"{"remove":{"path":"file1","deletionTimestamp":100,"dataChange":true,"deletionVector":{"storageType":"one","pathOrInlineDv":"vBn[lx{q8@P<9BNH/isA","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
-            // Add action for file1 with the same deletion vector as the remove action above (excluded)
-            r#"{"add":{"path":"file1","partitionValues":{},"size":635,"modificationTime":100,"dataChange":true,"deletionVector":{"storageType":"one","pathOrInlineDv":"vBn[lx{q8@P<9BNH/isA","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
+            r#"{"add":{"path":"file1","partitionValues":{},"size":635,"modificationTime":100,"dataChange":true,"deletionVector":{"storageType":"ONE","pathOrInlineDv":"dv1","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
+            // Remove action for file1 with a different deletion vector
+            r#"{"remove":{"path":"file1","deletionTimestamp":100,"dataChange":true,"deletionVector":{"storageType":"TWO","pathOrInlineDv":"dv2","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
+            // Remove action for file1 with another different deletion vector
+            r#"{"remove":{"path":"file1","deletionTimestamp":100,"dataChange":true,"deletionVector":{"storageType":"THREE","pathOrInlineDv":"dv3","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
         ]
         .into();
         let batch = parse_json_batch(json_strings);
```

> **Author:** Updated this single-batch, single-commit-file test case as it's not a valid scenario:
> Instead we test repeating primary keys spanning across multiple commit files below in
```diff
@@ -503,9 +663,9 @@ mod tests {
         visitor.visit_rows_of(batch.as_ref())?;
 
-        let expected = vec![true, true, false];
+        let expected = vec![true, true, true];
         assert_eq!(visitor.selection_vector, expected);
-        assert_eq!(visitor.file_actions_count, 2);
+        assert_eq!(visitor.file_actions_count, 3);
         assert_eq!(visitor.add_actions_count, 1);
         assert_eq!(visitor.non_file_actions_count, 0);
```
```diff
@@ -585,4 +745,119 @@ mod tests {
 
         Ok(())
     }
+
+    /// This test ensures that the processor correctly deduplicates and filters
+    /// non-file actions (metadata, protocol, txn) across multiple batches.
+    #[test]
+    fn test_checkpoint_actions_iter_non_file_actions() -> DeltaResult<()> {
+        // Batch 1: protocol, metadata, and txn actions
+        let batch1 = vec![
+            r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#,
+            r#"{"metaData":{"id":"test1","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c3\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["c1","c2"],"configuration":{},"createdTime":1670892997849}}"#,
+            r#"{"txn":{"appId":"app1","version":1,"lastUpdated":123456789}}"#,
+        ];
+
+        // Batch 2: duplicate actions, and a new txn action
+        let batch2 = vec![
+            // Duplicates that should be skipped
+            r#"{"protocol":{"minReaderVersion":2,"minWriterVersion":3}}"#,
+            r#"{"metaData":{"id":"test2","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c3\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["c1","c2"],"configuration":{},"createdTime":1670892997849}}"#,
+            r#"{"txn":{"appId":"app1","version":1,"lastUpdated":123456789}}"#,
```
> **Collaborator:** should we check the latest is returned by making this version different than above?
>
> **Author:** Right below this line is another txn action with a unique appId which covers the scenario you mentioned.
```diff
+            // Unique transaction (appId) should be included
+            r#"{"txn":{"appId":"app2","version":1,"lastUpdated":123456789}}"#,
+        ];
+
+        // Batch 3: a duplicate action (entire batch should be skipped)
+        let batch3 = vec![r#"{"protocol":{"minReaderVersion":2,"minWriterVersion":3}}"#];
+
+        let input_batches = vec![
+            create_batch(batch1)?,
+            create_batch(batch2)?,
+            create_batch(batch3)?,
+        ];
+        let (results, total_actions, add_actions) = run_checkpoint_test(input_batches)?;
+
+        // Verify results
+        assert_eq!(results.len(), 2, "Expected two batches in results");
+        assert_eq!(results[0].selection_vector, vec![true, true, true]);
+        assert_eq!(results[1].selection_vector, vec![false, false, false, true]);
+        assert_eq!(total_actions, 4);
+        assert_eq!(add_actions, 0);
+
+        Ok(())
+    }
```
```diff
+
+    /// This test ensures that the processor correctly deduplicates and filters
+    /// file actions (add, remove) across multiple batches.
+    #[test]
+    fn test_checkpoint_actions_iter_file_actions() -> DeltaResult<()> {
+        // Batch 1: add action (file1) - new, should be included
+        let batch1 = vec![
+            r#"{"add":{"path":"file1","partitionValues":{"c1":"6","c2":"a"},"size":452,"modificationTime":1670892998137,"dataChange":true}}"#,
+        ];
+
+        // Batch 2: remove actions - mixed inclusion
+        let batch2 = vec![
+            // Already seen file, should be excluded
+            r#"{"remove":{"path":"file1","deletionTimestamp":100,"dataChange":true,"partitionValues":{}}}"#,
+            // New file, should be included
+            r#"{"remove":{"path":"file2","deletionTimestamp":100,"dataChange":true,"partitionValues":{}}}"#,
+        ];
+
+        // Batch 3: add action (file2) - already seen, should be excluded
+        let batch3 = vec![
+            r#"{"add":{"path":"file2","partitionValues":{"c1":"6","c2":"a"},"size":452,"modificationTime":1670892998137,"dataChange":true}}"#,
+        ];
+
+        let input_batches = vec![
+            create_batch(batch1)?,
+            create_batch(batch2)?,
+            create_batch(batch3)?,
+        ];
+        let (results, total_actions, add_actions) = run_checkpoint_test(input_batches)?;
+
+        // Verify results
+        assert_eq!(results.len(), 2); // The third batch should be filtered out since there are no selected actions
+        assert_eq!(results[0].selection_vector, vec![true]);
+        assert_eq!(results[1].selection_vector, vec![false, true]);
+        assert_eq!(total_actions, 2);
+        assert_eq!(add_actions, 1);
+
+        Ok(())
+    }
```
```diff
+
+    /// This test ensures that the processor correctly deduplicates and filters
+    /// file actions (add, remove) with deletion vectors across multiple batches.
+    #[test]
+    fn test_checkpoint_actions_iter_file_actions_with_deletion_vectors() -> DeltaResult<()> {
+        // Batch 1: add actions with deletion vectors
+        let batch1 = vec![
+            // (file1, DV_ONE) New, should be included
+            r#"{"add":{"path":"file1","partitionValues":{},"size":635,"modificationTime":100,"dataChange":true,"deletionVector":{"storageType":"ONE","pathOrInlineDv":"dv1","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
+            // (file1, DV_TWO) New, should be included
+            r#"{"add":{"path":"file1","partitionValues":{},"size":635,"modificationTime":100,"dataChange":true,"deletionVector":{"storageType":"TWO","pathOrInlineDv":"dv2","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
+        ];
+
+        // Batch 2: mixed actions with duplicate and new entries
+        let batch2 = vec![
+            // (file1, DV_ONE): Already seen, should be excluded
+            r#"{"remove":{"path":"file1","deletionTimestamp":100,"dataChange":true,"deletionVector":{"storageType":"ONE","pathOrInlineDv":"dv1","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
+            // (file1, DV_TWO): Already seen, should be excluded
+            r#"{"add":{"path":"file1","partitionValues":{},"size":635,"modificationTime":100,"dataChange":true,"deletionVector":{"storageType":"TWO","pathOrInlineDv":"dv2","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
+            // New file, should be included
+            r#"{"remove":{"path":"file2","deletionTimestamp":100,"dataChange":true,"partitionValues":{}}}"#,
+        ];
+
+        let input_batches = vec![create_batch(batch1)?, create_batch(batch2)?];
+        let (results, total_actions, add_actions) = run_checkpoint_test(input_batches)?;
+
+        // Verify results
+        assert_eq!(results.len(), 2);
+        assert_eq!(results[0].selection_vector, vec![true, true]);
+        assert_eq!(results[1].selection_vector, vec![false, false, true]);
+        assert_eq!(total_actions, 3);
+        assert_eq!(add_actions, 2);
+
+        Ok(())
+    }
```
> **Collaborator:** are tests exhaustive? let's not block this PR but perhaps a follow up if we need to cover:
>
> **Author:** I think both of your mentioned scenarios are sufficiently tested in the existing visitor-level unit tests:
> I'm not sure it's worthwhile to add additional tests specifically for cross-batch behavior, but happy to revisit if you think there's value in doing so, let me know!

```diff
 }
```