Skip to content

Commit 2ffb953

Browse files
high level ops and integration test
1 parent f3b7f67 commit 2ffb953

File tree

9 files changed

+527
-26
lines changed

9 files changed

+527
-26
lines changed

kernel/src/history_manager/mod.rs

Lines changed: 191 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
1-
use crate::internal_mod;
1+
//! This module defines the [`LogHistoryManager`], which can be used to perform timestamp queries
2+
//! over the Delta Log, translating from timestamps to Delta versions.
23
3-
use error::LogHistoryError;
4-
use error::TimestampOutOfRangeError;
4+
use crate::internal_mod;
5+
use error::{LogHistoryError, TimestampOutOfRangeError};
56
use search::{binary_search_by_key_with_bounds, Bound, SearchError};
7+
use std::cell::RefCell;
68
use std::cmp::Ordering;
9+
use std::collections::HashMap;
710
use std::fmt::Debug;
811
use std::sync::Arc;
12+
use url::Url;
913

1014
use crate::log_segment::LogSegment;
1115
use crate::path::ParsedLogPath;
@@ -14,8 +18,8 @@ use crate::utils::require;
1418
use crate::{DeltaResult, Engine, Error as DeltaError, RowVisitor, Version};
1519
use timestamp_visitor::InCommitTimestampVisitor;
1620

17-
pub(crate) mod error;
1821
internal_mod!(pub(crate) mod search);
22+
pub mod error;
1923
mod timestamp_visitor;
2024

2125
type Timestamp = i64;
@@ -27,8 +31,8 @@ type Timestamp = i64;
2731
///
2832
/// Use this manager to:
2933
/// - Convert timestamps or timestamp ranges into Delta versions or version ranges
30-
/// - Perform time travel queries using [`Table::snapshot`]
31-
/// - Execute timestamp-based change data feed queries using [`Table::table_changes`]
34+
/// - Perform time travel queries using `Table::snapshot`
35+
/// - Execute timestamp-based change data feed queries using `Table::table_changes`
3236
///
3337
/// The [`LogHistoryManager`] works with tables regardless of whether they have In-Commit
3438
/// Timestamps enabled.
@@ -38,14 +42,13 @@ type Timestamp = i64;
3842
/// Once created, the [`LogHistoryManager`] does not automatically update with newer versions
3943
/// of the table. All timestamp queries are limited to the state captured in the [`Snapshot`]
4044
/// provided during construction.
41-
#[allow(unused)]
4245
#[derive(Debug)]
43-
pub(crate) struct LogHistoryManager {
46+
pub struct LogHistoryManager {
4447
log_segment: LogSegment,
4548
snapshot: Arc<Snapshot>,
49+
commit_to_timestamp_cache: RefCell<HashMap<Url, Timestamp>>,
4650
}
4751

48-
#[allow(unused)]
4952
#[derive(Debug)]
5053
enum TimestampSearchBounds {
5154
ExactMatch(Version),
@@ -92,25 +95,41 @@ impl LogHistoryManager {
9295
Ok(Self {
9396
log_segment,
9497
snapshot,
98+
commit_to_timestamp_cache: Default::default(),
9599
})
96100
}
101+
fn update_cache_with_timestamp(&self, commit_file: &ParsedLogPath, value: Timestamp) {
102+
self.commit_to_timestamp_cache
103+
.borrow_mut()
104+
.insert(commit_file.location.location.clone(), value);
105+
}
106+
fn get_cached_timestamp(&self, commit_file: &ParsedLogPath) -> Option<Timestamp> {
107+
self.commit_to_timestamp_cache
108+
.borrow()
109+
.get(&commit_file.location.location)
110+
.copied()
111+
}
97112

98113
/// Gets the timestamp for the `commit_file`. If `read_ict` is false ,this returns the file's
99114
/// modification timestamp. If `read_ict` is true, this reads the file's In-commit timestamp.
100-
#[allow(unused)]
101115
fn commit_file_to_timestamp(
102116
&self,
103117
engine: &dyn Engine,
104118
commit_file: &ParsedLogPath,
105119
read_ict: bool,
106120
) -> Result<Timestamp, LogHistoryError> {
121+
if let Some(cached) = self.get_cached_timestamp(commit_file) {
122+
return Ok(cached);
123+
}
107124
let commit_timestamp = if read_ict {
108125
Self::read_in_commit_timestamp(engine, commit_file)?
109126
} else {
110127
// By default, the timestamp of a commit is its modification time
111128
commit_file.location.last_modified
112129
};
113130

131+
self.update_cache_with_timestamp(commit_file, commit_timestamp);
132+
114133
Ok(commit_timestamp)
115134
}
116135

@@ -122,7 +141,6 @@ impl LogHistoryManager {
122141
/// This returns a [`LogHistoryError::InCommitTimestampNotFoundError`] if the in-commit timestamp
123142
/// is not present in the commit file, or if the CommitInfo is not the first action in the
124143
/// commit.
125-
#[allow(unused)]
126144
fn read_in_commit_timestamp(
127145
engine: &dyn Engine,
128146
commit_file: &ParsedLogPath,
@@ -134,7 +152,7 @@ impl LogHistoryManager {
134152
};
135153

136154
// Get an iterator over the actions in the commit file
137-
let mut action_iter = engine
155+
let action_iter = engine
138156
.json_handler()
139157
.read_json_files(
140158
&[commit_file.location.clone()],
@@ -143,7 +161,7 @@ impl LogHistoryManager {
143161
)
144162
.map_err(wrap_err)?;
145163

146-
// Take the first non-empty engine data batch
164+
// Take the first engine data batch
147165
let batch = action_iter
148166
.map(|res| res.map_err(wrap_err))
149167
.next()
@@ -164,11 +182,170 @@ impl LogHistoryManager {
164182
})
165183
}
166184

185+
/// Gets the latest version that occurs before or at the given `timestamp`.
186+
///
187+
/// This finds the version whose timestamp is less than or equal to `timestamp`.
188+
/// If no such version exists, returns [`LogHistoryError::TimestampOutOfRange`].
189+
///
190+
////// # Examples
191+
/// ```rust
192+
/// # use delta_kernel::history_manager::error::LogHistoryError;
193+
/// # use delta_kernel::engine::sync::SyncEngine;
194+
/// # use delta_kernel::Table;
195+
/// # use std::sync::Arc;
196+
/// # let path = "./tests/data/with_checkpoint_no_last_checkpoint";
197+
/// # let engine = Arc::new(SyncEngine::new());
198+
/// let table = Table::try_from_uri(path)?;
199+
/// let manager = table.history_manager(engine.as_ref())?;
200+
///
201+
/// // Get the latest version as of January 1, 2023
202+
/// let timestamp = 1672531200000; // Milliseconds since epoch for 2023-01-01
203+
/// let version_res = manager.latest_version_as_of(engine.as_ref(), timestamp);
204+
/// # Ok::<(), delta_kernel::Error>(())
205+
/// ```
206+
pub fn latest_version_as_of(
207+
&self,
208+
engine: &dyn Engine,
209+
timestamp: Timestamp,
210+
) -> DeltaResult<Version> {
211+
Ok(self.timestamp_to_version(engine, timestamp, Bound::GreatestLower)?)
212+
}
213+
214+
/// Gets the first version that occurs after the given `timestamp` (inclusive).
215+
///
216+
/// This finds the version whose timestamp is greater than or equal to `timestamp`.
217+
/// If no such version exists, returns [`LogHistoryError::TimestampOutOfRange`].
218+
/// # Examples
219+
/// ```rust
220+
/// # use delta_kernel::engine::sync::SyncEngine;
221+
/// # use delta_kernel::Table;
222+
/// # use std::sync::Arc;
223+
/// # let path = "./tests/data/with_checkpoint_no_last_checkpoint";
224+
/// # let engine = Arc::new(SyncEngine::new());
225+
/// let table = Table::try_from_uri(path)?;
226+
/// let manager = table.history_manager(engine.as_ref())?;
227+
///
228+
/// // Find the first version that occurred after January 1, 2023
229+
/// let timestamp = 1672531200000; // Milliseconds since epoch for 2023-01-01
230+
/// let version_res = manager.first_version_after(engine.as_ref(), timestamp);
231+
/// # Ok::<(), delta_kernel::Error>(())
232+
/// ```
233+
pub fn first_version_after(
234+
&self,
235+
engine: &dyn Engine,
236+
timestamp: Timestamp,
237+
) -> DeltaResult<Version> {
238+
Ok(self.timestamp_to_version(engine, timestamp, Bound::LeastUpper)?)
239+
}
240+
241+
/// Converts a timestamp range to a corresponding version range.
242+
///
243+
/// This function finds the version range that corresponds to the given timestamp range.
244+
/// The returned tuple contains:
245+
/// - The first (earliest) version with a timestamp greater than or equal to `start_timestamp`
246+
/// - If `end_timestamp` is provided, the version with a timestamp less than or equal to `end_timestamp`.
247+
///
248+
/// # Arguments
249+
/// * `engine` - The engine used to access version history
250+
/// * `start_timestamp` - The lower bound timestamp (inclusive)
251+
/// * `end_timestamp` - The optional upper bound timestamp (inclusive), or `None` to indicate no upper bound
252+
///
253+
/// # Returns
254+
/// A tuple containing the start version and optional end version (inclusive)
255+
///
256+
/// # Errors
257+
/// Returns [`LogHistoryError::TimestampOutOfRange`] if:
258+
/// - No version exists at or after `start_timestamp`
259+
/// - `end_timestamp` is provided and no version exists at or before it
260+
///
261+
/// Returns [`LogHistoryError::InvalidTimestampRange`] if the entire range [start_timestamp,
262+
/// end_timestamp]
263+
///
264+
/// # Examples
265+
/// ```rust
266+
/// # use delta_kernel::engine::sync::SyncEngine;
267+
/// # use delta_kernel::Table;
268+
/// # use std::sync::Arc;
269+
/// # let path = "./tests/data/with_checkpoint_no_last_checkpoint";
270+
/// # let engine = Arc::new(SyncEngine::new());
271+
///
272+
/// let table = Table::try_from_uri(path)?;
273+
/// let manager = table.history_manager(engine.as_ref())?;
274+
///
275+
/// // Find versions between January 1, 2023 and March 1, 2023
276+
/// let start_timestamp = 1672531200000; // Jan 1, 2023 (milliseconds since epoch)
277+
/// let end_timestamp = 1677628800000; // Mar 1, 2023 (milliseconds since epoch)
278+
///
279+
/// let version_range_res =
280+
/// manager.timestamp_range_to_versions(engine.as_ref(), start_timestamp, end_timestamp);
281+
/// # Ok::<(), delta_kernel::Error>(())
282+
/// ```
283+
pub fn timestamp_range_to_versions(
284+
&self,
285+
engine: &dyn Engine,
286+
start_timestamp: Timestamp,
287+
end_timestamp: impl Into<Option<Timestamp>>,
288+
) -> DeltaResult<(Version, Option<Version>)> {
289+
// Check that the start and end timestamps are valid. Timestamps must be positive
290+
let end_timestamp = end_timestamp.into();
291+
require!(
292+
0 <= start_timestamp,
293+
LogHistoryError::InvalidTimestamp(start_timestamp).into()
294+
);
295+
if let Some(end_timestamp) = end_timestamp {
296+
require!(
297+
0 <= end_timestamp,
298+
LogHistoryError::InvalidTimestamp(end_timestamp).into()
299+
);
300+
// The `start_timestamp` must be no greater than the `end_timestamp`.
301+
require!(
302+
start_timestamp <= end_timestamp,
303+
LogHistoryError::InvalidTimestampRange {
304+
start_timestamp,
305+
end_timestamp
306+
}
307+
.into()
308+
);
309+
}
310+
311+
// Convert the start timestamp to version
312+
let start_version = self.first_version_after(engine, start_timestamp)?;
313+
314+
// If the end timestamp is present, convert it to an end version
315+
let end_version = end_timestamp
316+
.map(|end| {
317+
let end_version = self.latest_version_as_of(engine, end)?;
318+
319+
// Verify that the start version is no greater than the end version. This can
320+
// happen in the case that the entire timestamp range falls between two commits.
321+
// Consider the following history:
322+
// |-------------|--------------------|---------------|
323+
// v4 start_timestamp end_timestamp v5
324+
//
325+
// The latest version as of the end_timestamp is 4. The first version after the
326+
// start_timestamp is 5. Thus in the case where end_version < start_version, we
327+
// return and [`LogHistoryError::EmptyTimestampRange`].
328+
require!(
329+
start_version <= end_version,
330+
DeltaError::from(LogHistoryError::EmptyTimestampRange {
331+
end_timestamp: end,
332+
start_timestamp,
333+
between_left: end_version,
334+
between_right: start_version
335+
})
336+
);
337+
338+
Ok(end_version)
339+
})
340+
.transpose()?;
341+
342+
Ok((start_version, end_version))
343+
}
344+
167345
/// Given a timestamp, this function determines the commit range that timestamp conversion
168346
/// should search. A timestamp search may be conducted over one of two version ranges:
169347
/// 1) A range of commits whose timestamp is the file modification timestamp
170348
/// 2) A range of commits whose timestamp is an in-commit timestamp.
171-
#[allow(unused)]
172349
fn get_timestamp_search_bounds(
173350
&self,
174351
timestamp: Timestamp,
@@ -236,7 +413,6 @@ impl LogHistoryManager {
236413
/// happen in the following cases based on the bound:
237414
/// - `Bound::GreatestLower`: There is no commit whose timestamp is lower than the given `timestamp`.
238415
/// - `Bound::LeastUpper`: There is no commit whose timestamp is greater than the given `timestamp`.
239-
#[allow(unused)]
240416
fn timestamp_to_version(
241417
&self,
242418
engine: &dyn Engine,

kernel/src/history_manager/search.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,13 @@ use delta_kernel_derive::internal_api;
1515
/// * [`Bound::GreatestLower`] - Finds the largest index `i` such that `values[i] <= key`.
1616
/// This represents the last element less than or equal to the search key.
1717
#[internal_api]
18-
#[allow(unused)]
1918
#[derive(Debug, Clone, Copy, PartialEq)]
2019
pub(crate) enum Bound {
2120
LeastUpper,
2221
GreatestLower,
2322
}
2423

2524
#[internal_api]
26-
#[allow(unused)]
2725
#[derive(Debug)]
2826
pub(crate) enum SearchError<T: Error> {
2927
OutOfRange,
@@ -119,7 +117,6 @@ pub(crate) enum SearchError<T: Error> {
119117
/// assert!(matches!(result, Err(SearchError::KeyFunctionError(_))));
120118
/// ```
121119
#[internal_api]
122-
#[allow(unused)]
123120
pub(crate) fn binary_search_by_key_with_bounds<'a, T, K: Ord + Debug, E: Error>(
124121
values: &'a [T],
125122
key: K,

kernel/src/history_manager/timestamp_visitor.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,12 @@ use crate::{DeltaResult, Error, RowVisitor};
1414
///
1515
/// Only one row of the engine data is checked because CommitInfo. This is because in-commit
1616
/// timestamps requires that the CommitInfo containing the ICT be the first action in the log.
17-
#[allow(unused)]
1817
#[derive(Default)]
1918
pub(crate) struct InCommitTimestampVisitor {
2019
pub(crate) in_commit_timestamp: Option<i64>,
2120
}
2221

2322
impl InCommitTimestampVisitor {
24-
#[allow(unused)]
2523
/// Get the schema that the visitor expects the data to have.
2624
pub(crate) fn schema() -> Arc<Schema> {
2725
static SCHEMA: LazyLock<Arc<Schema>> = LazyLock::new(|| {

kernel/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ pub mod checkpoint;
7878
pub mod engine_data;
7979
pub mod error;
8080
pub mod expressions;
81+
pub mod history_manager;
8182
pub mod scan;
8283
pub mod schema;
8384
pub mod snapshot;
@@ -95,7 +96,6 @@ pub use arrow_compat::*;
9596
pub(crate) mod kernel_predicates;
9697
pub(crate) mod utils;
9798

98-
internal_mod!(pub(crate) mod history_manager);
9999
internal_mod!(pub(crate) mod path);
100100
internal_mod!(pub(crate) mod log_replay);
101101
internal_mod!(pub(crate) mod log_segment);

kernel/src/log_segment.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,6 @@ impl LogSegment {
191191
LogSegment::try_new(ascending_commit_files, vec![], log_root, end_version)
192192
}
193193

194-
#[allow(unused)]
195194
/// Constructs a [`LogSegment`] to be used for timestamp conversion. This [`LogSegment`] will consist
196195
/// only of contiguous commit files. If an `end_version` is specified, the commit range will
197196
/// include commits up to the `end_version` commit (inclusive). If present, `limit` specifies the

0 commit comments

Comments
 (0)