Skip to content

Commit ecc22d7

Browse files
high level ops and integration test
1 parent ec9ed8f commit ecc22d7

File tree

9 files changed

+523
-25
lines changed

9 files changed

+523
-25
lines changed

kernel/src/history_manager/mod.rs

Lines changed: 188 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
1-
use error::LogHistoryError;
2-
use error::TimestampOutOfRangeError;
1+
//! This module defines the [`LogHistoryManager`], which can be used to perform timestamp queries
2+
//! over the Delta Log, translating from timestamps to Delta versions.
3+
4+
use error::{LogHistoryError, TimestampOutOfRangeError};
35
use search::{binary_search_by_key_with_bounds, Bound, SearchError};
6+
use std::cell::RefCell;
47
use std::cmp::Ordering;
8+
use std::collections::HashMap;
59
use std::fmt::Debug;
610
use std::sync::Arc;
11+
use url::Url;
712

813
use crate::log_segment::LogSegment;
914
use crate::path::ParsedLogPath;
@@ -33,8 +38,8 @@ type Timestamp = i64;
3338
///
3439
/// Use this manager to:
3540
/// - Convert timestamps or timestamp ranges into Delta versions or version ranges
36-
/// - Perform time travel queries using [`Table::snapshot`]
37-
/// - Execute timestamp-based change data feed queries using [`Table::table_changes`]
41+
/// - Perform time travel queries using `Table::snapshot`
42+
/// - Execute timestamp-based change data feed queries using `Table::table_changes`
3843
///
3944
/// The [`LogHistoryManager`] works with tables regardless of whether they have In-Commit
4045
/// Timestamps enabled.
@@ -44,14 +49,13 @@ type Timestamp = i64;
4449
/// Once created, the [`LogHistoryManager`] does not automatically update with newer versions
4550
/// of the table. All timestamp queries are limited to the state captured in the [`Snapshot`]
4651
/// provided during construction.
47-
#[allow(unused)]
4852
#[derive(Debug)]
49-
pub(crate) struct LogHistoryManager {
53+
pub struct LogHistoryManager {
5054
log_segment: LogSegment,
5155
snapshot: Arc<Snapshot>,
56+
commit_to_timestamp_cache: RefCell<HashMap<Url, Timestamp>>,
5257
}
5358

54-
#[allow(unused)]
5559
#[derive(Debug)]
5660
enum TimestampSearchBounds {
5761
ExactMatch(Version),
@@ -97,25 +101,41 @@ impl LogHistoryManager {
97101
Ok(Self {
98102
log_segment,
99103
snapshot,
104+
commit_to_timestamp_cache: Default::default(),
100105
})
101106
}
107+
fn update_cache_with_timestamp(&self, commit_file: &ParsedLogPath, value: Timestamp) {
108+
self.commit_to_timestamp_cache
109+
.borrow_mut()
110+
.insert(commit_file.location.location.clone(), value);
111+
}
112+
fn get_cached_timestamp(&self, commit_file: &ParsedLogPath) -> Option<Timestamp> {
113+
self.commit_to_timestamp_cache
114+
.borrow()
115+
.get(&commit_file.location.location)
116+
.copied()
117+
}
102118

103119
/// Gets the timestamp for the `commit_file`. If `read_ict` is false, this returns the file's
104120
/// modification timestamp. If `read_ict` is true, this reads the file's In-commit timestamp.
105-
#[allow(unused)]
106121
fn commit_file_to_timestamp(
107122
&self,
108123
engine: &dyn Engine,
109124
commit_file: &ParsedLogPath,
110125
read_ict: bool,
111126
) -> Result<Timestamp, LogHistoryError> {
127+
if let Some(cached) = self.get_cached_timestamp(commit_file) {
128+
return Ok(cached);
129+
}
112130
let commit_timestamp = if read_ict {
113131
Self::read_in_commit_timestamp(engine, commit_file)?
114132
} else {
115133
// By default, the timestamp of a commit is its modification time
116134
commit_file.location.last_modified
117135
};
118136

137+
self.update_cache_with_timestamp(commit_file, commit_timestamp);
138+
119139
Ok(commit_timestamp)
120140
}
121141

@@ -127,7 +147,6 @@ impl LogHistoryManager {
127147
/// This returns a [`LogHistoryError::InCommitTimestampNotFoundError`] if the in-commit timestamp
128148
/// is not present in the commit file, or if the CommitInfo is not the first action in the
129149
/// commit.
130-
#[allow(unused)]
131150
fn read_in_commit_timestamp(
132151
engine: &dyn Engine,
133152
commit_file: &ParsedLogPath,
@@ -165,11 +184,170 @@ impl LogHistoryManager {
165184
visitor.in_commit_timestamp.ok_or_else(not_found)
166185
}
167186

187+
/// Gets the latest version that occurs before or at the given `timestamp`.
188+
///
189+
/// This finds the version whose timestamp is less than or equal to `timestamp`.
190+
/// If no such version exists, returns [`LogHistoryError::TimestampOutOfRange`].
191+
///
192+
/// # Examples
193+
/// ```rust
194+
/// # use delta_kernel::history_manager::error::LogHistoryError;
195+
/// # use delta_kernel::engine::sync::SyncEngine;
196+
/// # use delta_kernel::Table;
197+
/// # use std::sync::Arc;
198+
/// # let path = "./tests/data/with_checkpoint_no_last_checkpoint";
199+
/// # let engine = Arc::new(SyncEngine::new());
200+
/// let table = Table::try_from_uri(path)?;
201+
/// let manager = table.history_manager(engine.as_ref(), None)?;
202+
///
203+
/// // Get the latest version as of January 1, 2023
204+
/// let timestamp = 1672531200000; // Milliseconds since epoch for 2023-01-01
205+
/// let version_res = manager.latest_version_as_of(engine.as_ref(), timestamp);
206+
/// # Ok::<(), delta_kernel::Error>(())
207+
/// ```
208+
pub fn latest_version_as_of(
209+
&self,
210+
engine: &dyn Engine,
211+
timestamp: Timestamp,
212+
) -> DeltaResult<Version> {
213+
Ok(self.timestamp_to_version(engine, timestamp, Bound::GreatestLower)?)
214+
}
215+
216+
/// Gets the first version that occurs after the given `timestamp` (inclusive).
217+
///
218+
/// This finds the version whose timestamp is greater than or equal to `timestamp`.
219+
/// If no such version exists, returns [`LogHistoryError::TimestampOutOfRange`].
220+
/// # Examples
221+
/// ```rust
222+
/// # use delta_kernel::engine::sync::SyncEngine;
223+
/// # use delta_kernel::Table;
224+
/// # use std::sync::Arc;
225+
/// # let path = "./tests/data/with_checkpoint_no_last_checkpoint";
226+
/// # let engine = Arc::new(SyncEngine::new());
227+
/// let table = Table::try_from_uri(path)?;
228+
/// let manager = table.history_manager(engine.as_ref(), None)?;
229+
///
230+
/// // Find the first version that occurred at or after January 1, 2023
231+
/// let timestamp = 1672531200000; // Milliseconds since epoch for 2023-01-01
232+
/// let version_res = manager.first_version_after(engine.as_ref(), timestamp);
233+
/// # Ok::<(), delta_kernel::Error>(())
234+
/// ```
235+
pub fn first_version_after(
236+
&self,
237+
engine: &dyn Engine,
238+
timestamp: Timestamp,
239+
) -> DeltaResult<Version> {
240+
Ok(self.timestamp_to_version(engine, timestamp, Bound::LeastUpper)?)
241+
}
242+
243+
/// Converts a timestamp range to a corresponding version range.
244+
///
245+
/// This function finds the version range that corresponds to the given timestamp range.
246+
/// The returned tuple contains:
247+
/// - The first (earliest) version with a timestamp greater than or equal to `start_timestamp`
248+
/// - If `end_timestamp` is provided, the latest version with a timestamp less than or equal to `end_timestamp`; otherwise `None`.
249+
///
250+
/// # Arguments
251+
/// * `engine` - The engine used to access version history
252+
/// * `start_timestamp` - The lower bound timestamp (inclusive)
253+
/// * `end_timestamp` - The optional upper bound timestamp (inclusive), or `None` to indicate no upper bound
254+
///
255+
/// # Returns
256+
/// A tuple containing the start version and optional end version (inclusive)
257+
///
258+
/// # Errors
259+
/// Returns [`LogHistoryError::TimestampOutOfRange`] if:
260+
/// - No version exists at or after `start_timestamp`
261+
/// - `end_timestamp` is provided and no version exists at or before it
262+
///
263+
/// Returns [`LogHistoryError::InvalidTimestampRange`] if `start_timestamp` is greater than
264+
/// `end_timestamp`, or [`LogHistoryError::EmptyTimestampRange`] if the whole range falls between two commits.
265+
///
266+
/// # Examples
267+
/// ```rust
268+
/// # use delta_kernel::engine::sync::SyncEngine;
269+
/// # use delta_kernel::Table;
270+
/// # use std::sync::Arc;
271+
/// # let path = "./tests/data/with_checkpoint_no_last_checkpoint";
272+
/// # let engine = Arc::new(SyncEngine::new());
273+
///
274+
/// let table = Table::try_from_uri(path)?;
275+
/// let manager = table.history_manager(engine.as_ref(), None)?;
276+
///
277+
/// // Find versions between January 1, 2023 and March 1, 2023
278+
/// let start_timestamp = 1672531200000; // Jan 1, 2023 (milliseconds since epoch)
279+
/// let end_timestamp = 1677628800000; // Mar 1, 2023 (milliseconds since epoch)
280+
///
281+
/// let version_range_res =
282+
/// manager.timestamp_range_to_versions(engine.as_ref(), start_timestamp, end_timestamp);
283+
/// # Ok::<(), delta_kernel::Error>(())
284+
/// ```
285+
pub fn timestamp_range_to_versions(
286+
&self,
287+
engine: &dyn Engine,
288+
start_timestamp: Timestamp,
289+
end_timestamp: impl Into<Option<Timestamp>>,
290+
) -> DeltaResult<(Version, Option<Version>)> {
291+
// Check that the start and end timestamps are valid. Timestamps must be non-negative
292+
let end_timestamp = end_timestamp.into();
293+
require!(
294+
0 <= start_timestamp,
295+
LogHistoryError::InvalidTimestamp(start_timestamp).into()
296+
);
297+
if let Some(end_timestamp) = end_timestamp {
298+
require!(
299+
0 <= end_timestamp,
300+
LogHistoryError::InvalidTimestamp(end_timestamp).into()
301+
);
302+
// The `start_timestamp` must be no greater than the `end_timestamp`.
303+
require!(
304+
start_timestamp <= end_timestamp,
305+
LogHistoryError::InvalidTimestampRange {
306+
start_timestamp,
307+
end_timestamp
308+
}
309+
.into()
310+
);
311+
}
312+
313+
// Convert the start timestamp to version
314+
let start_version = self.first_version_after(engine, start_timestamp)?;
315+
316+
// If the end timestamp is present, convert it to an end version
317+
let end_version = end_timestamp
318+
.map(|end| {
319+
let end_version = self.latest_version_as_of(engine, end)?;
320+
321+
// Verify that the start version is no greater than the end version. This can
322+
// happen in the case that the entire timestamp range falls between two commits.
323+
// Consider the following history:
324+
// |-------------|--------------------|---------------|
325+
// v4 start_timestamp end_timestamp v5
326+
//
327+
// The latest version as of the end_timestamp is 4. The first version after the
328+
// start_timestamp is 5. Thus in the case where end_version < start_version, we
329+
// return a [`LogHistoryError::EmptyTimestampRange`] error.
330+
require!(
331+
start_version <= end_version,
332+
DeltaError::from(LogHistoryError::EmptyTimestampRange {
333+
end_timestamp: end,
334+
start_timestamp,
335+
between_left: end_version,
336+
between_right: start_version
337+
})
338+
);
339+
340+
Ok(end_version)
341+
})
342+
.transpose()?;
343+
344+
Ok((start_version, end_version))
345+
}
346+
168347
/// Given a timestamp, this function determines the commit range that timestamp conversion
169348
/// should search. A timestamp search may be conducted over one of two version ranges:
170349
/// 1) A range of commits whose timestamp is the file modification timestamp
171350
/// 2) A range of commits whose timestamp is an in-commit timestamp.
172-
#[allow(unused)]
173351
fn get_timestamp_search_bounds(
174352
&self,
175353
timestamp: Timestamp,
@@ -237,7 +415,6 @@ impl LogHistoryManager {
237415
/// happen in the following cases based on the bound:
238416
/// - `Bound::GreatestLower`: There is no commit whose timestamp is lower than the given `timestamp`.
239417
/// - `Bound::LeastUpper`: There is no commit whose timestamp is greater than the given `timestamp`.
240-
#[allow(unused)]
241418
fn timestamp_to_version(
242419
&self,
243420
engine: &dyn Engine,

kernel/src/history_manager/search.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,13 @@ use delta_kernel_derive::internal_api;
1515
/// * [`Bound::GreatestLower`] - Finds the largest index `i` such that `values[i] <= key`.
1616
/// This represents the last element less than or equal to the search key.
1717
#[internal_api]
18-
#[allow(unused)]
1918
#[derive(Debug, Clone, Copy, PartialEq)]
2019
pub(crate) enum Bound {
2120
LeastUpper,
2221
GreatestLower,
2322
}
2423

2524
#[internal_api]
26-
#[allow(unused)]
2725
#[derive(Debug)]
2826
pub(crate) enum SearchError<T: Error> {
2927
OutOfRange,
@@ -119,7 +117,6 @@ pub(crate) enum SearchError<T: Error> {
119117
/// assert!(matches!(result, Err(SearchError::KeyFunctionError(_))));
120118
/// ```
121119
#[internal_api]
122-
#[allow(unused)]
123120
pub(crate) fn binary_search_by_key_with_bounds<'a, T, K: Ord + Debug, E: Error>(
124121
values: &'a [T],
125122
key: K,

kernel/src/history_manager/timestamp_visitor.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,12 @@ use crate::{DeltaResult, Error, RowVisitor};
1414
///
1515
/// Only one row of the engine data is checked. This is because in-commit
1616
/// timestamps require that the CommitInfo containing the ICT be the first action in the log.
17-
#[allow(unused)]
1817
#[derive(Default)]
1918
pub(crate) struct InCommitTimestampVisitor {
2019
pub(crate) in_commit_timestamp: Option<i64>,
2120
}
2221

2322
impl InCommitTimestampVisitor {
24-
#[allow(unused)]
2523
/// Get the schema that the visitor expects the data to have.
2624
pub(crate) fn schema() -> Arc<Schema> {
2725
static SCHEMA: LazyLock<Arc<Schema>> = LazyLock::new(|| {

kernel/src/lib.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ pub mod checkpoint;
7878
pub mod engine_data;
7979
pub mod error;
8080
pub mod expressions;
81+
pub mod history_manager;
8182
pub mod scan;
8283
pub mod schema;
8384
pub mod snapshot;
@@ -114,11 +115,6 @@ pub mod log_segment;
114115
#[cfg(not(feature = "internal-api"))]
115116
pub(crate) mod log_segment;
116117

117-
#[cfg(feature = "internal-api")]
118-
pub mod history_manager;
119-
#[cfg(not(feature = "internal-api"))]
120-
pub(crate) mod history_manager;
121-
122118
pub use delta_kernel_derive;
123119
pub use engine_data::{EngineData, RowVisitor};
124120
pub use error::{DeltaResult, Error};

kernel/src/log_segment.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,6 @@ impl LogSegment {
191191
LogSegment::try_new(ascending_commit_files, vec![], log_root, end_version)
192192
}
193193

194-
#[allow(unused)]
195194
/// Constructs a [`LogSegment`] to be used for timestamp conversion. This [`LogSegment`] will consist
196195
/// only of contiguous commit files. If an `end_version` is specified, the commit range will
197196
/// include commits up to the `end_version` commit (inclusive). If present, `limit` specifies the

kernel/src/table.rs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use std::sync::Arc;
99
use url::Url;
1010

1111
use crate::checkpoint::CheckpointWriter;
12+
use crate::history_manager::LogHistoryManager;
1213
use crate::snapshot::Snapshot;
1314
use crate::table_changes::TableChanges;
1415
use crate::transaction::Transaction;
@@ -76,6 +77,46 @@ impl Table {
7677
&self.location
7778
}
7879

80+
/// Creates a [`LogHistoryManager`] from an existing [`Snapshot`].
81+
///
82+
/// This method allows reusing an existing snapshot to create a history manager
83+
/// that can resolve timestamps for commits up to and including `snapshot.version()`.
84+
///
85+
/// # Parameters
86+
/// - `engine`: The [`Engine`] implementation to use for data access
87+
/// - `snapshot`: An existing snapshot to initialize the history manager with
88+
/// - `limit`: Optional maximum number of versions to track. When specified, the earliest
89+
/// queryable version will be `snapshot.version() - limit`. This allows trading
90+
/// memory usage for historical reach.
91+
pub fn history_manager_from_snapshot(
92+
&self,
93+
engine: &dyn Engine,
94+
snapshot: Arc<Snapshot>,
95+
limit: Option<usize>,
96+
) -> DeltaResult<LogHistoryManager> {
97+
LogHistoryManager::try_new(engine, snapshot, limit)
98+
}
99+
100+
/// Creates a [`LogHistoryManager`] for the latest table version.
101+
///
102+
/// This method creates a new snapshot at the latest table version
103+
/// and initializes a history manager that can resolve timestamps for commits
104+
/// up to and including the current latest version.
105+
///
106+
/// # Parameters
107+
/// - `engine`: The [`Engine`] implementation to use for data access
108+
/// - `limit`: Optional maximum number of versions to track. When specified, the earliest
109+
/// queryable version will be `latest_version - limit`. This allows trading
110+
/// memory usage for historical reach.
111+
pub fn history_manager(
112+
&self,
113+
engine: &dyn Engine,
114+
limit: Option<usize>,
115+
) -> DeltaResult<LogHistoryManager> {
116+
let snapshot = Arc::new(self.snapshot(engine, None)?);
117+
LogHistoryManager::try_new(engine, snapshot, limit)
118+
}
119+
79120
/// Create a [`Snapshot`] of the table corresponding to `version`.
80121
///
81122
/// If no version is supplied, a snapshot for the latest version will be created.

0 commit comments

Comments
 (0)