193 changes: 151 additions & 42 deletions kernel/src/log_segment.rs

Large diffs are not rendered by default.

78 changes: 56 additions & 22 deletions kernel/src/log_segment/tests.rs
@@ -1170,8 +1170,13 @@ async fn test_create_checkpoint_stream_returns_checkpoint_batches_as_is_if_schem
None,
None,
)?;
let mut iter =
log_segment.create_checkpoint_stream(&engine, v2_checkpoint_read_schema.clone(), None)?;
let checkpoint_result = log_segment.create_checkpoint_stream(
&engine,
v2_checkpoint_read_schema.clone(),
None,
None,
)?;
let mut iter = checkpoint_result.actions;

// Assert that the first batch returned is from reading checkpoint file 1
let ActionsBatch {
@@ -1235,8 +1240,13 @@ async fn test_create_checkpoint_stream_returns_checkpoint_batches_if_checkpoint_
None,
None,
)?;
let mut iter =
log_segment.create_checkpoint_stream(&engine, v2_checkpoint_read_schema.clone(), None)?;
let checkpoint_result = log_segment.create_checkpoint_stream(
&engine,
v2_checkpoint_read_schema.clone(),
None,
None,
)?;
let mut iter = checkpoint_result.actions;

// Assert the correctness of batches returned
for expected_sidecar in ["sidecar1.parquet", "sidecar2.parquet"].iter() {
@@ -1295,8 +1305,13 @@ async fn test_create_checkpoint_stream_reads_parquet_checkpoint_batch_without_si
None,
None,
)?;
let mut iter =
log_segment.create_checkpoint_stream(&engine, v2_checkpoint_read_schema.clone(), None)?;
let checkpoint_result = log_segment.create_checkpoint_stream(
&engine,
v2_checkpoint_read_schema.clone(),
None,
None,
)?;
let mut iter = checkpoint_result.actions;

// Assert that the first batch returned is from reading checkpoint file 1
let ActionsBatch {
@@ -1344,8 +1359,9 @@ async fn test_create_checkpoint_stream_reads_json_checkpoint_batch_without_sidec
None,
None,
)?;
let mut iter =
log_segment.create_checkpoint_stream(&engine, v2_checkpoint_read_schema, None)?;
let checkpoint_result =
log_segment.create_checkpoint_stream(&engine, v2_checkpoint_read_schema, None, None)?;
let mut iter = checkpoint_result.actions;

// Assert that the first batch returned is from reading checkpoint file 1
let ActionsBatch {
@@ -1431,8 +1447,13 @@ async fn test_create_checkpoint_stream_reads_checkpoint_file_and_returns_sidecar
None,
None,
)?;
let mut iter =
log_segment.create_checkpoint_stream(&engine, v2_checkpoint_read_schema.clone(), None)?;
let checkpoint_result = log_segment.create_checkpoint_stream(
&engine,
v2_checkpoint_read_schema.clone(),
None,
None,
)?;
let mut iter = checkpoint_result.actions;

// Assert that the first batch returned is from reading checkpoint file 1
let ActionsBatch {
@@ -2709,6 +2730,18 @@ fn create_checkpoint_schema_with_stats_parsed(min_values_fields: Vec<StructField
StructType::new_unchecked([StructField::nullable("add", add_schema)])
}

// Helper to create a stats_schema with proper structure (numRecords, minValues, maxValues)
fn create_stats_schema(column_fields: Vec<StructField>) -> StructType {
StructType::new_unchecked([
StructField::nullable("numRecords", DataType::LONG),
StructField::nullable(
"minValues",
StructType::new_unchecked(column_fields.clone()),
),
StructField::nullable("maxValues", StructType::new_unchecked(column_fields)),
])
}

// Helper to create a checkpoint schema without stats_parsed
fn create_checkpoint_schema_without_stats_parsed() -> StructType {
use crate::schema::StructType;
@@ -2731,15 +2764,15 @@ fn test_schema_has_compatible_stats_parsed_basic() {
)]);

// Exact type match should work
let stats_schema = StructType::new_unchecked([StructField::nullable("id", DataType::INTEGER)]);
let stats_schema = create_stats_schema(vec![StructField::nullable("id", DataType::INTEGER)]);
assert!(LogSegment::schema_has_compatible_stats_parsed(
&checkpoint_schema,
&stats_schema
));

// Type widening (int -> long) should work
let stats_schema_widened =
StructType::new_unchecked([StructField::nullable("id", DataType::LONG)]);
create_stats_schema(vec![StructField::nullable("id", DataType::LONG)]);
assert!(LogSegment::schema_has_compatible_stats_parsed(
&checkpoint_schema,
&stats_schema_widened
@@ -2759,17 +2792,18 @@

#[test]
fn test_schema_has_compatible_stats_parsed_missing_column_ok() {
// Checkpoint has "id" column, stats schema needs "other" column (missing in checkpoint is OK)
// Checkpoint has "id" column, stats schema needs "other" column
// Missing column is acceptable - it will return null when accessed
let checkpoint_schema =
create_checkpoint_schema_with_stats_parsed(vec![StructField::nullable(
"id",
DataType::INTEGER,
)]);

let stats_schema =
StructType::new_unchecked([StructField::nullable("other", DataType::INTEGER)]);
let stats_schema = create_stats_schema(vec![StructField::nullable("other", DataType::INTEGER)]);

// Missing column in checkpoint is OK - it will just return NULL
// Missing column in checkpoint is OK - it will return null when accessed,
// which is acceptable for data skipping (just means we can't skip based on that column)
assert!(LogSegment::schema_has_compatible_stats_parsed(
&checkpoint_schema,
&stats_schema
@@ -2784,7 +2818,7 @@ fn test_schema_has_compatible_stats_parsed_extra_column_ok() {
StructField::nullable("extra", DataType::STRING),
]);

let stats_schema = StructType::new_unchecked([StructField::nullable("id", DataType::INTEGER)]);
let stats_schema = create_stats_schema(vec![StructField::nullable("id", DataType::INTEGER)]);

assert!(LogSegment::schema_has_compatible_stats_parsed(
&checkpoint_schema,
@@ -2797,7 +2831,7 @@ fn test_schema_has_compatible_stats_parsed_no_stats_parsed() {
// Checkpoint schema without stats_parsed field
let checkpoint_schema = create_checkpoint_schema_without_stats_parsed();

let stats_schema = StructType::new_unchecked([StructField::nullable("id", DataType::INTEGER)]);
let stats_schema = create_stats_schema(vec![StructField::nullable("id", DataType::INTEGER)]);

assert!(!LogSegment::schema_has_compatible_stats_parsed(
&checkpoint_schema,
@@ -2814,7 +2848,7 @@ fn test_schema_has_compatible_stats_parsed_empty_stats_schema() {
DataType::INTEGER,
)]);

let stats_schema = StructType::new_unchecked([]);
let stats_schema = create_stats_schema(vec![]);

// If no columns are needed for data skipping, any stats_parsed is compatible
assert!(LogSegment::schema_has_compatible_stats_parsed(
@@ -2832,7 +2866,7 @@ fn test_schema_has_compatible_stats_parsed_multiple_columns() {
]);

// First column matches, second is incompatible
let stats_schema = StructType::new_unchecked([
let stats_schema = create_stats_schema(vec![
StructField::nullable("good_col", DataType::LONG),
StructField::nullable("bad_col", DataType::INTEGER),
]);
@@ -2858,7 +2892,7 @@ fn test_schema_has_compatible_stats_parsed_missing_min_max_values() {

let checkpoint_schema = StructType::new_unchecked([StructField::nullable("add", add_schema)]);

let stats_schema = StructType::new_unchecked([StructField::nullable("id", DataType::INTEGER)]);
let stats_schema = create_stats_schema(vec![StructField::nullable("id", DataType::INTEGER)]);

// Should return true - missing minValues/maxValues is handled gracefully with continue
assert!(LogSegment::schema_has_compatible_stats_parsed(
@@ -2884,7 +2918,7 @@ fn test_schema_has_compatible_stats_parsed_min_values_not_struct() {

let checkpoint_schema = StructType::new_unchecked([StructField::nullable("add", add_schema)]);

let stats_schema = StructType::new_unchecked([StructField::nullable("id", DataType::INTEGER)]);
let stats_schema = create_stats_schema(vec![StructField::nullable("id", DataType::INTEGER)]);

// Should return false - minValues/maxValues must be Struct types
assert!(!LogSegment::schema_has_compatible_stats_parsed(
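The tests above now bind `checkpoint_result.actions` because `create_checkpoint_stream` returns its actions iterator wrapped in a struct rather than returning the iterator directly. A minimal sketch of that wrapper's shape, assuming only what these call sites show — the real definition is in kernel/src/log_segment.rs, whose diff is not rendered above, so anything beyond the `actions` field is an assumption:

// Hypothetical sketch only; not the actual definition from log_segment.rs.
pub(crate) struct ActionsWithCheckpointInfo<I> {
    // The replay iterator that used to be returned directly; callers now take
    // `result.actions` to get it.
    pub(crate) actions: I,
    // Additional checkpoint-related metadata (for example, whether the
    // checkpoint's stats_parsed column is usable) presumably lives alongside
    // it, but is not visible in this rendered diff.
}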
27 changes: 25 additions & 2 deletions kernel/src/scan/data_skipping/stats_schema.rs
@@ -1,11 +1,12 @@
//! This module contains logic to compute the expected schema for file statistics

use std::borrow::Cow;
use std::sync::Arc;

use crate::{
schema::{
ArrayType, ColumnName, DataType, MapType, PrimitiveType, Schema, SchemaTransform,
StructField, StructType,
ArrayType, ColumnName, DataType, MapType, PrimitiveType, Schema, SchemaRef,
SchemaTransform, StructField, StructType,
},
table_properties::{DataSkippingNumIndexedCols, TableProperties},
DeltaResult,
@@ -133,6 +134,28 @@ pub(crate) fn expected_stats_schema(
StructType::try_new(fields)
}

/// Creates a stats schema from a referenced schema (columns from predicate).
/// Returns schema: `{ numRecords, nullCount, minValues, maxValues }`
///
/// This is used to build the schema for parsing JSON stats and for reading stats_parsed
/// from checkpoints.
pub(crate) fn build_stats_schema(referenced_schema: &StructType) -> Option<SchemaRef> {
let stats_schema = NullableStatsTransform
.transform_struct(referenced_schema)?
.into_owned();

let nullcount_schema = NullCountStatsTransform
.transform_struct(&stats_schema)?
.into_owned();

Some(Arc::new(StructType::new_unchecked([
StructField::nullable("numRecords", DataType::LONG),
StructField::nullable("nullCount", nullcount_schema),
StructField::nullable("minValues", stats_schema.clone()),
StructField::nullable("maxValues", stats_schema),
])))
}

/// Transforms a schema to make all fields nullable.
/// Used for stats schemas where stats may not be available for all columns.
pub(crate) struct NullableStatsTransform;
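For orientation, a minimal usage sketch of the new `build_stats_schema` helper, assuming the module path follows the file path and using an illustrative `id` column; real call sites are not shown in this diff:

// Illustrative only: derive the `{ numRecords, nullCount, minValues, maxValues }`
// stats schema for the columns a predicate references.
use crate::scan::data_skipping::stats_schema::build_stats_schema;
use crate::schema::{DataType, SchemaRef, StructField, StructType};

fn stats_schema_for_predicate() -> Option<SchemaRef> {
    // Suppose the predicate references a single column `id: INTEGER`.
    let referenced_schema =
        StructType::new_unchecked([StructField::nullable("id", DataType::INTEGER)]);
    // May return None if no stats schema can be derived for these columns;
    // otherwise the result is usable both for parsing JSON stats and for
    // reading stats_parsed from a checkpoint.
    build_stats_schema(&referenced_schema)
}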
11 changes: 10 additions & 1 deletion kernel/src/scan/log_replay.rs
@@ -16,7 +16,7 @@ use crate::log_replay::deduplicator::Deduplicator;
use crate::log_replay::{ActionsBatch, FileActionDeduplicator, FileActionKey, LogReplayProcessor};
use crate::scan::Scalar;
use crate::schema::ToSchema as _;
use crate::schema::{ColumnNamesAndTypes, DataType, MapType, StructField, StructType};
use crate::schema::{ColumnNamesAndTypes, DataType, MapType, SchemaRef, StructField, StructType};
use crate::table_features::ColumnMappingMode;
use crate::transforms::{get_transform_expr, parse_partition_values, TransformSpec};
use crate::utils::require;
@@ -32,6 +32,7 @@ struct InternalScanState {
predicate_schema: Option<Arc<StructType>>,
transform_spec: Option<Arc<TransformSpec>>,
column_mapping_mode: ColumnMappingMode,
stats_schema: Option<SchemaRef>,
}

/// Public-facing serialized processor state for distributed processing.
@@ -162,6 +163,7 @@ impl ScanLogReplayProcessor {
physical_predicate,
transform_spec,
column_mapping_mode,
stats_schema,
} = self.state_info.as_ref().clone();

// Extract predicate from PhysicalPredicate
@@ -177,6 +179,7 @@
transform_spec,
predicate_schema,
column_mapping_mode,
stats_schema,
};
let internal_state_blob = serde_json::to_vec(&internal_state)
.map_err(|e| Error::generic(format!("Failed to serialize internal state: {}", e)))?;
@@ -232,6 +235,7 @@ impl ScanLogReplayProcessor {
physical_predicate,
transform_spec: internal_state.transform_spec,
column_mapping_mode: internal_state.column_mapping_mode,
stats_schema: internal_state.stats_schema,
});

let processor = Self::new_with_seen_files(engine, state_info, state.seen_file_keys)?;
@@ -634,6 +638,7 @@ mod tests {
physical_predicate: PhysicalPredicate::None,
transform_spec: None,
column_mapping_mode: ColumnMappingMode::None,
stats_schema: None,
});
let iter = scan_action_iter(
&SyncEngine::new(),
@@ -926,6 +931,7 @@
physical_predicate: PhysicalPredicate::None,
transform_spec: None,
column_mapping_mode: mode,
stats_schema: None,
});
let processor = ScanLogReplayProcessor::new(&engine, state_info).unwrap();
let deserialized = ScanLogReplayProcessor::from_serializable_state(
@@ -952,6 +958,7 @@
physical_predicate: PhysicalPredicate::None,
transform_spec: None,
column_mapping_mode: ColumnMappingMode::None,
stats_schema: None,
});
let processor = ScanLogReplayProcessor::new(&engine, state_info).unwrap();
let serialized = processor.into_serializable_state().unwrap();
@@ -989,6 +996,7 @@
predicate_schema: None, // Missing!
transform_spec: None,
column_mapping_mode: ColumnMappingMode::None,
stats_schema: None,
};
let predicate = Arc::new(crate::expressions::Predicate::column(["id"]));
let invalid_blob = serde_json::to_vec(&invalid_internal_state).unwrap();
Expand Down Expand Up @@ -1017,6 +1025,7 @@ mod tests {
predicate_schema: None,
transform_spec: None,
column_mapping_mode: ColumnMappingMode::None,
stats_schema: None,
};
let blob = serde_json::to_string(&invalid_internal_state).unwrap();
let mut obj: serde_json::Value = serde_json::from_str(&blob).unwrap();
26 changes: 16 additions & 10 deletions kernel/src/scan/mod.rs
@@ -20,7 +20,7 @@ use crate::expressions::{ColumnName, ExpressionRef, Predicate, PredicateRef, Sca
use crate::kernel_predicates::{DefaultKernelPredicateEvaluator, EmptyColumnResolver};
use crate::listed_log_files::ListedLogFilesBuilder;
use crate::log_replay::{ActionsBatch, HasSelectionVector};
use crate::log_segment::LogSegment;
use crate::log_segment::{ActionsWithCheckpointInfo, LogSegment};
use crate::scan::log_replay::{BASE_ROW_ID_NAME, CLUSTERING_PROVIDER_NAME};
use crate::scan::state_info::StateInfo;
use crate::schema::{
@@ -435,7 +435,8 @@ impl Scan {
&self,
engine: &dyn Engine,
) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanMetadata>>> {
self.scan_metadata_inner(engine, self.replay_for_scan_metadata(engine)?)
let result = self.replay_for_scan_metadata(engine)?;
self.scan_metadata_inner(engine, result.actions)
}

/// Get an updated iterator of [`ScanMetadata`]s based on an existing iterator of [`EngineData`]s.
@@ -544,13 +545,15 @@ impl Scan {
None, // No checkpoint in this incremental segment
)?;

let it = new_log_segment
.read_actions_with_projected_checkpoint_actions(
engine,
COMMIT_READ_SCHEMA.clone(),
CHECKPOINT_READ_SCHEMA.clone(),
None,
)?
let result = new_log_segment.read_actions_with_projected_checkpoint_actions(
engine,
COMMIT_READ_SCHEMA.clone(),
CHECKPOINT_READ_SCHEMA.clone(),
None,
None,
)?;
let it = result
.actions
.chain(existing_data.into_iter().map(apply_transform));

Ok(Box::new(self.scan_metadata_inner(engine, it)?))
@@ -572,7 +575,9 @@
fn replay_for_scan_metadata(
&self,
engine: &dyn Engine,
) -> DeltaResult<impl Iterator<Item = DeltaResult<ActionsBatch>> + Send> {
) -> DeltaResult<
ActionsWithCheckpointInfo<impl Iterator<Item = DeltaResult<ActionsBatch>> + Send>,
> {
// NOTE: We don't pass any meta-predicate because we expect no meaningful row group skipping
// when ~every checkpoint file will contain the adds and removes we are looking for.
self.snapshot
@@ -582,6 +587,7 @@
COMMIT_READ_SCHEMA.clone(),
CHECKPOINT_READ_SCHEMA.clone(),
None,
self.state_info.stats_schema.as_ref().map(|s| s.as_ref()),
)
}
