Skip to content

Commit

Permalink
Merge pull request #107 from nicklan/table-to-engine-client
Browse files Browse the repository at this point in the history
rename table client -> engine interface
  • Loading branch information
nicklan authored Jan 31, 2024
2 parents 575c7a5 + 47db6ec commit d8d3eb3
Show file tree
Hide file tree
Showing 16 changed files with 107 additions and 89 deletions.
2 changes: 1 addition & 1 deletion acceptance/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! Helpers to validate implementaions of TableClients
//! Helpers to validate Engineinterface implementations
pub mod meta;
pub use meta::*;
13 changes: 8 additions & 5 deletions acceptance/src/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use serde::{Deserialize, Serialize};
use url::Url;

use deltakernel::snapshot::Snapshot;
use deltakernel::{Error, Table, TableClient, Version};
use deltakernel::{EngineInterface, Error, Table, Version};

#[derive(Debug, thiserror::Error)]
pub enum AssertionError {
Expand Down Expand Up @@ -96,17 +96,20 @@ impl TestCaseInfo {
Ok(())
}

pub async fn assert_metadata(&self, table_client: Arc<dyn TableClient>) -> TestResult<()> {
let table_client = table_client.as_ref();
pub async fn assert_metadata(
&self,
engine_interface: Arc<dyn EngineInterface>,
) -> TestResult<()> {
let engine_interface = engine_interface.as_ref();
let table = Table::new(self.table_root()?);

let (latest, versions) = self.versions().await?;

let snapshot = table.snapshot(table_client, None)?;
let snapshot = table.snapshot(engine_interface, None)?;
self.assert_snapshot_meta(&latest, &snapshot)?;

for table_version in versions {
let snapshot = table.snapshot(table_client, Some(table_version.version))?;
let snapshot = table.snapshot(engine_interface, Some(table_version.version))?;
self.assert_snapshot_meta(&table_version, &snapshot)?;
}

Expand Down
6 changes: 4 additions & 2 deletions acceptance/tests/dat_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ fn reader_test(path: &Path) -> datatest_stable::Result<()> {
.block_on(async {
let case = read_dat_case(root_dir).unwrap();
let table_root = case.table_root().unwrap();
let table_client = Arc::new(
let engine_interface = Arc::new(
DefaultTableClient::try_new(
&table_root,
std::iter::empty::<(&str, &str)>(),
Expand All @@ -27,7 +27,9 @@ fn reader_test(path: &Path) -> datatest_stable::Result<()> {
.unwrap(),
);

case.assert_metadata(table_client.clone()).await.unwrap();
case.assert_metadata(engine_interface.clone())
.await
.unwrap();
});
Ok(())
}
Expand Down
4 changes: 2 additions & 2 deletions acceptance/tests/other.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ async fn test_read_table_with_checkpoint() {
))
.unwrap();
let location = url::Url::from_directory_path(path).unwrap();
let table_client = Arc::new(
let engine_interface = Arc::new(
DefaultTableClient::try_new(&location, HashMap::<String, String>::new()).unwrap(),
);
let snapshot = Snapshot::try_new(location, table_client, None)
let snapshot = Snapshot::try_new(location, engine_interface, None)
.await
.unwrap();
Expand Down
10 changes: 5 additions & 5 deletions kernel/examples/dump-table/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,21 +86,21 @@ fn main() {
println!("Invalid url");
return;
};
let table_client = DefaultTableClient::try_new(
let engine_interface = DefaultTableClient::try_new(
&url,
HashMap::<String, String>::new(),
Arc::new(TokioBackgroundExecutor::new()),
);
let Ok(table_client) = table_client else {
let Ok(engine_interface) = engine_interface else {
println!(
"Failed to construct table client: {}",
table_client.err().unwrap()
engine_interface.err().unwrap()
);
return;
};

let table = Table::new(url);
let snapshot = table.snapshot(&table_client, None);
let snapshot = table.snapshot(&engine_interface, None);
let Ok(snapshot) = snapshot else {
println!(
"Failed to construct latest snapshot: {}",
Expand All @@ -127,7 +127,7 @@ fn main() {
}
table.set_header(header_names);

for batch in scan.execute(&table_client).unwrap() {
for batch in scan.execute(&engine_interface).unwrap() {
for row in 0..batch.num_rows() {
let table_row =
(0..batch.num_columns()).map(|col| extract_value(batch.column(col), row));
Expand Down
12 changes: 6 additions & 6 deletions kernel/examples/inspect-table/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,21 +54,21 @@ fn main() {
println!("Invalid url");
return;
};
let table_client = DefaultTableClient::try_new(
let engine_interface = DefaultTableClient::try_new(
&url,
HashMap::<String, String>::new(),
Arc::new(TokioBackgroundExecutor::new()),
);
let Ok(table_client) = table_client else {
let Ok(engine_interface) = engine_interface else {
println!(
"Failed to construct table client: {}",
table_client.err().unwrap()
engine_interface.err().unwrap()
);
return;
};

let table = Table::new(url);
let snapshot = table.snapshot(&table_client, None);
let snapshot = table.snapshot(&engine_interface, None);
let Ok(snapshot) = snapshot else {
println!(
"Failed to construct latest snapshot: {}",
Expand All @@ -91,7 +91,7 @@ fn main() {
use deltakernel::Add;
let scan = ScanBuilder::new(snapshot).build();
let files: Vec<Add> = scan
.files(&table_client)
.files(&engine_interface)
.unwrap()
.map(|r| r.unwrap())
.collect();
Expand All @@ -115,7 +115,7 @@ fn main() {

let batches = snapshot
._log_segment()
.replay(&table_client, read_schema, None);
.replay(&engine_interface, read_schema, None);

let batch_vec = batches
.unwrap()
Expand Down
2 changes: 1 addition & 1 deletion kernel/src/client/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
use futures::Future;

/// An executor that can be used to run async tasks. This is used by IO functions
/// within the default TableClient.
/// within the default Engineinterface.
///
/// This must be capable of running within an async context and running futures
/// on another thread. This could be a multi-threaded runtime, like Tokio's or
Expand Down
8 changes: 4 additions & 4 deletions kernel/src/client/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! # Default TableClient
//! # Default Engineinterface
//!
//! The default implementation of [`TableClient`] is [`DefaultTableClient`].
//! The default implementation of [`Engineinterface`] is [`DefaultTableClient`].
//! This uses the [object_store], [parquet][::parquet], and [arrow_json] crates
//! to read and write data.
//!
Expand All @@ -19,7 +19,7 @@ use self::filesystem::ObjectStoreFileSystemClient;
use self::json::DefaultJsonHandler;
use self::parquet::DefaultParquetHandler;
use crate::{
DeltaResult, ExpressionHandler, FileSystemClient, JsonHandler, ParquetHandler, TableClient,
DeltaResult, EngineInterface, ExpressionHandler, FileSystemClient, JsonHandler, ParquetHandler,
};

pub mod conversion;
Expand Down Expand Up @@ -93,7 +93,7 @@ impl<E: TaskExecutor> DefaultTableClient<E> {
}
}

impl<E: TaskExecutor> TableClient for DefaultTableClient<E> {
impl<E: TaskExecutor> EngineInterface for DefaultTableClient<E> {
fn get_expression_handler(&self) -> Arc<dyn ExpressionHandler> {
self.expression.clone()
}
Expand Down
8 changes: 4 additions & 4 deletions kernel/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
//! # TableClient interfaces
//! # Engineinterface interfaces
//!
//! The TableClient interfaces allow connectors to bring their own implementation of functionality
//! The Engineinterface interfaces allow connectors to bring their own implementation of functionality
//! such as reading parquet files, listing files in a file system, parsing a JSON string etc.
//!
//! The [`TableClient`] trait exposes methods to get sub-clients which expose the core
//! The [`Engineinterface`] trait exposes methods to get sub-clients which expose the core
//! functionalities customizable by connectors.
//!
//! ## Expression handling
Expand Down Expand Up @@ -189,7 +189,7 @@ pub trait ParquetHandler: Send + Sync {
/// Interface encapsulating all clients needed by the Delta Kernel in order to read the Delta table.
///
/// Connectors are expected to pass an implementation of this interface when reading a Delta table.
pub trait TableClient {
pub trait EngineInterface {
/// Get the connector provided [`ExpressionHandler`].
fn get_expression_handler(&self) -> Arc<dyn ExpressionHandler>;

Expand Down
4 changes: 2 additions & 2 deletions kernel/src/scan/data_skipping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tracing::debug;
use crate::error::{DeltaResult, Error};
use crate::expressions::{BinaryOperator, Expression as Expr, VariadicOperator};
use crate::schema::{SchemaRef, StructField, StructType};
use crate::{ExpressionEvaluator, TableClient};
use crate::{EngineInterface, ExpressionEvaluator};

/// Returns <op2> (if any) such that B <op2> A is equivalent to A <op> B.
fn commute(op: &BinaryOperator) -> Option<BinaryOperator> {
Expand Down Expand Up @@ -110,7 +110,7 @@ impl DataSkippingFilter {
/// NOTE: None is equivalent to a trivial filter that always returns TRUE (= keeps all files),
/// but using an Option lets the engine easily avoid the overhead of applying trivial filters.
pub(crate) fn new(
table_client: &dyn TableClient,
table_client: &dyn EngineInterface,
table_schema: &SchemaRef,
predicate: &Option<Expr>,
) -> Option<Self> {
Expand Down
6 changes: 3 additions & 3 deletions kernel/src/scan/file_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use super::data_skipping::DataSkippingFilter;
use crate::actions::{parse_actions, Action, ActionType, Add};
use crate::expressions::Expression;
use crate::schema::SchemaRef;
use crate::{DeltaResult, TableClient};
use crate::{DeltaResult, EngineInterface};

struct LogReplayScanner {
filter: Option<DataSkippingFilter>,
Expand All @@ -22,7 +22,7 @@ struct LogReplayScanner {
impl LogReplayScanner {
/// Create a new [`LogReplayScanner`] instance
fn new(
table_client: &dyn TableClient,
table_client: &dyn EngineInterface,
table_schema: &SchemaRef,
predicate: &Option<Expression>,
) -> Self {
Expand Down Expand Up @@ -95,7 +95,7 @@ impl LogReplayScanner {
/// Given an iterator of (record batch, bool) tuples and a predicate, returns an iterator of [Add]s.
/// The boolean flag indicates whether the record batch is a log or checkpoint batch.
pub fn log_replay_iter(
table_client: &dyn TableClient,
table_client: &dyn EngineInterface,
action_iter: impl Iterator<Item = DeltaResult<(RecordBatch, bool)>>,
table_schema: &SchemaRef,
predicate: &Option<Expression>,
Expand Down
32 changes: 18 additions & 14 deletions kernel/src/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::actions::ActionType;
use crate::expressions::Expression;
use crate::schema::{SchemaRef, StructType};
use crate::snapshot::Snapshot;
use crate::{Add, DeltaResult, FileMeta, TableClient};
use crate::{Add, DeltaResult, EngineInterface, FileMeta};

mod data_skipping;
pub mod file_stream;
Expand Down Expand Up @@ -117,7 +117,7 @@ impl Scan {
/// files into actual table data.
pub fn files(
&self,
table_client: &dyn TableClient,
engine_interface: &dyn EngineInterface,
) -> DeltaResult<impl Iterator<Item = DeltaResult<Add>>> {
lazy_static::lazy_static! {
static ref ACTION_SCHEMA: SchemaRef = Arc::new(StructType::new(vec![
Expand All @@ -127,23 +127,23 @@ impl Scan {
}

let log_iter = self.snapshot.log_segment.replay(
table_client,
engine_interface,
ACTION_SCHEMA.clone(),
self.predicate.clone(),
)?;

Ok(log_replay_iter(
table_client,
engine_interface,
log_iter,
&self.read_schema,
&self.predicate,
))
}

pub fn execute(&self, table_client: &dyn TableClient) -> DeltaResult<Vec<RecordBatch>> {
let parquet_handler = table_client.get_parquet_handler();
pub fn execute(&self, engine_interface: &dyn EngineInterface) -> DeltaResult<Vec<RecordBatch>> {
let parquet_handler = engine_interface.get_parquet_handler();

self.files(table_client)?
self.files(engine_interface)?
.map(|res| {
let add = res?;
let meta = FileMeta {
Expand All @@ -163,7 +163,7 @@ impl Scan {
let batch = concat_batches(&schema, &batches)?;

if let Some(dv_descriptor) = add.deletion_vector {
let fs_client = table_client.get_file_system_client();
let fs_client = engine_interface.get_file_system_client();
let dv = dv_descriptor.read(fs_client, self.snapshot.table_root.clone())?;
let mask: BooleanArray = (0..batch.num_rows())
.map(|i| Some(!dv.contains(i.try_into().expect("fit into u32"))))
Expand Down Expand Up @@ -192,17 +192,21 @@ mod tests {
let path =
std::fs::canonicalize(PathBuf::from("./tests/data/table-without-dv-small/")).unwrap();
let url = url::Url::from_directory_path(path).unwrap();
let table_client = DefaultTableClient::try_new(
let engine_interface = DefaultTableClient::try_new(
&url,
std::iter::empty::<(&str, &str)>(),
Arc::new(TokioBackgroundExecutor::new()),
)
.unwrap();

let table = Table::new(url);
let snapshot = table.snapshot(&table_client, None).unwrap();
let snapshot = table.snapshot(&engine_interface, None).unwrap();
let scan = ScanBuilder::new(snapshot).build();
let files: Vec<Add> = scan.files(&table_client).unwrap().try_collect().unwrap();
let files: Vec<Add> = scan
.files(&engine_interface)
.unwrap()
.try_collect()
.unwrap();

assert_eq!(files.len(), 1);
assert_eq!(
Expand All @@ -217,17 +221,17 @@ mod tests {
let path =
std::fs::canonicalize(PathBuf::from("./tests/data/table-without-dv-small/")).unwrap();
let url = url::Url::from_directory_path(path).unwrap();
let table_client = DefaultTableClient::try_new(
let engine_interface = DefaultTableClient::try_new(
&url,
std::iter::empty::<(&str, &str)>(),
Arc::new(TokioBackgroundExecutor::new()),
)
.unwrap();

let table = Table::new(url);
let snapshot = table.snapshot(&table_client, None).unwrap();
let snapshot = table.snapshot(&engine_interface, None).unwrap();
let scan = ScanBuilder::new(snapshot).build();
let files = scan.execute(&table_client).unwrap();
let files = scan.execute(&engine_interface).unwrap();

assert_eq!(files.len(), 1);
assert_eq!(files[0].num_rows(), 10)
Expand Down
Loading

0 comments on commit d8d3eb3

Please sign in to comment.