ffi/src/engine_funcs.rs (1 addition, 0 deletions)
@@ -122,6 +122,7 @@ fn read_parquet_file_impl(
last_modified: file.last_modified,
size: file.size,
};
+ // TODO: Plumb the predicate through the FFI?
Review comment (Member): created #382
let data = parquet_handler.read_parquet_files(&[delta_fm], physical_schema, None)?;
let res = Box::new(FileReadResultIterator {
data,
kernel/src/engine/default/parquet.rs (14 additions, 4 deletions)
@@ -47,7 +47,7 @@ impl<E: TaskExecutor> ParquetHandler for DefaultParquetHandler<E> {
&self,
files: &[FileMeta],
physical_schema: SchemaRef,
- _predicate: Option<Expression>,
+ predicate: Option<Expression>,
) -> DeltaResult<FileDataReadResultIterator> {
if files.is_empty() {
return Ok(Box::new(std::iter::empty()));
@@ -62,10 +62,15 @@ impl<E: TaskExecutor> ParquetHandler for DefaultParquetHandler<E> {
// -> parse to parquet
// SAFETY: we did is_empty check above, this is ok.
let file_opener: Box<dyn FileOpener> = match files[0].location.scheme() {
- "http" | "https" => Box::new(PresignedUrlOpener::new(1024, physical_schema.clone())),
+ "http" | "https" => Box::new(PresignedUrlOpener::new(
+ 1024,
+ physical_schema.clone(),
+ predicate,
+ )),
_ => Box::new(ParquetOpener::new(
1024,
physical_schema.clone(),
+ predicate,
self.store.clone(),
)),
};
@@ -83,20 +88,23 @@ impl<E: TaskExecutor> ParquetHandler for DefaultParquetHandler<E> {
struct ParquetOpener {
// projection: Arc<[usize]>,
batch_size: usize,
- limit: Option<usize>,
table_schema: SchemaRef,
+ _predicate: Option<Expression>,
+ limit: Option<usize>,
store: Arc<DynObjectStore>,
}

impl ParquetOpener {
pub(crate) fn new(
batch_size: usize,
table_schema: SchemaRef,
+ predicate: Option<Expression>,
store: Arc<DynObjectStore>,
) -> Self {
Self {
batch_size,
table_schema,
+ _predicate: predicate,
limit: None,
store,
}
@@ -153,16 +161,18 @@ impl FileOpener for ParquetOpener {
/// Implements [`FileOpener`] for opening a parquet file from a presigned URL
struct PresignedUrlOpener {
batch_size: usize,
+ _predicate: Option<Expression>,
limit: Option<usize>,
table_schema: SchemaRef,
client: reqwest::Client,
}

impl PresignedUrlOpener {
- pub(crate) fn new(batch_size: usize, schema: SchemaRef) -> Self {
+ pub(crate) fn new(batch_size: usize, schema: SchemaRef, predicate: Option<Expression>) -> Self {
Self {
batch_size,
table_schema: schema,
+ _predicate: predicate,
limit: None,
client: reqwest::Client::new(),
}
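With `read_parquet_files` now accepting the predicate, an engine-side caller can hand one down. A minimal sketch, assuming the import paths below; the column name is illustrative only, and a handler is free to use the predicate for row-group skipping or, as the `_predicate` fields above still do, ignore it:

use delta_kernel::expressions::Expression;
use delta_kernel::schema::SchemaRef;
use delta_kernel::{DeltaResult, FileDataReadResultIterator, FileMeta, ParquetHandler};

fn read_with_pushdown(
    handler: &dyn ParquetHandler,
    files: &[FileMeta],
    schema: SchemaRef,
) -> DeltaResult<FileDataReadResultIterator> {
    // Hypothetical filter: keep rows where the column is non-null. Passing
    // Some(...) instead of None is the whole change on the caller side.
    let predicate = Expression::column("numeric.ints.int32").is_not_null();
    handler.read_parquet_files(files, schema, Some(predicate))
}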
kernel/src/expressions/mod.rs (5 additions, 0 deletions)
@@ -280,6 +280,11 @@ impl Expression {
Self::unary(UnaryOperator::IsNull, self)
}

+ /// Create a new expression `self IS NOT NULL`
+ pub fn is_not_null(self) -> Self {
+ !Self::is_null(self)
+ }

/// Create a new expression `self == other`
pub fn eq(self, other: Self) -> Self {
Self::binary(BinaryOperator::Equal, self, other)
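A quick sketch of the new helper in use; this mirrors the metadata filter that snapshot.rs builds later in this diff (the import path and Debug derive are assumptions):

use delta_kernel::expressions::Expression as Expr;

fn main() {
    // `is_not_null` is sugar for `!is_null`, so this matches the old
    // Expr::not(Expr::is_null(..)) form removed from snapshot.rs below.
    let meta_predicate = Expr::or(
        Expr::column("metaData.id").is_not_null(),
        Expr::column("protocol.minReaderVersion").is_not_null(),
    );
    println!("{meta_predicate:?}");
}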
kernel/src/scan/mod.rs (35 additions, 11 deletions)
@@ -217,22 +217,27 @@ impl Scan {
&self,
engine: &dyn Engine,
) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanData>>> {
+ Ok(scan_action_iter(
+ engine,
+ self.replay_for_scan_data(engine)?,
+ &self.logical_schema,
+ &self.predicate,
+ ))
+ }

+ // Factored out to facilitate testing
+ fn replay_for_scan_data(
+ &self,
+ engine: &dyn Engine,
+ ) -> DeltaResult<impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>> + Send> {
let commit_read_schema = get_log_schema().project(&[ADD_NAME, REMOVE_NAME])?;
let checkpoint_read_schema = get_log_schema().project(&[ADD_NAME])?;

- let log_iter = self.snapshot.log_segment.replay(
+ self.snapshot.log_segment.replay(
engine,
commit_read_schema,
checkpoint_read_schema,
self.predicate.clone(),
- )?;

- Ok(scan_action_iter(
- engine,
- log_iter,
- &self.logical_schema,
- &self.predicate,
- ))
+ )
}

/// Get global state that is valid for the entire scan. This is somewhat expensive so should
Expand Down Expand Up @@ -312,7 +317,7 @@ impl Scan {
let read_result_iter = engine.get_parquet_handler().read_parquet_files(
&[meta],
global_state.read_schema.clone(),
- None,
+ self.predicate().clone(),
)?;
let gs = global_state.clone(); // Arc clone
Ok(read_result_iter.map(move |read_result| -> DeltaResult<_> {
@@ -714,6 +719,25 @@ mod tests {
}
}

+ #[test]
+ fn test_replay_for_scan_data() {
+ let path = std::fs::canonicalize(PathBuf::from("./tests/data/parquet_row_group_skipping/"));
+ let url = url::Url::from_directory_path(path.unwrap()).unwrap();
+ let engine = SyncEngine::new();
+
+ let table = Table::new(url);
+ let snapshot = table.snapshot(&engine, None).unwrap();
+ let scan = snapshot.into_scan_builder().build().unwrap();
+ let data: Vec<_> = scan
+ .replay_for_scan_data(&engine)
+ .unwrap()
+ .try_collect()
+ .unwrap();
+ // No predicate pushdown attempted, because at most one part of a multi-part checkpoint
+ // could be skipped when looking for adds/removes.
+ assert_eq!(data.len(), 5);
+ }

#[test_log::test]
fn test_scan_with_checkpoint() -> DeltaResult<()> {
let path = std::fs::canonicalize(PathBuf::from(
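Taken together with the parquet.rs changes, a scan predicate now flows into both log replay and the data read instead of being dropped at the parquet handler. A rough end-to-end sketch, assuming `ScanBuilder::with_predicate` and `Expression::literal` have the shapes used here:

use delta_kernel::engine::sync::SyncEngine;
use delta_kernel::expressions::Expression;
use delta_kernel::{DeltaResult, Table};

fn scan_with_pushdown(table: &Table) -> DeltaResult<()> {
    let engine = SyncEngine::new();
    let snapshot = table.snapshot(&engine, None)?;
    // The predicate set here is cloned into replay_for_scan_data and into
    // read_parquet_files in the hunks above.
    let scan = snapshot
        .into_scan_builder()
        .with_predicate(Expression::column("numeric.ints.int32").eq(Expression::literal(1000)))
        .build()?;
    let _results = scan.execute(&engine)?; // result shape elided here
    Ok(())
}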
kernel/src/snapshot.rs (43 additions, 10 deletions)
@@ -3,7 +3,6 @@
//!

use std::cmp::Ordering;
- use std::ops::Not;
use std::sync::Arc;

use itertools::Itertools;
@@ -71,15 +70,7 @@ impl LogSegment {
}

fn read_metadata(&self, engine: &dyn Engine) -> DeltaResult<Option<(Metadata, Protocol)>> {
- let schema = get_log_schema().project(&[PROTOCOL_NAME, METADATA_NAME])?;
- // filter out log files that do not contain metadata or protocol information
- use Expression as Expr;
- let filter = Some(Expr::or(
- Expr::not(Expr::is_null(Expr::column("metaData.id"))),
- Expr::not(Expr::is_null(Expr::column("protocol.minReaderVersion"))),
- ));
- // read the same protocol and metadata schema for both commits and checkpoints
- let data_batches = self.replay(engine, schema.clone(), schema, filter)?;
+ let data_batches = self.replay_for_metadata(engine)?;
let mut metadata_opt: Option<Metadata> = None;
let mut protocol_opt: Option<Protocol> = None;
for batch in data_batches {
@@ -102,6 +93,22 @@ impl LogSegment {
_ => Err(Error::MissingMetadataAndProtocol),
}
}

+ // Factored out to facilitate testing
+ fn replay_for_metadata(
+ &self,
+ engine: &dyn Engine,
+ ) -> DeltaResult<impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>> + Send> {
+ let schema = get_log_schema().project(&[PROTOCOL_NAME, METADATA_NAME])?;
+ // filter out log files that do not contain metadata or protocol information
+ use Expression as Expr;
+ let meta_predicate = Expr::or(
+ Expr::column("metaData.id").is_not_null(),
+ Expr::column("protocol.minReaderVersion").is_not_null(),
+ );
+ // read the same protocol and metadata schema for both commits and checkpoints
+ self.replay(engine, schema.clone(), schema, Some(meta_predicate))
+ }
}

// TODO expose methods for accessing the files of a table (with file pruning).
Expand Down Expand Up @@ -168,6 +175,10 @@ impl Snapshot {
if let Some(version) = version {
commit_files.retain(|log_path| log_path.version <= version);
}
+ // only keep commit files above the checkpoint we found
+ if let Some(checkpoint_file) = checkpoint_files.first() {
+ commit_files.retain(|log_path| checkpoint_file.version < log_path.version);
+ }

// get the effective version from chosen files
let version_eff = commit_files
Expand Down Expand Up @@ -445,6 +456,7 @@ mod tests {
use crate::engine::default::filesystem::ObjectStoreFileSystemClient;
use crate::engine::sync::SyncEngine;
use crate::schema::StructType;
+ use crate::Table;

#[test]
fn test_snapshot_read_metadata() {
Expand Down Expand Up @@ -616,6 +628,27 @@ mod tests {
assert!(invalid.is_none())
}

+ #[test]
+ fn test_replay_for_metadata() {
+ let path = std::fs::canonicalize(PathBuf::from("./tests/data/parquet_row_group_skipping/"));
+ let url = url::Url::from_directory_path(path.unwrap()).unwrap();
+ let engine = SyncEngine::new();
+
+ let table = Table::new(url);
+ let snapshot = table.snapshot(&engine, None).unwrap();
+ let data: Vec<_> = snapshot
+ .log_segment
+ .replay_for_metadata(&engine)
+ .unwrap()
+ .try_collect()
+ .unwrap();
+ // The checkpoint has five parts, each containing one action. The P&M come from first and
+ // third parts, respectively. The actual `read_metadata` will also skip the last two parts
+ // because it terminates the iteration immediately after finding both P&M.
+ // TODO: Implement parquet row group skipping so we filter out all but two files.
+ assert_eq!(data.len(), 5);
+ }

#[test_log::test]
fn test_read_table_with_checkpoint() {
let path = std::fs::canonicalize(PathBuf::from(
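The new commit-retention rule in Snapshot is small but easy to invert by accident; a self-contained sketch of the same logic on bare version numbers:

// Keep only commits strictly newer than the chosen checkpoint, mirroring
// `commit_files.retain(|log_path| checkpoint_file.version < log_path.version)`.
fn retain_commits_after_checkpoint(commits: &mut Vec<u64>, checkpoint: Option<u64>) {
    if let Some(cp) = checkpoint {
        commits.retain(|&v| cp < v);
    }
}

fn main() {
    let mut commits = vec![0, 1, 2, 3, 4, 5];
    // A checkpoint at version 3 already covers versions 0..=3.
    retain_commits_after_checkpoint(&mut commits, Some(3));
    assert_eq!(commits, vec![4, 5]);
}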
kernel/src/transaction.rs (44 additions, 13 deletions)
@@ -1,10 +1,9 @@
use std::sync::Arc;

use crate::actions::visitors::TransactionVisitor;
- use crate::actions::{get_log_schema, TRANSACTION_NAME};
+ use crate::actions::{get_log_schema, Transaction, TRANSACTION_NAME};
use crate::snapshot::Snapshot;
- use crate::Engine;
- use crate::{actions::Transaction, DeltaResult};
+ use crate::{DeltaResult, Engine, EngineData, SchemaRef};

pub use crate::actions::visitors::TransactionMap;
pub struct TransactionScanner {
@@ -22,17 +21,11 @@ impl TransactionScanner {
engine: &dyn Engine,
application_id: Option<&str>,
) -> DeltaResult<TransactionMap> {
- let schema = get_log_schema().project(&[TRANSACTION_NAME])?;

+ let schema = Self::get_txn_schema()?;
let mut visitor = TransactionVisitor::new(application_id.map(|s| s.to_owned()));

- // when all ids are requested then a full scan of the log to the latest checkpoint is required
- let iter =
- self.snapshot
- .log_segment
- .replay(engine, schema.clone(), schema.clone(), None)?;

- for maybe_data in iter {
+ // If a specific id is requested then we can terminate log replay early as soon as it was
+ // found. If all ids are requested then we are forced to replay the entire log.
+ for maybe_data in self.replay_for_app_ids(engine, schema.clone())? {
let (txns, _) = maybe_data?;
txns.extract(schema.clone(), &mut visitor)?;
// if a specific id is requested and a transaction was found, then return
@@ -44,6 +37,22 @@ impl TransactionScanner {
Ok(visitor.transactions)
}

+ // Factored out to facilitate testing
+ fn get_txn_schema() -> DeltaResult<SchemaRef> {
+ get_log_schema().project(&[TRANSACTION_NAME])
+ }
+
+ // Factored out to facilitate testing
+ fn replay_for_app_ids(
+ &self,
+ engine: &dyn Engine,
+ schema: SchemaRef,
+ ) -> DeltaResult<impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>> + Send> {
+ self.snapshot
+ .log_segment
+ .replay(engine, schema.clone(), schema, None)
+ }

/// Scan the Delta Log for the latest transaction entry of an application
pub fn application_transaction(
&self,
@@ -67,6 +76,7 @@ mod tests {
use super::*;
use crate::engine::sync::SyncEngine;
use crate::Table;
+ use itertools::Itertools;

fn get_latest_transactions(path: &str, app_id: &str) -> (TransactionMap, Option<Transaction>) {
let path = std::fs::canonicalize(PathBuf::from(path)).unwrap();
Expand Down Expand Up @@ -117,4 +127,25 @@ mod tests {
.as_ref()
);
}

+ #[test]
+ fn test_replay_for_app_ids() {
+ let path = std::fs::canonicalize(PathBuf::from("./tests/data/parquet_row_group_skipping/"));
+ let url = url::Url::from_directory_path(path.unwrap()).unwrap();
+ let engine = SyncEngine::new();
+
+ let table = Table::new(url);
+ let snapshot = table.snapshot(&engine, None).unwrap();
+ let txn = TransactionScanner::new(snapshot.into());
+ let txn_schema = TransactionScanner::get_txn_schema().unwrap();
+
+ // The checkpoint has five parts, each containing one action. There are two app ids.
+ // TODO: Implement parquet row group skipping so we only read two files.
+ let data: Vec<_> = txn
+ .replay_for_app_ids(&engine, txn_schema.clone())
+ .unwrap()
+ .try_collect()
+ .unwrap();
+ assert_eq!(data.len(), 5);
+ }
}
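A sketch of the early-exit behavior the comments above describe; the batch shape and names are illustrative, not the kernel API:

// Replay stops as soon as the requested app id turns up; only an
// all-ids query forces a full replay of the log.
fn latest_version_for(
    app_id: &str,
    batches: impl Iterator<Item = Vec<(String, u64)>>,
) -> Option<u64> {
    for batch in batches {
        for (id, version) in batch {
            if id == app_id {
                return Some(version); // terminate log replay early
            }
        }
    }
    None
}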
(new test data file, exact path not shown)
@@ -0,0 +1,3 @@
+ {"commitInfo":{"timestamp":1728065840472,"operation":"CREATE TABLE","operationParameters":{"partitionBy":"[]","clusterBy":"[]","description":null,"isManaged":"false","properties":"{\"delta.checkpoint.writeStatsAsStruct\":\"false\",\"delta.dataSkippingNumIndexedCols\":\"0\",\"delta.checkpoint.writeStatsAsJson\":\"false\",\"delta.checkpointInterval\":\"1\"}"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Apache-Spark/3.5.3 Delta-Lake/3.2.1","txnId":"aef9df5a-e8a9-4d36-af75-2ffd4dc6b6cf"}}
+ {"metaData":{"id":"fd39678a-d482-4fe2-99d3-52732e7fbb09","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"bool\",\"type\":\"boolean\",\"nullable\":true,\"metadata\":{}},{\"name\":\"chrono\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"date32\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}},{\"name\":\"timestamp\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}},{\"name\":\"timestamp_ntz\",\"type\":\"timestamp_ntz\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}},{\"name\":\"numeric\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"decimals\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"decimal128\",\"type\":\"decimal(32,3)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal32\",\"type\":\"decimal(8,3)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal64\",\"type\":\"decimal(16,3)\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}},{\"name\":\"floats\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"float32\",\"type\":\"float\",\"nullable\":true,\"metadata\":{}},{\"name\":\"float64\",\"type\":\"double\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}},{\"name\":\"ints\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"int16\",\"type\":\"short\",\"nullable\":true,\"metadata\":{}},{\"name\":\"int32\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"int64\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"int8\",\"type\":\"byte\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}},{\"name\":\"varlen\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"binary\",\"type\":\"binary\",\"nullable\":true,\"metadata\":{}},{\"name\":\"utf8\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.checkpoint.writeStatsAsStruct":"false","delta.dataSkippingNumIndexedCols":"0","delta.checkpoint.writeStatsAsJson":"false","delta.checkpointInterval":"1"},"createdTime":1728065840373}}
+ {"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["timestampNtz"],"writerFeatures":["timestampNtz"]}}
(5 binary files not shown)
(new test data file, exact path not shown)
@@ -0,0 +1,4 @@
+ {"commitInfo":{"timestamp":1728065844007,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":0,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"5","numOutputBytes":"4959"},"engineInfo":"Apache-Spark/3.5.3 Delta-Lake/3.2.1","txnId":"d46d4bca-ab50-4075-977f-80a5b3844afa"}}
+ {"add":{"path":"part-00000-b92e017a-50ba-4676-8322-48fc371c2b59-c000.snappy.parquet","partitionValues":{},"size":4959,"modificationTime":1728065843972,"dataChange":true,"stats":"{\"numRecords\":5}"}}
+ {"txn":{"appId":"3ae45b72-24e1-865a-a211-34987ae02f2a","version":4390}}
+ {"txn":{"appId":"b42b951f-f5d1-4f6e-be2a-0d11d1543029","version":1235}}