Skip to content

Commit e92ca69

Browse files
committed
has_compat
1 parent c8c1f6b commit e92ca69

File tree

10 files changed

+239
-76
lines changed

10 files changed

+239
-76
lines changed

kernel/src/log_segment.rs

Lines changed: 129 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,20 @@ use url::Url;
3636
#[cfg(test)]
3737
mod tests;
3838

39+
/// Information about checkpoint reading for data skipping optimization.
40+
///
41+
/// Returned alongside the actions iterator from checkpoint reading functions.
42+
#[derive(Debug, Clone)]
43+
pub(crate) struct CheckpointReadInfo {
44+
/// Whether the checkpoint has compatible pre-parsed stats for data skipping.
45+
/// When `true`, checkpoint batches can use stats_parsed directly instead of parsing JSON.
46+
#[allow(unused)]
47+
pub has_stats_parsed: bool,
48+
/// The schema used to read checkpoint files, potentially including stats_parsed.
49+
#[allow(unused)]
50+
pub checkpoint_read_schema: SchemaRef,
51+
}
52+
3953
/// A [`LogSegment`] represents a contiguous section of the log and is made of checkpoint files
4054
/// and commit files and guarantees the following:
4155
/// 1. Commit file versions will not have any gaps between them.
@@ -384,21 +398,34 @@ impl LogSegment {
384398
///
385399
/// `meta_predicate` is an optional expression to filter the log files with. It is _NOT_ the
386400
/// query's predicate, but rather a predicate for filtering log files themselves.
401+
/// Read a stream of actions from this log segment. This returns an iterator of
402+
/// [`ActionsBatch`]s which includes EngineData of actions + a boolean flag indicating whether
403+
/// the data was read from a commit file (true) or a checkpoint file (false).
404+
///
405+
/// Also returns [`CheckpointReadInfo`] with stats_parsed compatibility and the checkpoint schema.
387406
#[internal_api]
388407
pub(crate) fn read_actions_with_projected_checkpoint_actions(
389408
&self,
390409
engine: &dyn Engine,
391410
commit_read_schema: SchemaRef,
392411
checkpoint_read_schema: SchemaRef,
393412
meta_predicate: Option<PredicateRef>,
394-
) -> DeltaResult<impl Iterator<Item = DeltaResult<ActionsBatch>> + Send> {
413+
stats_schema: Option<&StructType>,
414+
) -> DeltaResult<(
415+
impl Iterator<Item = DeltaResult<ActionsBatch>> + Send,
416+
CheckpointReadInfo,
417+
)> {
395418
// `replay` expects commit files to be sorted in descending order, so the return value here is correct
396419
let commit_stream = CommitReader::try_new(engine, self, commit_read_schema)?;
397420

398-
let checkpoint_stream =
399-
self.create_checkpoint_stream(engine, checkpoint_read_schema, meta_predicate)?;
421+
let (checkpoint_stream, checkpoint_info) = self.create_checkpoint_stream(
422+
engine,
423+
checkpoint_read_schema,
424+
meta_predicate,
425+
stats_schema,
426+
)?;
400427

401-
Ok(commit_stream.chain(checkpoint_stream))
428+
Ok((commit_stream.chain(checkpoint_stream), checkpoint_info))
402429
}
403430

404431
// Same as above, but uses the same schema for reading checkpoints and commits.
@@ -409,12 +436,15 @@ impl LogSegment {
409436
action_schema: SchemaRef,
410437
meta_predicate: Option<PredicateRef>,
411438
) -> DeltaResult<impl Iterator<Item = DeltaResult<ActionsBatch>> + Send> {
412-
self.read_actions_with_projected_checkpoint_actions(
413-
engine,
414-
action_schema.clone(),
415-
action_schema,
416-
meta_predicate,
417-
)
439+
let (actions_iter, _checkpoint_info) = self
440+
.read_actions_with_projected_checkpoint_actions(
441+
engine,
442+
action_schema.clone(),
443+
action_schema,
444+
meta_predicate,
445+
None,
446+
)?;
447+
Ok(actions_iter)
418448
}
419449

420450
/// find a minimal set to cover the range of commits we want. This is greedy so not always
@@ -556,12 +586,18 @@ impl LogSegment {
556586
/// 1. Determines the files actions schema (for future stats_parsed detection)
557587
/// 2. Extracts sidecar file references if present (V2 checkpoints)
558588
/// 3. Reads checkpoint and sidecar data using cached sidecar refs
589+
///
590+
/// Returns a tuple of the actions iterator and [`CheckpointReadInfo`].
559591
fn create_checkpoint_stream(
560592
&self,
561593
engine: &dyn Engine,
562594
action_schema: SchemaRef,
563595
meta_predicate: Option<PredicateRef>,
564-
) -> DeltaResult<impl Iterator<Item = DeltaResult<ActionsBatch>> + Send> {
596+
stats_schema: Option<&StructType>,
597+
) -> DeltaResult<(
598+
impl Iterator<Item = DeltaResult<ActionsBatch>> + Send,
599+
CheckpointReadInfo,
600+
)> {
565601
let need_file_actions = schema_contains_file_actions(&action_schema);
566602

567603
// Extract file actions schema and sidecar files
@@ -574,22 +610,61 @@ impl LogSegment {
574610
(None, vec![])
575611
};
576612

577-
// (Future) Determine if there are usable parsed stats
578-
// let _has_stats_parsed = file_actions_schema.as_ref()
579-
// .map(|s| Self::schema_has_compatible_stats_parsed(s, stats_schema))
580-
// .unwrap_or(false);
581-
let _ = file_actions_schema; // Suppress unused warning for now
582-
583-
// Read the actual checkpoint files, using cached sidecar files
584-
// We expand sidecars if we have them and need file actions
585-
let checkpoint_read_schema = if need_file_actions
586-
&& !sidecar_files.is_empty()
587-
&& !action_schema.contains(SIDECAR_NAME)
588-
{
589-
Arc::new(
590-
action_schema.add([StructField::nullable(SIDECAR_NAME, Sidecar::to_schema())])?,
613+
// Check if checkpoint has compatible stats_parsed and add it to the schema if so
614+
let has_stats_parsed =
615+
stats_schema
616+
.zip(file_actions_schema.as_ref())
617+
.is_some_and(|(stats, file_schema)| {
618+
Self::schema_has_compatible_stats_parsed(file_schema, stats)
619+
});
620+
621+
// Build final schema with any additional fields needed (stats_parsed, sidecar)
622+
// Only modify the schema if it has an "add" field (i.e., we need file actions)
623+
let final_schema = if let Some(add_field) = action_schema.field("add") {
624+
let DataType::Struct(add_struct) = add_field.data_type() else {
625+
return Err(Error::internal_error(
626+
"add field in action schema must be a struct",
627+
));
628+
};
629+
let mut add_fields: Vec<StructField> = add_struct.fields().cloned().collect();
630+
631+
// Add stats_parsed if checkpoint has compatible parsed stats
632+
if let (true, Some(stats_schema)) = (has_stats_parsed, stats_schema) {
633+
add_fields.push(StructField::nullable(
634+
"stats_parsed",
635+
DataType::Struct(Box::new(stats_schema.clone())),
636+
));
637+
}
638+
639+
// Rebuild the add field with any new fields (stats_parsed)
640+
let new_add_field = StructField::new(
641+
add_field.name(),
642+
StructType::new_unchecked(add_fields),
643+
add_field.is_nullable(),
591644
)
645+
.with_metadata(add_field.metadata.clone());
646+
647+
// Rebuild schema with modified add field
648+
let mut new_fields: Vec<StructField> = action_schema
649+
.fields()
650+
.map(|f| {
651+
if f.name() == "add" {
652+
new_add_field.clone()
653+
} else {
654+
f.clone()
655+
}
656+
})
657+
.collect();
658+
659+
// Add sidecar column at top-level for V2 checkpoints
660+
if need_file_actions && !sidecar_files.is_empty() {
661+
new_fields.push(StructField::nullable(SIDECAR_NAME, Sidecar::to_schema()));
662+
}
663+
664+
Arc::new(StructType::new_unchecked(new_fields))
592665
} else {
666+
// Schema doesn't have "add" field (e.g., for metadata/protocol only reads)
667+
// Use the action_schema as-is
593668
action_schema.clone()
594669
};
595670

@@ -609,14 +684,14 @@ impl LogSegment {
609684
Some(parsed_log_path) if parsed_log_path.extension == "json" => {
610685
engine.json_handler().read_json_files(
611686
&checkpoint_file_meta,
612-
checkpoint_read_schema.clone(),
687+
final_schema.clone(),
613688
meta_predicate.clone(),
614689
)?
615690
}
616691
Some(parsed_log_path) if parsed_log_path.extension == "parquet" => parquet_handler
617692
.read_parquet_files(
618693
&checkpoint_file_meta,
619-
checkpoint_read_schema.clone(),
694+
final_schema.clone(),
620695
meta_predicate.clone(),
621696
)?,
622697
Some(parsed_log_path) => {
@@ -644,7 +719,11 @@ impl LogSegment {
644719
.map_ok(|batch| ActionsBatch::new(batch, false))
645720
.chain(sidecar_batches.map_ok(|batch| ActionsBatch::new(batch, false)));
646721

647-
Ok(actions_iter)
722+
let checkpoint_info = CheckpointReadInfo {
723+
has_stats_parsed,
724+
checkpoint_read_schema: final_schema,
725+
};
726+
Ok((actions_iter, checkpoint_info))
648727
}
649728

650729
/// Extracts sidecar file references from a checkpoint file.
@@ -804,8 +883,7 @@ impl LogSegment {
804883
/// use physical column names (not logical names), so direct name comparison is correct.
805884
///
806885
/// Returns `false` if stats_parsed doesn't exist or has incompatible types.
807-
#[allow(dead_code)]
808-
fn schema_has_compatible_stats_parsed(
886+
pub(crate) fn schema_has_compatible_stats_parsed(
809887
checkpoint_schema: &StructType,
810888
stats_schema: &StructType,
811889
) -> bool {
@@ -833,42 +911,53 @@ impl LogSegment {
833911
// While these typically have the same schema, the protocol doesn't guarantee it,
834912
// so we check both to be safe.
835913
for field_name in ["minValues", "maxValues"] {
836-
let Some(values_field) = stats_struct.field(field_name) else {
914+
let Some(checkpoint_values_field) = stats_struct.field(field_name) else {
837915
// stats_parsed exists but no minValues/maxValues - unusual but valid
838916
continue;
839917
};
840918

841919
// minValues/maxValues must be a Struct containing per-column statistics.
842920
// If it exists but isn't a Struct, the schema is malformed and unusable.
843-
let DataType::Struct(values_struct) = values_field.data_type() else {
921+
let DataType::Struct(checkpoint_values) = checkpoint_values_field.data_type() else {
844922
debug!(
845923
"stats_parsed not compatible: stats_parsed.{} is not a Struct, got {:?}",
846924
field_name,
847-
values_field.data_type()
925+
checkpoint_values_field.data_type()
848926
);
849927
return false;
850928
};
851929

852-
// Check type compatibility for each column in the checkpoint's stats_parsed
853-
// that also exists in the stats schema (columns needed for data skipping)
854-
for checkpoint_field in values_struct.fields() {
855-
if let Some(stats_field) = stats_schema.field(&checkpoint_field.name) {
930+
// Get the corresponding field from stats_schema (e.g., stats_schema.minValues)
931+
let Some(stats_values_field) = stats_schema.field(field_name) else {
932+
// stats_schema doesn't have minValues/maxValues, skip this check
933+
continue;
934+
};
935+
let DataType::Struct(stats_values) = stats_values_field.data_type() else {
936+
// stats_schema.minValues/maxValues isn't a struct - shouldn't happen but skip
937+
continue;
938+
};
939+
940+
// Check type compatibility for each column needed for data skipping
941+
// If it exists in checkpoint, verify types are compatible
942+
for stats_field in stats_values.fields() {
943+
if let Some(checkpoint_field) = checkpoint_values.field(&stats_field.name) {
856944
if checkpoint_field
857945
.data_type()
858946
.can_read_as(stats_field.data_type())
859947
.is_err()
860948
{
861949
debug!(
862-
"stats_parsed not compatible: incompatible type for column '{}' in {}: checkpoint has {:?}, stats schema has {:?}",
863-
checkpoint_field.name,
950+
"stats_parsed not compatible: incompatible type for column '{}' in {}: checkpoint has {:?}, stats schema needs {:?}",
951+
stats_field.name,
864952
field_name,
865953
checkpoint_field.data_type(),
866954
stats_field.data_type()
867955
);
868956
return false;
869957
}
870958
}
871-
// If column doesn't exist in stats schema, it's fine (not needed for data skipping)
959+
// If the column is missing from checkpoint's stats_parsed, it will return
960+
// null when accessed, which is acceptable for data skipping.
872961
}
873962
}
874963

0 commit comments

Comments
 (0)