diff --git a/kernel/src/engine/sync/json.rs b/kernel/src/engine/sync/json.rs index 855f8ccbe..4561a7887 100644 --- a/kernel/src/engine/sync/json.rs +++ b/kernel/src/engine/sync/json.rs @@ -41,9 +41,9 @@ impl JsonHandler for SyncJsonHandler { &self, files: &[FileMeta], schema: SchemaRef, - _predicate: Option, + predicate: Option, ) -> DeltaResult { - debug!("Reading json files: {:#?}", files); + debug!("Reading json files: {files:#?} with predicate {predicate:#?}"); if files.is_empty() { return Ok(Box::new(std::iter::empty())); } diff --git a/kernel/src/engine/sync/parquet.rs b/kernel/src/engine/sync/parquet.rs index cd25a926a..860a490e1 100644 --- a/kernel/src/engine/sync/parquet.rs +++ b/kernel/src/engine/sync/parquet.rs @@ -38,9 +38,9 @@ impl ParquetHandler for SyncParquetHandler { &self, files: &[FileMeta], schema: SchemaRef, - _predicate: Option, + predicate: Option, ) -> DeltaResult { - debug!("Reading parquet files: {files:#?} with schema {schema:#?}"); + debug!("Reading parquet files: {files:#?} with schema {schema:#?} and predicate {predicate:#?}"); if files.is_empty() { return Ok(Box::new(std::iter::empty())); } diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs index 0318a560f..33ccaf4ac 100644 --- a/kernel/src/scan/mod.rs +++ b/kernel/src/scan/mod.rs @@ -627,6 +627,10 @@ mod tests { #[test] fn test_scan_data() { + use tracing_subscriber::EnvFilter; + tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .init(); let path = std::fs::canonicalize(PathBuf::from("./tests/data/table-without-dv-small/")).unwrap(); let url = url::Url::from_directory_path(path).unwrap(); diff --git a/kernel/src/snapshot.rs b/kernel/src/snapshot.rs index ed225ae70..7fa55ce3f 100644 --- a/kernel/src/snapshot.rs +++ b/kernel/src/snapshot.rs @@ -3,6 +3,7 @@ //! use std::cmp::Ordering; +use std::ops::Not; use std::sync::Arc; use itertools::Itertools; @@ -71,9 +72,14 @@ impl LogSegment { fn read_metadata(&self, engine: &dyn Engine) -> DeltaResult> { let schema = get_log_schema().project(&[PROTOCOL_NAME, METADATA_NAME])?; + // filter out log files that do not contain metadata or protocol information + use Expression as Expr; + let filter = Some(Expr::or( + Expr::not(Expr::is_null(Expr::column("metaData.id"))), + Expr::not(Expr::is_null(Expr::column("protocol.min_reader_version"))), + )); // read the same protocol and metadata schema for both commits and checkpoints - // TODO add metadata.table_id is not null and protocol.something_required is not null - let data_batches = self.replay(engine, schema.clone(), schema, None)?; + let data_batches = self.replay(engine, schema.clone(), schema, filter)?; let mut metadata_opt: Option = None; let mut protocol_opt: Option = None; for batch in data_batches {