Skip to content

Commit 52fab21

Browse files
History manager
1 parent 0cdf4e7 commit 52fab21

File tree

2 files changed

+216
-1
lines changed

2 files changed

+216
-1
lines changed

kernel/src/log_segment.rs

Lines changed: 82 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
//! Represents a segment of a delta log. [`LogSegment`] wraps a set of checkpoint and commit
22
//! files.
3-
use std::collections::HashMap;
3+
use std::collections::{HashMap, VecDeque};
44
use std::convert::identity;
55
use std::sync::{Arc, LazyLock};
66

@@ -191,6 +191,87 @@ impl LogSegment {
191191
LogSegment::try_new(ascending_commit_files, vec![], log_root, end_version)
192192
}
193193

194+
#[allow(unused)]
195+
/// Constructs a [`LogSegment`] to be used for timestamp conversion. This [`LogSegment`] will consist
196+
/// only of contiguous commit files. If an `end_version` is specified, the commit range will
197+
/// include commits up to the `end_version` commit (inclusive). If present, `limit` specifies the
198+
/// maximum length of the returned LogSegment.
199+
pub(crate) fn for_timestamp_conversion(
200+
storage: &dyn StorageHandler,
201+
log_root: Url,
202+
end_version: Version,
203+
limit: Option<usize>,
204+
) -> DeltaResult<Self> {
205+
// Compute the version to start listing from.
206+
let start_from: Option<u64> = match limit {
207+
Some(limit) => {
208+
let limit: u64 = limit.try_into().map_err(|x| {
209+
Error::generic(
210+
"Failed to convert limit into u64
211+
when building log segment in timestamp conversion",
212+
)
213+
})?;
214+
Some(end_version - limit)
215+
}
216+
None => None,
217+
};
218+
219+
// List the commits greater than or equal to `start_from`. For large tables, listing with
220+
// `start_from` can be a significant speedup over listing _all_ the files in the log.
221+
let ascending_commit_files =
222+
list_log_files(storage, &log_root, start_from, Some(end_version))?
223+
.filter_ok(ParsedLogPath::is_commit);
224+
225+
// If limit is not specified, then use 10 as a default. Typically, there are 10 commits per
226+
// checkpoint, so start with that.
227+
let mut contiguous_commits: VecDeque<ParsedLogPath> =
228+
VecDeque::with_capacity(limit.unwrap_or(10));
229+
230+
for commit_res in ascending_commit_files {
231+
let commit = match commit_res {
232+
Ok(commit) => commit,
233+
Err(Error::InvalidLogPath(_)) => continue, // Invalid log paths are ignored
234+
Err(err) => return Err(err), // All other errors are propagated to the caller
235+
};
236+
237+
if contiguous_commits
238+
.back()
239+
.is_some_and(|prev_commit| prev_commit.version != commit.version - 1)
240+
{
241+
// We found a gap, so throw away all earlier versions
242+
contiguous_commits.clear();
243+
}
244+
245+
contiguous_commits.push_back(commit);
246+
247+
// If the number of commits exceeds the limit, remove the earliest one.
248+
if limit.is_some_and(|limit| contiguous_commits.len() > limit) {
249+
contiguous_commits.pop_front();
250+
}
251+
}
252+
253+
// If we have a non-empty commit list and a requested end version, verify they match.
254+
//
255+
// NOTE: No need to check for an empty commit list, `LogSegment::try_new` fails in that case.
256+
match contiguous_commits.back() {
257+
Some(last_commit) if last_commit.version != end_version => {
258+
return Err(Error::generic(format!(
259+
"Failed to build LogSegment for timestamp conversion.
260+
Expected end version {end_version}, but found {}",
261+
last_commit.version
262+
)));
263+
}
264+
_ => (),
265+
}
266+
267+
LogSegment::try_new(
268+
contiguous_commits.into(),
269+
vec![],
270+
log_root,
271+
Some(end_version),
272+
)
273+
}
274+
194275
/// Read a stream of actions from this log segment. This returns an iterator of (EngineData,
195276
/// bool) pairs, where the boolean flag indicates whether the data was read from a commit file
196277
/// (true) or a checkpoint file (false).

kernel/src/log_segment/tests.rs

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1255,3 +1255,137 @@ fn test_create_checkpoint_stream_reads_checkpoint_file_and_returns_sidecar_batch
12551255

12561256
Ok(())
12571257
}
1258+
1259+
/// With no gaps and no limit, the segment holds every commit up to the end version and no
/// checkpoint parts, even though checkpoints exist in the log.
#[test]
fn for_timestamp_conversion_gets_commit_range() {
    let log_paths = [
        delta_path_for_version(0, "json"),
        delta_path_for_version(1, "json"),
        delta_path_for_version(1, "checkpoint.parquet"),
        delta_path_for_version(2, "json"),
        delta_path_for_version(3, "json"),
        delta_path_for_version(3, "checkpoint.parquet"),
        delta_path_for_version(4, "json"),
        delta_path_for_version(5, "json"),
        delta_path_for_version(5, "checkpoint.parquet"),
        delta_path_for_version(6, "json"),
        delta_path_for_version(7, "json"),
    ];
    let (storage, log_root) = build_log_with_paths_and_checkpoint(&log_paths, None);

    let segment =
        LogSegment::for_timestamp_conversion(storage.as_ref(), log_root, 7, None).unwrap();

    // Checkpoints must never be part of a timestamp-conversion segment.
    assert!(segment.checkpoint_parts.is_empty());

    let found_versions: Vec<_> = segment
        .ascending_commit_files
        .iter()
        .map(|commit| commit.version)
        .collect();
    assert_eq!(found_versions, vec![0, 1, 2, 3, 4, 5, 6, 7]);
}
1288+
1289+
/// An end version older than the latest commit truncates the segment at that version
/// (inclusive); later commits in the log are not returned.
#[test]
fn for_timestamp_conversion_with_old_end_version() {
    let log_paths = [
        delta_path_for_version(0, "json"),
        delta_path_for_version(1, "json"),
        delta_path_for_version(1, "checkpoint.parquet"),
        delta_path_for_version(2, "json"),
        delta_path_for_version(3, "json"),
        delta_path_for_version(3, "checkpoint.parquet"),
        delta_path_for_version(4, "json"),
        delta_path_for_version(5, "json"),
        delta_path_for_version(5, "checkpoint.parquet"),
        delta_path_for_version(6, "json"),
        delta_path_for_version(7, "json"),
    ];
    let (storage, log_root) = build_log_with_paths_and_checkpoint(&log_paths, None);

    let segment =
        LogSegment::for_timestamp_conversion(storage.as_ref(), log_root, 5, None).unwrap();

    // Checkpoints must never be part of a timestamp-conversion segment.
    assert!(segment.checkpoint_parts.is_empty());

    let found_versions: Vec<_> = segment
        .ascending_commit_files
        .iter()
        .map(|commit| commit.version)
        .collect();
    assert_eq!(found_versions, vec![0, 1, 2, 3, 4, 5]);
}
1318+
1319+
/// When a commit is missing from the log, only the contiguous run of commits that ends at the
/// end version is kept; everything before the gap is discarded.
#[test]
fn for_timestamp_conversion_only_contiguous_ranges() {
    let log_paths = [
        delta_path_for_version(0, "json"),
        delta_path_for_version(1, "json"),
        delta_path_for_version(1, "checkpoint.parquet"),
        delta_path_for_version(2, "json"),
        delta_path_for_version(3, "json"),
        delta_path_for_version(3, "checkpoint.parquet"),
        // version 4 is missing
        delta_path_for_version(5, "json"),
        delta_path_for_version(5, "checkpoint.parquet"),
        delta_path_for_version(6, "json"),
        delta_path_for_version(7, "json"),
    ];
    let (storage, log_root) = build_log_with_paths_and_checkpoint(&log_paths, None);

    let segment =
        LogSegment::for_timestamp_conversion(storage.as_ref(), log_root, 7, None).unwrap();

    // Checkpoints must never be part of a timestamp-conversion segment.
    assert!(segment.checkpoint_parts.is_empty());

    let found_versions: Vec<_> = segment
        .ascending_commit_files
        .iter()
        .map(|commit| commit.version)
        .collect();
    assert_eq!(found_versions, vec![5, 6, 7]);
}
1348+
1349+
/// A limit of three keeps only the three most recent commits ending at the end version.
#[test]
fn for_timestamp_conversion_with_limit() {
    let log_paths = [
        delta_path_for_version(0, "json"),
        delta_path_for_version(1, "json"),
        delta_path_for_version(1, "checkpoint.parquet"),
        delta_path_for_version(2, "json"),
        delta_path_for_version(3, "json"),
        delta_path_for_version(3, "checkpoint.parquet"),
        delta_path_for_version(4, "json"),
        delta_path_for_version(5, "json"),
        delta_path_for_version(5, "checkpoint.parquet"),
        delta_path_for_version(6, "json"),
        delta_path_for_version(7, "json"),
    ];
    let (storage, log_root) = build_log_with_paths_and_checkpoint(&log_paths, None);

    let max_commits = Some(3);
    let segment =
        LogSegment::for_timestamp_conversion(storage.as_ref(), log_root, 7, max_commits).unwrap();

    // Checkpoints must never be part of a timestamp-conversion segment.
    assert!(segment.checkpoint_parts.is_empty());

    let found_versions: Vec<_> = segment
        .ascending_commit_files
        .iter()
        .map(|commit| commit.version)
        .collect();
    assert_eq!(found_versions, vec![5, 6, 7]);
}
1379+
1380+
/// A log that contains only a checkpoint (no commit files) cannot produce a segment; the
/// constructor reports the empty-segment error.
#[test]
fn for_timestamp_conversion_no_commit_files() {
    let log_paths = [delta_path_for_version(5, "checkpoint.parquet")];
    let (storage, log_root) = build_log_with_paths_and_checkpoint(&log_paths, None);

    let outcome = LogSegment::for_timestamp_conversion(storage.as_ref(), log_root, 0, None);

    let error = match outcome {
        Ok(_) => panic!("expected an error when the log contains no commit files"),
        Err(error) => error,
    };
    assert!(error.to_string().contains("No files in log segment"));
}

0 commit comments

Comments
 (0)