Skip to content

Commit 5480711

Browse files
incremental snapshot update, log segment, table config
1 parent 3d37288 commit 5480711

File tree

4 files changed

+168
-15
lines changed

4 files changed

+168
-15
lines changed

kernel/src/log_segment.rs

+49-3
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,36 @@ impl LogSegment {
146146
)
147147
}
148148

149+
pub(crate) fn for_versions(
150+
fs_client: &dyn FileSystemClient,
151+
log_root: Url,
152+
start_version: Version,
153+
end_version: impl Into<Option<Version>>,
154+
) -> DeltaResult<Self> {
155+
let end_version = end_version.into();
156+
if let Some(end_version) = end_version {
157+
if start_version > end_version {
158+
return Err(Error::generic(
159+
"Failed to build LogSegment: start_version cannot be greater than end_version",
160+
));
161+
}
162+
}
163+
let (mut ascending_commit_files, checkpoint_parts) =
164+
list_log_files_with_version(fs_client, &log_root, Some(start_version), end_version)?;
165+
166+
// Commit file versions must be greater than the most recent checkpoint version if it exists
167+
if let Some(checkpoint_file) = checkpoint_parts.first() {
168+
ascending_commit_files.retain(|log_path| checkpoint_file.version < log_path.version);
169+
}
170+
171+
LogSegment::try_new(
172+
ascending_commit_files,
173+
checkpoint_parts,
174+
log_root,
175+
end_version,
176+
)
177+
}
178+
149179
/// Constructs a [`LogSegment`] to be used for `TableChanges`. For a TableChanges between versions
150180
/// `start_version` and `end_version`: Its LogSegment is made of zero checkpoints and all commits
151181
/// between versions `start_version` (inclusive) and `end_version` (inclusive). If no `end_version`
@@ -186,6 +216,7 @@ impl LogSegment {
186216
);
187217
LogSegment::try_new(ascending_commit_files, vec![], log_root, end_version)
188218
}
219+
189220
/// Read a stream of log data from this log segment.
190221
///
191222
/// The log files will be read from most recent to oldest.
@@ -360,8 +391,12 @@ impl LogSegment {
360391
)?))
361392
}
362393

363-
// Get the most up-to-date Protocol and Metadata actions
364-
pub(crate) fn read_metadata(&self, engine: &dyn Engine) -> DeltaResult<(Metadata, Protocol)> {
394+
// Do a lightweight protocol+metadata log replay to find the latest Protocol and Metadata in
395+
// the LogSegment
396+
pub(crate) fn protocol_and_metadata(
397+
&self,
398+
engine: &dyn Engine,
399+
) -> DeltaResult<(Option<Metadata>, Option<Protocol>)> {
365400
let data_batches = self.replay_for_metadata(engine)?;
366401
let (mut metadata_opt, mut protocol_opt) = (None, None);
367402
for batch in data_batches {
@@ -377,7 +412,12 @@ impl LogSegment {
377412
break;
378413
}
379414
}
380-
match (metadata_opt, protocol_opt) {
415+
Ok((metadata_opt, protocol_opt))
416+
}
417+
418+
// Get the most up-to-date Protocol and Metadata actions
419+
pub(crate) fn read_metadata(&self, engine: &dyn Engine) -> DeltaResult<(Metadata, Protocol)> {
420+
match self.protocol_and_metadata(engine)? {
381421
(Some(m), Some(p)) => Ok((m, p)),
382422
(None, Some(_)) => Err(Error::MissingMetadata),
383423
(Some(_), None) => Err(Error::MissingProtocol),
@@ -401,6 +441,11 @@ impl LogSegment {
401441
// read the same protocol and metadata schema for both commits and checkpoints
402442
self.replay(engine, schema.clone(), schema, META_PREDICATE.clone())
403443
}
444+
445+
/// Return whether or not the LogSegment contains a checkpoint.
446+
pub(crate) fn has_checkpoint(&self) -> bool {
447+
!self.checkpoint_parts.is_empty()
448+
}
404449
}
405450

406451
/// Returns a fallible iterator of [`ParsedLogPath`] that are between the provided `start_version` (inclusive)
@@ -430,6 +475,7 @@ fn list_log_files(
430475
Err(_) => true,
431476
}))
432477
}
478+
433479
/// List all commit and checkpoint files with versions above the provided `start_version` (inclusive).
434480
/// If successful, this returns a tuple `(ascending_commit_files, checkpoint_parts)` of type
435481
/// `(Vec<ParsedLogPath>, Vec<ParsedLogPath>)`. The commit files are guaranteed to be sorted in

kernel/src/snapshot.rs

+59-7
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,13 @@ impl std::fmt::Debug for Snapshot {
4343
}
4444

4545
impl Snapshot {
46+
fn new(log_segment: LogSegment, table_configuration: TableConfiguration) -> Self {
47+
Self {
48+
log_segment,
49+
table_configuration,
50+
}
51+
}
52+
4653
/// Create a new [`Snapshot`] instance for the given version.
4754
///
4855
/// # Parameters
@@ -79,13 +86,57 @@ impl Snapshot {
7986
/// - `version`: target version of the [`Snapshot`]. None will create a snapshot at the latest
8087
/// version of the table.
8188
pub fn new_from(
82-
existing_snapshot: &Snapshot,
89+
existing_snapshot: Arc<Snapshot>,
8390
engine: &dyn Engine,
8491
version: Option<Version>,
85-
) -> DeltaResult<Self> {
86-
// TODO(zach): for now we just pass through to the old API. We should instead optimize this
87-
// to avoid replaying overlapping LogSegments.
88-
Self::try_new(existing_snapshot.table_root.clone(), engine, version)
92+
) -> DeltaResult<Arc<Self>> {
93+
// simple heuristic for now:
94+
// 1. if the new version < existing version, just return an entirely new snapshot
95+
// 2. if the new version == existing version, just return the existing snapshot
96+
// 3. list from existing snapshot version
97+
// 4a. if new checkpoint is found: just create a new snapshot from that checkpoint (and
98+
// commits after it)
99+
// 4b. if no new checkpoint is found: do lightweight P+M replay on the latest commits
100+
match version {
101+
Some(v) if v < existing_snapshot.version() => {
102+
Self::try_new(existing_snapshot.table_root().clone(), engine, version).map(Arc::new)
103+
}
104+
Some(v) if v == existing_snapshot.version() => Ok(existing_snapshot.clone()),
105+
new_version => {
106+
debug!(
107+
"new version: {new_version:?}, existing version: {}",
108+
existing_snapshot.version()
109+
);
110+
let log_root = existing_snapshot.log_segment.log_root.clone();
111+
let fs_client = engine.get_file_system_client();
112+
113+
// create a log segment just from existing_snapshot.version -> new_version
114+
let log_segment = LogSegment::for_versions(
115+
fs_client.as_ref(),
116+
log_root,
117+
existing_snapshot.version(),
118+
new_version,
119+
)?;
120+
121+
if log_segment.has_checkpoint() {
122+
Self::try_new_from_log_segment(
123+
existing_snapshot.table_root().clone(),
124+
log_segment,
125+
engine,
126+
)
127+
.map(Arc::new)
128+
} else {
129+
let (new_metadata, new_protocol) = log_segment.protocol_and_metadata(engine)?;
130+
let table_configuration = TableConfiguration::new_from(
131+
existing_snapshot.table_configuration(),
132+
new_metadata,
133+
new_protocol,
134+
log_segment.end_version,
135+
)?;
136+
Ok(Arc::new(Snapshot::new(log_segment, table_configuration)))
137+
}
138+
}
139+
}
89140
}
90141

91142
/// Create a new [`Snapshot`] instance.
@@ -262,15 +313,16 @@ mod tests {
262313
assert_eq!(snapshot.schema(), &expected);
263314
}
264315

316+
// TODO(zach)
265317
#[test]
266318
fn test_snapshot_new_from() {
267319
let path =
268320
std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap();
269321
let url = url::Url::from_directory_path(path).unwrap();
270322

271323
let engine = SyncEngine::new();
272-
let old_snapshot = Snapshot::try_new(url, &engine, Some(0)).unwrap();
273-
let snapshot = Snapshot::new_from(&old_snapshot, &engine, Some(0)).unwrap();
324+
let old_snapshot = Arc::new(Snapshot::try_new(url, &engine, Some(0)).unwrap());
325+
let snapshot = Snapshot::new_from(old_snapshot, &engine, Some(0)).unwrap();
274326

275327
let expected =
276328
Protocol::try_new(3, 7, Some(["deletionVectors"]), Some(["deletionVectors"])).unwrap();

kernel/src/table_changes/mod.rs

+7-4
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ static CDF_FIELDS: LazyLock<[StructField; 3]> = LazyLock::new(|| {
111111
pub struct TableChanges {
112112
pub(crate) log_segment: LogSegment,
113113
table_root: Url,
114-
end_snapshot: Snapshot,
114+
end_snapshot: Arc<Snapshot>,
115115
start_version: Version,
116116
schema: Schema,
117117
}
@@ -149,9 +149,12 @@ impl TableChanges {
149149
// Both snapshots ensure that reading is supported at the start and end version using
150150
// `ensure_read_supported`. Note that we must still verify that reading is
151151
// supported for every protocol action in the CDF range.
152-
let start_snapshot =
153-
Snapshot::try_new(table_root.as_url().clone(), engine, Some(start_version))?;
154-
let end_snapshot = Snapshot::new_from(&start_snapshot, engine, end_version)?;
152+
let start_snapshot = Arc::new(Snapshot::try_new(
153+
table_root.as_url().clone(),
154+
engine,
155+
Some(start_version),
156+
)?);
157+
let end_snapshot = Snapshot::new_from(start_snapshot.clone(), engine, end_version)?;
155158

156159
// Verify CDF is enabled at the beginning and end of the interval using
157160
// [`check_cdf_table_properties`] to fail early. This also ensures that column mapping is

kernel/src/table_configuration.rs

+53-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use crate::{DeltaResult, Error, Version};
3333
/// `try_new` successfully returns `TableConfiguration`, it is also guaranteed that reading the
3434
/// table is supported.
3535
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
36-
#[derive(Debug)]
36+
#[derive(Debug, Clone)]
3737
pub(crate) struct TableConfiguration {
3838
metadata: Metadata,
3939
protocol: Protocol,
@@ -89,6 +89,58 @@ impl TableConfiguration {
8989
})
9090
}
9191

92+
pub(crate) fn new_from(
93+
table_configuration: &Self,
94+
new_metadata: Option<Metadata>,
95+
new_protocol: Option<Protocol>,
96+
new_version: Version,
97+
) -> DeltaResult<Self> {
98+
// simplest case: no new P/M, just return the existing table configuration with new version
99+
if new_metadata.is_none() && new_protocol.is_none() {
100+
return Ok(Self {
101+
version: new_version,
102+
..(*table_configuration).clone()
103+
});
104+
}
105+
106+
// if there's new metadata: have to parse schema, table properties
107+
let (metadata, schema, table_properties) = match new_metadata {
108+
Some(metadata) => {
109+
let schema = Arc::new(metadata.parse_schema()?);
110+
let table_properties = metadata.parse_table_properties();
111+
(metadata, schema, table_properties)
112+
}
113+
None => (
114+
table_configuration.metadata.clone(),
115+
table_configuration.schema.clone(),
116+
table_configuration.table_properties.clone(),
117+
),
118+
};
119+
120+
// if there's new protocol: have to ensure read suported
121+
let protocol = match new_protocol {
122+
Some(protocol) => {
123+
protocol.ensure_read_supported()?;
124+
protocol
125+
}
126+
None => table_configuration.protocol.clone(),
127+
};
128+
129+
// if either change, have to validate column mapping mode
130+
let column_mapping_mode = column_mapping_mode(&protocol, &table_properties);
131+
validate_schema_column_mapping(&schema, column_mapping_mode)?;
132+
133+
Ok(Self {
134+
schema,
135+
metadata,
136+
protocol,
137+
table_properties,
138+
column_mapping_mode,
139+
table_root: table_configuration.table_root.clone(),
140+
version: new_version,
141+
})
142+
}
143+
92144
/// The [`Metadata`] for this table at this version.
93145
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
94146
pub(crate) fn metadata(&self) -> &Metadata {

0 commit comments

Comments
 (0)