diff --git a/Cargo.toml b/Cargo.toml index c3e53c69af..3935efd8c3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,7 +31,7 @@ debug = "line-tables-only" # "default-engine", # "developer-visibility", # ] } -delta_kernel = { git = "https://github.com/roeap/delta-kernel-rs", rev = "caeb70ab78e4d5f3b56b5105fd3587c1046d1e1b", features = [ +delta_kernel = { git = "https://github.com/roeap/delta-kernel-rs", rev = "023abf1ee604b77bbaa5efec97e043fc4bdf220b", features = [ "default-engine", "developer-visibility", ] } diff --git a/crates/core/src/kernel/snapshot_next/eager.rs b/crates/core/src/kernel/snapshot_next/eager.rs index 88306b8e49..a1b0c0d4ca 100644 --- a/crates/core/src/kernel/snapshot_next/eager.rs +++ b/crates/core/src/kernel/snapshot_next/eager.rs @@ -2,7 +2,6 @@ use std::sync::Arc; use arrow::compute::{concat_batches, filter_record_batch}; use arrow_array::{BooleanArray, RecordBatch}; -use chrono::format::Item; use delta_kernel::actions::set_transaction::SetTransactionMap; use delta_kernel::actions::{get_log_add_schema, get_log_schema, ADD_NAME, REMOVE_NAME}; use delta_kernel::actions::{Add, Metadata, Protocol, SetTransaction}; @@ -11,7 +10,7 @@ use delta_kernel::log_segment::LogSegment; use delta_kernel::scan::log_replay::scan_action_iter; use delta_kernel::schema::Schema; use delta_kernel::table_properties::TableProperties; -use delta_kernel::{EngineData, Expression, Table, Version}; +use delta_kernel::{Engine, EngineData, Expression, Table, Version}; use itertools::Itertools; use object_store::ObjectStore; use url::Url; @@ -19,6 +18,7 @@ use url::Url; use super::iterators::{AddIterator, AddView, AddViewItem}; use super::lazy::LazySnapshot; use super::{Snapshot, SnapshotError}; +use crate::kernel::CommitInfo; use crate::{DeltaResult, DeltaTableConfig, DeltaTableError}; /// An eager snapshot of a Delta Table at a specific version. 
@@ -77,6 +77,14 @@ impl Snapshot for EagerSnapshot { ) -> DeltaResult> { self.snapshot.application_transaction(app_id) } + + fn commit_infos( + &self, + start_version: impl Into>, + limit: impl Into>, + ) -> DeltaResult> { + self.snapshot.commit_infos(start_version, limit) + } } impl EagerSnapshot { @@ -92,7 +100,7 @@ impl EagerSnapshot { LazySnapshot::try_new(Table::try_from_uri(table_root)?, store, version).await?; let files = config .require_files - .then(|| -> DeltaResult<_> { Ok(replay_file_actions(&snapshot)?) }) + .then(|| -> DeltaResult<_> { replay_file_actions(&snapshot) }) .transpose()?; Ok(Self { snapshot, @@ -101,6 +109,10 @@ impl EagerSnapshot { }) } + pub(crate) fn engine_ref(&self) -> &Arc { + self.snapshot.engine_ref() + } + pub fn file_data(&self) -> DeltaResult<&RecordBatch> { Ok(self .files @@ -122,7 +134,7 @@ impl EagerSnapshot { .files .as_ref() .map(|f| f.num_rows()) - .ok_or_else(|| SnapshotError::FilesNotInitialized)?) + .ok_or(SnapshotError::FilesNotInitialized)?) 
} pub(crate) fn update(&mut self) -> DeltaResult<()> { diff --git a/crates/core/src/kernel/snapshot_next/iterators.rs b/crates/core/src/kernel/snapshot_next/iterators.rs index 4700cb9da3..375fc0061e 100644 --- a/crates/core/src/kernel/snapshot_next/iterators.rs +++ b/crates/core/src/kernel/snapshot_next/iterators.rs @@ -25,7 +25,7 @@ pub struct AddIterator<'a> { } impl AddIterator<'_> { - pub fn try_new<'a>(actions: &'a RecordBatch) -> DeltaResult> { + pub fn try_new(actions: &RecordBatch) -> DeltaResult> { validate_column::(actions, &[ADD_NAME, "path"])?; validate_column::(actions, &[ADD_NAME, "size"])?; validate_column::(actions, &[ADD_NAME, "modificationTime"])?; @@ -108,7 +108,7 @@ pub struct AddViewItem { } impl AddViewItem { - pub fn path(&self) -> &str { + pub fn path(&self) -> &str { extract_column(&self.actions, &[ADD_NAME, "path"]) .unwrap() .as_string::() @@ -273,7 +273,7 @@ fn validate_column<'a, T: Array + 'static>( } } else { return Err(DeltaTableError::from( - crate::protocol::ProtocolError::InvalidField(format!("Column not found",)), + crate::protocol::ProtocolError::InvalidField("Column not found".to_string()), )); } Ok(()) diff --git a/crates/core/src/kernel/snapshot_next/lazy.rs b/crates/core/src/kernel/snapshot_next/lazy.rs index 386d0f63d6..2125b70d93 100644 --- a/crates/core/src/kernel/snapshot_next/lazy.rs +++ b/crates/core/src/kernel/snapshot_next/lazy.rs @@ -1,5 +1,6 @@ //! Snapshot of a Delta Table at a specific version. //! 
+use std::io::{BufRead, BufReader, Cursor}; use std::sync::{Arc, LazyLock}; use arrow::compute::filter_record_batch; @@ -12,6 +13,7 @@ use delta_kernel::engine::default::executor::tokio::{ TokioBackgroundExecutor, TokioMultiThreadExecutor, }; use delta_kernel::engine::default::DefaultEngine; +use delta_kernel::log_segment::LogSegment; use delta_kernel::schema::Schema; use delta_kernel::snapshot::Snapshot as SnapshotInner; use delta_kernel::table_properties::TableProperties; @@ -23,6 +25,7 @@ use url::Url; use super::cache::CommitCacheObjectStore; use super::Snapshot; +use crate::kernel::{Action, CommitInfo}; use crate::{DeltaResult, DeltaTableError}; // TODO: avoid repetitive parsing of json stats @@ -35,7 +38,7 @@ pub struct LazySnapshot { impl Snapshot for LazySnapshot { fn table_root(&self) -> &Url { - &self.inner.table_root() + self.inner.table_root() } fn version(&self) -> Version { @@ -55,7 +58,7 @@ impl Snapshot for LazySnapshot { } fn table_properties(&self) -> &TableProperties { - &self.inner.table_properties() + self.inner.table_properties() } fn files(&self) -> DeltaResult>> { @@ -96,6 +99,58 @@ impl Snapshot for LazySnapshot { let scanner = SetTransactionScanner::new(self.inner.clone()); Ok(scanner.application_transaction(self.engine.as_ref(), app_id.as_ref())?) 
} + + fn commit_infos( + &self, + start_version: impl Into>, + limit: impl Into>, + ) -> DeltaResult> { + // let start_version = start_version.into(); + let fs_client = self.engine.get_file_system_client(); + let end_version = start_version.into().unwrap_or_else(|| self.version()); + let start_version = limit + .into() + .and_then(|limit| { + if limit == 0 { + Some(end_version) + } else { + Some(end_version.saturating_sub(limit as u64 - 1)) + } + }) + .unwrap_or(0); + let log_root = self.inner.table_root().join("_delta_log").unwrap(); + let mut log_segment = LogSegment::for_table_changes( + fs_client.as_ref(), + log_root, + start_version, + end_version, + )?; + log_segment.ascending_commit_files.reverse(); + let files = log_segment + .ascending_commit_files + .iter() + .map(|commit_file| (commit_file.location.location.clone(), None)) + .collect_vec(); + + Ok(fs_client + .read_files(files)? + .zip(log_segment.ascending_commit_files.into_iter()) + .filter_map(|(data, path)| { + data.ok().and_then(|d| { + let reader = BufReader::new(Cursor::new(d)); + for line in reader.lines() { + match line.and_then(|l| Ok(serde_json::from_str::(&l)?)) { + Ok(Action::CommitInfo(commit_info)) => { + return Some((path.version, commit_info)) + } + Err(e) => return None, + _ => continue, + }; + } + None + }) + })) + } } impl LazySnapshot { @@ -138,7 +193,7 @@ impl LazySnapshot { } /// A shared reference to the engine used for interacting with the Delta Table. 
- pub(super) fn engine_ref(&self) -> &Arc { + pub(crate) fn engine_ref(&self) -> &Arc { &self.engine } diff --git a/crates/core/src/kernel/snapshot_next/mod.rs b/crates/core/src/kernel/snapshot_next/mod.rs index 879ef2824a..b02367c3d0 100644 --- a/crates/core/src/kernel/snapshot_next/mod.rs +++ b/crates/core/src/kernel/snapshot_next/mod.rs @@ -4,14 +4,15 @@ use std::sync::Arc; use arrow_array::RecordBatch; use delta_kernel::actions::visitors::SetTransactionMap; -use delta_kernel::actions::{Add, Metadata, Protocol, SetTransaction}; +use delta_kernel::actions::{Metadata, Protocol, SetTransaction}; use delta_kernel::expressions::{Scalar, StructData}; use delta_kernel::schema::Schema; use delta_kernel::table_properties::TableProperties; use delta_kernel::Version; -use iterators::{AddIterator, AddView, AddViewItem}; +use iterators::{AddView, AddViewItem}; use url::Url; +use crate::kernel::actions::CommitInfo; use crate::{DeltaResult, DeltaTableError}; pub use eager::EagerSnapshot; @@ -77,7 +78,7 @@ pub trait Snapshot { fn files_view( &self, ) -> DeltaResult>>> { - Ok(self.files()?.map(|r| r.and_then(|b| AddView::try_new(b)))) + Ok(self.files()?.map(|r| r.and_then(AddView::try_new))) } fn tombstones(&self) -> DeltaResult>>; @@ -93,10 +94,40 @@ pub trait Snapshot { /// /// Initiates a log scan, but terminates as soon as the transaction /// for the given application is found. + /// + /// # Parameters + /// - `app_id`: The application id for which to fetch the transaction. + /// + /// # Returns + /// The latest transaction for the given application id, if it exists. fn application_transaction( &self, app_id: impl AsRef, ) -> DeltaResult>; + + /// Get commit info for the table. + /// + /// The [`CommitInfo`]s are returned in descending order of version + /// with the most recent commit first starting from the `start_version`. + /// + /// [`CommitInfo`]s are read on a best-effort basis. If the action + /// for a version is not available or cannot be parsed, it is skipped. 
+ /// + /// # Parameters + /// - `start_version`: The version from which to start fetching commit info. + /// Defaults to the latest version. + /// - `limit`: The maximum number of commit infos to fetch. + /// + /// # Returns + /// An iterator of commit info tuples. The first element of the tuple is the version + /// of the commit, the second element is the corresponding commit info. + // TODO(roeap): this is currently using our commit info, we should be using + // the definition from kernel, once handling over there has matured. + fn commit_infos( + &self, + start_version: impl Into>, + limit: impl Into>, + ) -> DeltaResult>; } impl Snapshot for Arc { @@ -142,6 +173,67 @@ impl Snapshot for Arc { ) -> DeltaResult> { self.as_ref().application_transaction(app_id) } + + fn commit_infos( + &self, + start_version: impl Into>, + limit: impl Into>, + ) -> DeltaResult> { + self.as_ref().commit_infos(start_version, limit) + } +} + +impl Snapshot for Box { + fn table_root(&self) -> &Url { + self.as_ref().table_root() + } + + fn version(&self) -> Version { + self.as_ref().version() + } + + fn schema(&self) -> &Schema { + self.as_ref().schema() + } + + fn metadata(&self) -> &Metadata { + self.as_ref().metadata() + } + + fn protocol(&self) -> &Protocol { + self.as_ref().protocol() + } + + fn table_properties(&self) -> &TableProperties { + self.as_ref().table_properties() + } + + fn files(&self) -> DeltaResult>> { + self.as_ref().files() + } + + fn tombstones(&self) -> DeltaResult>> { + self.as_ref().tombstones() + } + + fn application_transactions(&self) -> DeltaResult { + self.as_ref().application_transactions() + } + + fn application_transaction( + &self, + app_id: impl AsRef, + ) -> DeltaResult> { + self.as_ref().application_transaction(app_id) + } + + fn commit_infos( + &self, + start_version: impl Into>, + limit: impl Into>, + ) -> DeltaResult> { + self.as_ref().commit_infos(start_version, limit) + } } #[cfg(test)]