Commit a73cb03: read_stats

1 parent 318087c commit a73cb03

File tree: 9 files changed, +235 -74 lines changed

kernel/src/log_segment.rs

Lines changed: 127 additions & 40 deletions
@@ -36,6 +36,18 @@ use url::Url;
 #[cfg(test)]
 mod tests;

+/// Information about checkpoint reading for data skipping optimization.
+///
+/// Returned alongside the actions iterator from checkpoint reading functions.
+#[derive(Debug, Clone)]
+pub struct CheckpointReadInfo {
+    /// Whether the checkpoint has compatible pre-parsed stats for data skipping.
+    /// When `true`, checkpoint batches can use stats_parsed directly instead of parsing JSON.
+    pub has_stats_parsed: bool,
+    /// The schema used to read checkpoint files, potentially including stats_parsed.
+    pub checkpoint_read_schema: SchemaRef,
+}
+
 /// A [`LogSegment`] represents a contiguous section of the log and is made of checkpoint files
 /// and commit files and guarantees the following:
 /// 1. Commit file versions will not have any gaps between them.
@@ -384,21 +396,34 @@ impl LogSegment {
     ///
     /// `meta_predicate` is an optional expression to filter the log files with. It is _NOT_ the
     /// query's predicate, but rather a predicate for filtering log files themselves.
+    /// Read a stream of actions from this log segment. This returns an iterator of
+    /// [`ActionsBatch`]s which includes EngineData of actions + a boolean flag indicating whether
+    /// the data was read from a commit file (true) or a checkpoint file (false).
+    ///
+    /// Also returns [`CheckpointReadInfo`] with stats_parsed compatibility and the checkpoint schema.
     #[internal_api]
     pub(crate) fn read_actions_with_projected_checkpoint_actions(
         &self,
         engine: &dyn Engine,
         commit_read_schema: SchemaRef,
         checkpoint_read_schema: SchemaRef,
         meta_predicate: Option<PredicateRef>,
-    ) -> DeltaResult<impl Iterator<Item = DeltaResult<ActionsBatch>> + Send> {
+        stats_schema: Option<&StructType>,
+    ) -> DeltaResult<(
+        impl Iterator<Item = DeltaResult<ActionsBatch>> + Send,
+        CheckpointReadInfo,
+    )> {
         // `replay` expects commit files to be sorted in descending order, so the return value here is correct
         let commit_stream = CommitReader::try_new(engine, self, commit_read_schema)?;

-        let checkpoint_stream =
-            self.create_checkpoint_stream(engine, checkpoint_read_schema, meta_predicate)?;
+        let (checkpoint_stream, checkpoint_info) = self.create_checkpoint_stream(
+            engine,
+            checkpoint_read_schema,
+            meta_predicate,
+            stats_schema,
+        )?;

-        Ok(commit_stream.chain(checkpoint_stream))
+        Ok((commit_stream.chain(checkpoint_stream), checkpoint_info))
     }

     // Same as above, but uses the same schema for reading checkpoints and commits.
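
For orientation, a minimal caller sketch of the new tuple-returning API (illustrative only, not part of this commit; `log_segment`, `engine`, and the schema variables are assumed to be in scope):

    // Passing Some(stats_schema) opts in to stats_parsed detection; passing None
    // preserves the old behavior (the wrapper in the next hunk does exactly that).
    let (actions, info) = log_segment.read_actions_with_projected_checkpoint_actions(
        engine,
        commit_read_schema,
        checkpoint_read_schema,
        None, // meta_predicate: no log-file filtering
        Some(&stats_schema),
    )?;
    if info.has_stats_parsed {
        // Checkpoint batches were read with info.checkpoint_read_schema, which now
        // includes add.stats_parsed; data skipping can use it directly instead of
        // JSON-parsing the `stats` string.
    }
    for batch in actions {
        let _batch = batch?; // ActionsBatch: EngineData plus commit/checkpoint flag
    }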
@@ -409,12 +434,15 @@ impl LogSegment {
         action_schema: SchemaRef,
         meta_predicate: Option<PredicateRef>,
     ) -> DeltaResult<impl Iterator<Item = DeltaResult<ActionsBatch>> + Send> {
-        self.read_actions_with_projected_checkpoint_actions(
-            engine,
-            action_schema.clone(),
-            action_schema,
-            meta_predicate,
-        )
+        let (actions_iter, _checkpoint_info) = self
+            .read_actions_with_projected_checkpoint_actions(
+                engine,
+                action_schema.clone(),
+                action_schema,
+                meta_predicate,
+                None,
+            )?;
+        Ok(actions_iter)
     }

     /// find a minimal set to cover the range of commits we want. This is greedy so not always
@@ -556,12 +584,18 @@ impl LogSegment {
     /// 1. Determines the files actions schema (for future stats_parsed detection)
     /// 2. Extracts sidecar file references if present (V2 checkpoints)
     /// 3. Reads checkpoint and sidecar data using cached sidecar refs
+    ///
+    /// Returns a tuple of the actions iterator and [`CheckpointReadInfo`].
     fn create_checkpoint_stream(
         &self,
         engine: &dyn Engine,
         action_schema: SchemaRef,
         meta_predicate: Option<PredicateRef>,
-    ) -> DeltaResult<impl Iterator<Item = DeltaResult<ActionsBatch>> + Send> {
+        stats_schema: Option<&StructType>,
+    ) -> DeltaResult<(
+        impl Iterator<Item = DeltaResult<ActionsBatch>> + Send,
+        CheckpointReadInfo,
+    )> {
         let need_file_actions = schema_contains_file_actions(&action_schema);

         // Extract file actions schema and sidecar files
@@ -574,22 +608,61 @@ impl LogSegment {
             (None, vec![])
         };

-        // (Future) Determine if there are usable parsed stats
-        // let _has_stats_parsed = file_actions_schema.as_ref()
-        //     .map(|s| Self::schema_has_compatible_stats_parsed(s, stats_schema))
-        //     .unwrap_or(false);
-        let _ = file_actions_schema; // Suppress unused warning for now
-
-        // Read the actual checkpoint files, using cached sidecar files
-        // We expand sidecars if we have them and need file actions
-        let checkpoint_read_schema = if need_file_actions
-            && !sidecar_files.is_empty()
-            && !action_schema.contains(SIDECAR_NAME)
-        {
-            Arc::new(
-                action_schema.add([StructField::nullable(SIDECAR_NAME, Sidecar::to_schema())])?,
+        // Check if checkpoint has compatible stats_parsed and add it to the schema if so
+        let has_stats_parsed =
+            stats_schema
+                .zip(file_actions_schema.as_ref())
+                .is_some_and(|(stats, file_schema)| {
+                    Self::schema_has_compatible_stats_parsed(file_schema, stats)
+                });
+
+        // Build final schema with any additional fields needed (stats_parsed, sidecar)
+        // Only modify the schema if it has an "add" field (i.e., we need file actions)
+        let final_schema = if let Some(add_field) = action_schema.field("add") {
+            let DataType::Struct(add_struct) = add_field.data_type() else {
+                return Err(Error::internal_error(
+                    "add field in action schema must be a struct",
+                ));
+            };
+            let mut add_fields: Vec<StructField> = add_struct.fields().cloned().collect();
+
+            // Add stats_parsed if checkpoint has compatible parsed stats
+            if let (true, Some(stats_schema)) = (has_stats_parsed, stats_schema) {
+                add_fields.push(StructField::nullable(
+                    "stats_parsed",
+                    DataType::Struct(Box::new(stats_schema.clone())),
+                ));
+            }
+
+            // Rebuild the add field with any new fields (stats_parsed)
+            let new_add_field = StructField::new(
+                add_field.name(),
+                StructType::new_unchecked(add_fields),
+                add_field.is_nullable(),
             )
+            .with_metadata(add_field.metadata.clone());
+
+            // Rebuild schema with modified add field
+            let mut new_fields: Vec<StructField> = action_schema
+                .fields()
+                .map(|f| {
+                    if f.name() == "add" {
+                        new_add_field.clone()
+                    } else {
+                        f.clone()
+                    }
+                })
+                .collect();
+
+            // Add sidecar column at top-level for V2 checkpoints
+            if need_file_actions && !sidecar_files.is_empty() {
+                new_fields.push(StructField::nullable(SIDECAR_NAME, Sidecar::to_schema()));
+            }
+
+            Arc::new(StructType::new_unchecked(new_fields))
         } else {
+            // Schema doesn't have "add" field (e.g., for metadata/protocol only reads)
+            // Use the action_schema as-is
             action_schema.clone()
         };
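
To make the schema rewrite concrete, a small sketch that builds the appended field with the same constructors the new code uses (illustrative only; the column names `id` and `ts`, the choice of stats fields, and the `DataType::LONG`/`DataType::TIMESTAMP` shorthands are assumptions, not from this commit):

    use delta_kernel::schema::{DataType, StructField, StructType};

    // Hypothetical per-column min/max struct for two data-skipping columns.
    let min_max = StructType::new_unchecked([
        StructField::nullable("id", DataType::LONG),
        StructField::nullable("ts", DataType::TIMESTAMP),
    ]);
    // Hypothetical stats schema, as would be passed in via `stats_schema`.
    let stats = StructType::new_unchecked([
        StructField::nullable("numRecords", DataType::LONG),
        StructField::nullable("minValues", DataType::Struct(Box::new(min_max.clone()))),
        StructField::nullable("maxValues", DataType::Struct(Box::new(min_max))),
    ]);
    // The field appended inside `add` when has_stats_parsed is true:
    let stats_parsed = StructField::nullable("stats_parsed", DataType::Struct(Box::new(stats)));
    // final_schema then has the shape { add: { ..., stats_parsed }, sidecar? }, where
    // the top-level sidecar column appears only for V2 checkpoints with sidecar files.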

@@ -609,14 +682,14 @@ impl LogSegment {
             Some(parsed_log_path) if parsed_log_path.extension == "json" => {
                 engine.json_handler().read_json_files(
                     &checkpoint_file_meta,
-                    checkpoint_read_schema.clone(),
+                    final_schema.clone(),
                     meta_predicate.clone(),
                 )?
             }
             Some(parsed_log_path) if parsed_log_path.extension == "parquet" => parquet_handler
                 .read_parquet_files(
                     &checkpoint_file_meta,
-                    checkpoint_read_schema.clone(),
+                    final_schema.clone(),
                     meta_predicate.clone(),
                 )?,
             Some(parsed_log_path) => {
@@ -644,7 +717,11 @@ impl LogSegment {
             .map_ok(|batch| ActionsBatch::new(batch, false))
             .chain(sidecar_batches.map_ok(|batch| ActionsBatch::new(batch, false)));

-        Ok(actions_iter)
+        let checkpoint_info = CheckpointReadInfo {
+            has_stats_parsed,
+            checkpoint_read_schema: final_schema,
+        };
+        Ok((actions_iter, checkpoint_info))
     }

     /// Extracts sidecar file references from a checkpoint file.
@@ -804,8 +881,7 @@ impl LogSegment {
     /// use physical column names (not logical names), so direct name comparison is correct.
     ///
     /// Returns `false` if stats_parsed doesn't exist or has incompatible types.
-    #[allow(dead_code)]
-    fn schema_has_compatible_stats_parsed(
+    pub(crate) fn schema_has_compatible_stats_parsed(
         checkpoint_schema: &StructType,
         stats_schema: &StructType,
     ) -> bool {
@@ -833,42 +909,53 @@ impl LogSegment {
         // While these typically have the same schema, the protocol doesn't guarantee it,
         // so we check both to be safe.
         for field_name in ["minValues", "maxValues"] {
-            let Some(values_field) = stats_struct.field(field_name) else {
+            let Some(checkpoint_values_field) = stats_struct.field(field_name) else {
                 // stats_parsed exists but no minValues/maxValues - unusual but valid
                 continue;
             };

             // minValues/maxValues must be a Struct containing per-column statistics.
             // If it exists but isn't a Struct, the schema is malformed and unusable.
-            let DataType::Struct(values_struct) = values_field.data_type() else {
+            let DataType::Struct(checkpoint_values) = checkpoint_values_field.data_type() else {
                 debug!(
                     "stats_parsed not compatible: stats_parsed.{} is not a Struct, got {:?}",
                     field_name,
-                    values_field.data_type()
+                    checkpoint_values_field.data_type()
                 );
                 return false;
             };

-            // Check type compatibility for each column in the checkpoint's stats_parsed
-            // that also exists in the stats schema (columns needed for data skipping)
-            for checkpoint_field in values_struct.fields() {
-                if let Some(stats_field) = stats_schema.field(&checkpoint_field.name) {
+            // Get the corresponding field from stats_schema (e.g., stats_schema.minValues)
+            let Some(stats_values_field) = stats_schema.field(field_name) else {
+                // stats_schema doesn't have minValues/maxValues, skip this check
+                continue;
+            };
+            let DataType::Struct(stats_values) = stats_values_field.data_type() else {
+                // stats_schema.minValues/maxValues isn't a struct - shouldn't happen but skip
+                continue;
+            };
+
+            // Check type compatibility for each column needed for data skipping
+            // If it exists in checkpoint, verify types are compatible
+            for stats_field in stats_values.fields() {
+                if let Some(checkpoint_field) = checkpoint_values.field(&stats_field.name) {
                     if checkpoint_field
                         .data_type()
                         .can_read_as(stats_field.data_type())
                         .is_err()
                     {
                         debug!(
-                            "stats_parsed not compatible: incompatible type for column '{}' in {}: checkpoint has {:?}, stats schema has {:?}",
-                            checkpoint_field.name,
+                            "stats_parsed not compatible: incompatible type for column '{}' in {}: checkpoint has {:?}, stats schema needs {:?}",
+                            stats_field.name,
                             field_name,
                             checkpoint_field.data_type(),
                             stats_field.data_type()
                         );
                         return false;
                     }
                 }
-                // If column doesn't exist in stats schema, it's fine (not needed for data skipping)
+                // If the column is missing from checkpoint's stats_parsed, it will return
+                // null when accessed, which is acceptable for data skipping.
             }
         }
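
As a worked example of the check's direction (column names are hypothetical): suppose the data-skipping stats schema needs minValues of { id: long, name: string }.

    checkpoint stats_parsed.minValues    result
    { id: long, name: string }           true: every needed column is readable at its type
    { id: long }                         true: missing `name` reads as null, which is acceptable
    { id: string, name: string }         false: string fails can_read_as(long)

Because the loop iterates over the stats schema's columns rather than the checkpoint's, extra columns in the checkpoint's stats_parsed never disqualify it.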
