Skip to content

Commit a9b9b11

Browse files
committed
has_compat
1 parent 16e428c commit a9b9b11

File tree

9 files changed

+244
-78
lines changed

9 files changed

+244
-78
lines changed

kernel/src/log_segment.rs

Lines changed: 135 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,20 @@ use url::Url;
3636
#[cfg(test)]
3737
mod tests;
3838

39+
/// Information about checkpoint reading for data skipping optimization.
40+
///
41+
/// Returned alongside the actions iterator from checkpoint reading functions.
42+
#[derive(Debug, Clone)]
43+
pub(crate) struct CheckpointReadInfo {
44+
/// Whether the checkpoint has compatible pre-parsed stats for data skipping.
45+
/// When `true`, checkpoint batches can use stats_parsed directly instead of parsing JSON.
46+
#[allow(unused)]
47+
pub has_stats_parsed: bool,
48+
/// The schema used to read checkpoint files, potentially including stats_parsed.
49+
#[allow(unused)]
50+
pub checkpoint_read_schema: SchemaRef,
51+
}
52+
3953
/// A [`LogSegment`] represents a contiguous section of the log and is made of checkpoint files
4054
/// and commit files and guarantees the following:
4155
/// 1. Commit file versions will not have any gaps between them.
@@ -384,21 +398,34 @@ impl LogSegment {
384398
///
385399
/// `meta_predicate` is an optional expression to filter the log files with. It is _NOT_ the
386400
/// query's predicate, but rather a predicate for filtering log files themselves.
401+
/// Read a stream of actions from this log segment. This returns an iterator of
402+
/// [`ActionsBatch`]s, each of which includes EngineData of actions plus a boolean flag indicating whether
403+
/// the data was read from a commit file (true) or a checkpoint file (false).
404+
///
405+
/// Also returns `CheckpointReadInfo` with stats_parsed compatibility and the checkpoint schema.
387406
#[internal_api]
388407
pub(crate) fn read_actions_with_projected_checkpoint_actions(
389408
&self,
390409
engine: &dyn Engine,
391410
commit_read_schema: SchemaRef,
392411
checkpoint_read_schema: SchemaRef,
393412
meta_predicate: Option<PredicateRef>,
394-
) -> DeltaResult<impl Iterator<Item = DeltaResult<ActionsBatch>> + Send> {
413+
stats_schema: Option<&StructType>,
414+
) -> DeltaResult<(
415+
impl Iterator<Item = DeltaResult<ActionsBatch>> + Send,
416+
CheckpointReadInfo,
417+
)> {
395418
// `replay` expects commit files to be sorted in descending order, so the return value here is correct
396419
let commit_stream = CommitReader::try_new(engine, self, commit_read_schema)?;
397420

398-
let checkpoint_stream =
399-
self.create_checkpoint_stream(engine, checkpoint_read_schema, meta_predicate)?;
421+
let (checkpoint_stream, checkpoint_info) = self.create_checkpoint_stream(
422+
engine,
423+
checkpoint_read_schema,
424+
meta_predicate,
425+
stats_schema,
426+
)?;
400427

401-
Ok(commit_stream.chain(checkpoint_stream))
428+
Ok((commit_stream.chain(checkpoint_stream), checkpoint_info))
402429
}
403430

404431
// Same as above, but uses the same schema for reading checkpoints and commits.
@@ -409,12 +436,15 @@ impl LogSegment {
409436
action_schema: SchemaRef,
410437
meta_predicate: Option<PredicateRef>,
411438
) -> DeltaResult<impl Iterator<Item = DeltaResult<ActionsBatch>> + Send> {
412-
self.read_actions_with_projected_checkpoint_actions(
413-
engine,
414-
action_schema.clone(),
415-
action_schema,
416-
meta_predicate,
417-
)
439+
let (actions_iter, _checkpoint_info) = self
440+
.read_actions_with_projected_checkpoint_actions(
441+
engine,
442+
action_schema.clone(),
443+
action_schema,
444+
meta_predicate,
445+
None,
446+
)?;
447+
Ok(actions_iter)
418448
}
419449

420450
/// find a minimal set to cover the range of commits we want. This is greedy so not always
@@ -514,12 +544,13 @@ impl LogSegment {
514544
Some(true) => {
515545
// Hint says V2 checkpoint, extract sidecars
516546
let sidecar_files = self.extract_sidecar_refs(engine, checkpoint)?;
517-
// For V2, read first sidecar's schema
547+
// For V2, read first sidecar's schema if sidecars exist,
548+
// otherwise fall back to hint schema (for empty V2 checkpoints)
518549
let file_actions_schema = match sidecar_files.first() {
519550
Some(first) => {
520551
Some(engine.parquet_handler().read_parquet_footer(first)?.schema)
521552
}
522-
None => None,
553+
None => hint_schema.cloned(),
523554
};
524555
Ok((file_actions_schema, sidecar_files))
525556
}
@@ -532,11 +563,13 @@ impl LogSegment {
532563
if footer.schema.field(SIDECAR_NAME).is_some() {
533564
// V2 parquet checkpoint
534565
let sidecar_files = self.extract_sidecar_refs(engine, checkpoint)?;
566+
// For V2, read first sidecar's schema if sidecars exist,
567+
// otherwise fall back to footer schema (for empty V2 checkpoints)
535568
let file_actions_schema = match sidecar_files.first() {
536569
Some(first) => Some(
537570
engine.parquet_handler().read_parquet_footer(first)?.schema,
538571
),
539-
None => None,
572+
None => Some(footer.schema),
540573
};
541574
Ok((file_actions_schema, sidecar_files))
542575
} else {
@@ -556,12 +589,18 @@ impl LogSegment {
556589
/// 1. Determines the file actions schema (used for stats_parsed detection)
557590
/// 2. Extracts sidecar file references if present (V2 checkpoints)
558591
/// 3. Reads checkpoint and sidecar data using cached sidecar refs
592+
///
593+
/// Returns a tuple of the actions iterator and [`CheckpointReadInfo`].
559594
fn create_checkpoint_stream(
560595
&self,
561596
engine: &dyn Engine,
562597
action_schema: SchemaRef,
563598
meta_predicate: Option<PredicateRef>,
564-
) -> DeltaResult<impl Iterator<Item = DeltaResult<ActionsBatch>> + Send> {
599+
stats_schema: Option<&StructType>,
600+
) -> DeltaResult<(
601+
impl Iterator<Item = DeltaResult<ActionsBatch>> + Send,
602+
CheckpointReadInfo,
603+
)> {
565604
let need_file_actions = schema_contains_file_actions(&action_schema);
566605

567606
// Extract file actions schema and sidecar files
@@ -574,22 +613,61 @@ impl LogSegment {
574613
(None, vec![])
575614
};
576615

577-
// (Future) Determine if there are usable parsed stats
578-
// let _has_stats_parsed = file_actions_schema.as_ref()
579-
// .map(|s| Self::schema_has_compatible_stats_parsed(s, stats_schema))
580-
// .unwrap_or(false);
581-
let _ = file_actions_schema; // Suppress unused warning for now
582-
583-
// Read the actual checkpoint files, using cached sidecar files
584-
// We expand sidecars if we have them and need file actions
585-
let checkpoint_read_schema = if need_file_actions
586-
&& !sidecar_files.is_empty()
587-
&& !action_schema.contains(SIDECAR_NAME)
588-
{
589-
Arc::new(
590-
action_schema.add([StructField::nullable(SIDECAR_NAME, Sidecar::to_schema())])?,
616+
// Check if checkpoint has compatible stats_parsed and add it to the schema if so
617+
let has_stats_parsed =
618+
stats_schema
619+
.zip(file_actions_schema.as_ref())
620+
.is_some_and(|(stats, file_schema)| {
621+
Self::schema_has_compatible_stats_parsed(file_schema, stats)
622+
});
623+
624+
// Build final schema with any additional fields needed (stats_parsed, sidecar)
625+
// Only modify the schema if it has an "add" field (i.e., we need file actions)
626+
let final_schema = if let Some(add_field) = action_schema.field("add") {
627+
let DataType::Struct(add_struct) = add_field.data_type() else {
628+
return Err(Error::internal_error(
629+
"add field in action schema must be a struct",
630+
));
631+
};
632+
let mut add_fields: Vec<StructField> = add_struct.fields().cloned().collect();
633+
634+
// Add stats_parsed if checkpoint has compatible parsed stats
635+
if let (true, Some(stats_schema)) = (has_stats_parsed, stats_schema) {
636+
add_fields.push(StructField::nullable(
637+
"stats_parsed",
638+
DataType::Struct(Box::new(stats_schema.clone())),
639+
));
640+
}
641+
642+
// Rebuild the add field with any new fields (stats_parsed)
643+
let new_add_field = StructField::new(
644+
add_field.name(),
645+
StructType::new_unchecked(add_fields),
646+
add_field.is_nullable(),
591647
)
648+
.with_metadata(add_field.metadata.clone());
649+
650+
// Rebuild schema with modified add field
651+
let mut new_fields: Vec<StructField> = action_schema
652+
.fields()
653+
.map(|f| {
654+
if f.name() == "add" {
655+
new_add_field.clone()
656+
} else {
657+
f.clone()
658+
}
659+
})
660+
.collect();
661+
662+
// Add sidecar column at top-level for V2 checkpoints
663+
if need_file_actions && !sidecar_files.is_empty() {
664+
new_fields.push(StructField::nullable(SIDECAR_NAME, Sidecar::to_schema()));
665+
}
666+
667+
Arc::new(StructType::new_unchecked(new_fields))
592668
} else {
669+
// Schema doesn't have "add" field (e.g., for metadata/protocol only reads)
670+
// Use the action_schema as-is
593671
action_schema.clone()
594672
};
595673

@@ -609,14 +687,14 @@ impl LogSegment {
609687
Some(parsed_log_path) if parsed_log_path.extension == "json" => {
610688
engine.json_handler().read_json_files(
611689
&checkpoint_file_meta,
612-
checkpoint_read_schema.clone(),
690+
final_schema.clone(),
613691
meta_predicate.clone(),
614692
)?
615693
}
616694
Some(parsed_log_path) if parsed_log_path.extension == "parquet" => parquet_handler
617695
.read_parquet_files(
618696
&checkpoint_file_meta,
619-
checkpoint_read_schema.clone(),
697+
final_schema.clone(),
620698
meta_predicate.clone(),
621699
)?,
622700
Some(parsed_log_path) => {
@@ -644,7 +722,11 @@ impl LogSegment {
644722
.map_ok(|batch| ActionsBatch::new(batch, false))
645723
.chain(sidecar_batches.map_ok(|batch| ActionsBatch::new(batch, false)));
646724

647-
Ok(actions_iter)
725+
let checkpoint_info = CheckpointReadInfo {
726+
has_stats_parsed,
727+
checkpoint_read_schema: final_schema,
728+
};
729+
Ok((actions_iter, checkpoint_info))
648730
}
649731

650732
/// Extracts sidecar file references from a checkpoint file.
@@ -804,8 +886,7 @@ impl LogSegment {
804886
/// use physical column names (not logical names), so direct name comparison is correct.
805887
///
806888
/// Returns `false` if stats_parsed doesn't exist or has incompatible types.
807-
#[allow(dead_code)]
808-
fn schema_has_compatible_stats_parsed(
889+
pub(crate) fn schema_has_compatible_stats_parsed(
809890
checkpoint_schema: &StructType,
810891
stats_schema: &StructType,
811892
) -> bool {
@@ -833,42 +914,53 @@ impl LogSegment {
833914
// While these typically have the same schema, the protocol doesn't guarantee it,
834915
// so we check both to be safe.
835916
for field_name in ["minValues", "maxValues"] {
836-
let Some(values_field) = stats_struct.field(field_name) else {
917+
let Some(checkpoint_values_field) = stats_struct.field(field_name) else {
837918
// stats_parsed exists but no minValues/maxValues - unusual but valid
838919
continue;
839920
};
840921

841922
// minValues/maxValues must be a Struct containing per-column statistics.
842923
// If it exists but isn't a Struct, the schema is malformed and unusable.
843-
let DataType::Struct(values_struct) = values_field.data_type() else {
924+
let DataType::Struct(checkpoint_values) = checkpoint_values_field.data_type() else {
844925
debug!(
845926
"stats_parsed not compatible: stats_parsed.{} is not a Struct, got {:?}",
846927
field_name,
847-
values_field.data_type()
928+
checkpoint_values_field.data_type()
848929
);
849930
return false;
850931
};
851932

852-
// Check type compatibility for each column in the checkpoint's stats_parsed
853-
// that also exists in the stats schema (columns needed for data skipping)
854-
for checkpoint_field in values_struct.fields() {
855-
if let Some(stats_field) = stats_schema.field(&checkpoint_field.name) {
933+
// Get the corresponding field from stats_schema (e.g., stats_schema.minValues)
934+
let Some(stats_values_field) = stats_schema.field(field_name) else {
935+
// stats_schema doesn't have minValues/maxValues, skip this check
936+
continue;
937+
};
938+
let DataType::Struct(stats_values) = stats_values_field.data_type() else {
939+
// stats_schema.minValues/maxValues isn't a struct - shouldn't happen but skip
940+
continue;
941+
};
942+
943+
// Check type compatibility for each column needed for data skipping
944+
// If the column also exists in the checkpoint's stats_parsed, verify its type can be read as the needed type.
945+
for stats_field in stats_values.fields() {
946+
if let Some(checkpoint_field) = checkpoint_values.field(&stats_field.name) {
856947
if checkpoint_field
857948
.data_type()
858949
.can_read_as(stats_field.data_type())
859950
.is_err()
860951
{
861952
debug!(
862-
"stats_parsed not compatible: incompatible type for column '{}' in {}: checkpoint has {:?}, stats schema has {:?}",
863-
checkpoint_field.name,
953+
"stats_parsed not compatible: incompatible type for column '{}' in {}: checkpoint has {:?}, stats schema needs {:?}",
954+
stats_field.name,
864955
field_name,
865956
checkpoint_field.data_type(),
866957
stats_field.data_type()
867958
);
868959
return false;
869960
}
870961
}
871-
// If column doesn't exist in stats schema, it's fine (not needed for data skipping)
962+
// If the column is missing from checkpoint's stats_parsed, it will return
963+
// null when accessed, which is acceptable for data skipping.
872964
}
873965
}
874966

0 commit comments

Comments
 (0)