From d8329ae9dbc10bc47931712ac5e941d2b3f9abe8 Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Thu, 7 Nov 2024 16:28:45 -0800 Subject: [PATCH] [write] Transaction append data API (#393) This PR is the second (of two) major pieces for supporting simple blind appends. It implements: 1. **new `Transaction` APIs** for appending data to delta tables: a. `get_write_context()` to get a `WriteContext` to pass to the data path which includes all information needed to write: `target directory`, `snapshot schema`, `transformation expression`, and (future: columns to collect stats on) b. `add_write_metadata(impl EngineData)` to add metadata about a write to the transaction along with a new static method `transaction::get_write_metadata_schema` to provide the expected schema of this engine data. c. new machinery in 'commit' method to commit new `Add` actions for each row of write_metadata from the API above. 2. **new default engine capabilities** for using the default engine to write parquet data (to append to tables): a. parquet handler can now `write_parquet_file(EngineData)` b. usage example in `write.rs` tests for now 3. **new append tests** in the `write.rs` integration test suite Details and some follow-ups: - the parquet writing (similar to JSON) currently just buffers everything into memory before issuing one big PUT. we should make this smarter: single PUT for small data and MultipartUpload for larger data. tracking in #418 - schema enforcement is done at the data layer. this means it is up to the engine to call the expression evaluation and we expect this to fail if the output schema is incorrect (see `test_append_invalid_schema` in `write.rs` integration test). we may want to change this in the future to eagerly error based on the engine providing a schema up front at metadata time (transaction creation time) based on #370 resolves #390 --- kernel/Cargo.toml | 2 + kernel/src/engine/arrow_utils.rs | 6 +- kernel/src/engine/default/json.rs | 2 +- kernel/src/engine/default/mod.rs | 33 +- kernel/src/engine/default/parquet.rs | 272 +++++++++++++++- kernel/src/engine/sync/json.rs | 6 +- kernel/src/lib.rs | 15 +- kernel/src/transaction.rs | 155 ++++++++- kernel/tests/common/mod.rs | 52 ++- kernel/tests/read.rs | 20 +- kernel/tests/write.rs | 464 +++++++++++++++++++++++++-- 11 files changed, 955 insertions(+), 72 deletions(-) diff --git a/kernel/Cargo.toml b/kernel/Cargo.toml index 8205a2ccf..235df2646 100644 --- a/kernel/Cargo.toml +++ b/kernel/Cargo.toml @@ -90,6 +90,8 @@ default-engine = [ "parquet/object_store", "reqwest", "tokio", + "uuid/v4", + "uuid/fast-rng", ] developer-visibility = [] diff --git a/kernel/src/engine/arrow_utils.rs b/kernel/src/engine/arrow_utils.rs index d8daba774..dea2cc9fd 100644 --- a/kernel/src/engine/arrow_utils.rs +++ b/kernel/src/engine/arrow_utils.rs @@ -665,11 +665,11 @@ fn parse_json_impl(json_strings: &StringArray, schema: ArrowSchemaRef) -> DeltaR /// serialize an arrow RecordBatch to a JSON string by appending to a buffer. // TODO (zach): this should stream data to the JSON writer and output an iterator. 
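// A quick usage sketch (not part of the patch) for the updated signature that
// follows: the input iterator now yields `DeltaResult<Box<dyn EngineData>>`, so
// callers wrap each batch in `Ok` (as the test below does) and fallible batch
// producers can be threaded straight through. Here `data` is assumed to be a
// `Box<dyn EngineData>` built elsewhere:
//
//     let json_bytes = to_json_bytes(Box::new(std::iter::once(Ok(data))))?;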
pub(crate) fn to_json_bytes( - data: impl Iterator> + Send, + data: impl Iterator>> + Send, ) -> DeltaResult> { let mut writer = LineDelimitedWriter::new(Vec::new()); for chunk in data.into_iter() { - let arrow_data = ArrowEngineData::try_from_engine_data(chunk)?; + let arrow_data = ArrowEngineData::try_from_engine_data(chunk?)?; let record_batch = arrow_data.record_batch(); writer.write(record_batch)?; } @@ -1436,7 +1436,7 @@ mod tests { vec![Arc::new(StringArray::from(vec!["string1", "string2"]))], )?; let data: Box = Box::new(ArrowEngineData::new(data)); - let json = to_json_bytes(Box::new(std::iter::once(data)))?; + let json = to_json_bytes(Box::new(std::iter::once(Ok(data))))?; assert_eq!( json, "{\"string\":\"string1\"}\n{\"string\":\"string2\"}\n".as_bytes() diff --git a/kernel/src/engine/default/json.rs b/kernel/src/engine/default/json.rs index b03b26bc6..1d8aa3058 100644 --- a/kernel/src/engine/default/json.rs +++ b/kernel/src/engine/default/json.rs @@ -96,7 +96,7 @@ impl JsonHandler for DefaultJsonHandler { fn write_json_file( &self, path: &Url, - data: Box> + Send>, + data: Box>> + Send + '_>, _overwrite: bool, ) -> DeltaResult<()> { let buffer = to_json_bytes(data)?; diff --git a/kernel/src/engine/default/mod.rs b/kernel/src/engine/default/mod.rs index 9fa1bdb0c..d89cf29cd 100644 --- a/kernel/src/engine/default/mod.rs +++ b/kernel/src/engine/default/mod.rs @@ -6,6 +6,7 @@ //! a separate thread pool, provided by the [`TaskExecutor`] trait. Read more in //! the [executor] module. +use std::collections::HashMap; use std::sync::Arc; use self::storage::parse_url_opts; @@ -16,9 +17,13 @@ use self::executor::TaskExecutor; use self::filesystem::ObjectStoreFileSystemClient; use self::json::DefaultJsonHandler; use self::parquet::DefaultParquetHandler; +use super::arrow_data::ArrowEngineData; use super::arrow_expression::ArrowExpressionHandler; +use crate::schema::Schema; +use crate::transaction::WriteContext; use crate::{ - DeltaResult, Engine, ExpressionHandler, FileSystemClient, JsonHandler, ParquetHandler, + DeltaResult, Engine, EngineData, ExpressionHandler, FileSystemClient, JsonHandler, + ParquetHandler, }; pub mod executor; @@ -108,6 +113,32 @@ impl DefaultEngine { pub fn get_object_store_for_url(&self, _url: &Url) -> Option> { Some(self.store.clone()) } + + pub async fn write_parquet( + &self, + data: &ArrowEngineData, + write_context: &WriteContext, + partition_values: HashMap, + data_change: bool, + ) -> DeltaResult> { + let transform = write_context.logical_to_physical(); + let input_schema: Schema = data.record_batch().schema().try_into()?; + let output_schema = write_context.schema(); + let logical_to_physical_expr = self.get_expression_handler().get_evaluator( + input_schema.into(), + transform.clone(), + output_schema.clone().into(), + ); + let physical_data = logical_to_physical_expr.evaluate(data)?; + self.parquet + .write_parquet_file( + write_context.target_dir(), + physical_data, + partition_values, + data_change, + ) + .await + } } impl Engine for DefaultEngine { diff --git a/kernel/src/engine/default/parquet.rs b/kernel/src/engine/default/parquet.rs index ffcf4e2e9..d4235fa07 100644 --- a/kernel/src/engine/default/parquet.rs +++ b/kernel/src/engine/default/parquet.rs @@ -1,23 +1,30 @@ //! 
Default Parquet handler implementation +use std::collections::HashMap; use std::ops::Range; use std::sync::Arc; +use arrow_array::builder::{MapBuilder, MapFieldNames, StringBuilder}; +use arrow_array::{BooleanArray, Int64Array, RecordBatch, StringArray}; use futures::StreamExt; use object_store::path::Path; use object_store::DynObjectStore; use parquet::arrow::arrow_reader::{ ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder, }; +use parquet::arrow::arrow_writer::ArrowWriter; use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder}; +use uuid::Uuid; use super::file_stream::{FileOpenFuture, FileOpener, FileStream}; +use crate::engine::arrow_data::ArrowEngineData; use crate::engine::arrow_utils::{generate_mask, get_requested_indices, reorder_struct_array}; use crate::engine::default::executor::TaskExecutor; use crate::engine::parquet_row_group_skipping::ParquetRowGroupSkipping; use crate::schema::SchemaRef; use crate::{ - DeltaResult, Error, ExpressionRef, FileDataReadResultIterator, FileMeta, ParquetHandler, + DeltaResult, EngineData, Error, ExpressionRef, FileDataReadResultIterator, FileMeta, + ParquetHandler, }; #[derive(Debug)] @@ -27,6 +34,64 @@ pub struct DefaultParquetHandler { readahead: usize, } +/// Metadata of a data file (typically a parquet file), currently just includes the file metadata +/// but will expand to include file statistics and other metadata in the future. +#[derive(Debug)] +pub struct DataFileMetadata { + file_meta: FileMeta, +} + +impl DataFileMetadata { + pub fn new(file_meta: FileMeta) -> Self { + Self { file_meta } + } + + // convert DataFileMetadata into a record batch which matches the 'write_metadata' schema + fn as_record_batch( + &self, + partition_values: &HashMap, + data_change: bool, + ) -> DeltaResult> { + let DataFileMetadata { + file_meta: + FileMeta { + location, + last_modified, + size, + }, + } = self; + let write_metadata_schema = crate::transaction::get_write_metadata_schema(); + + // create the record batch of the write metadata + let path = Arc::new(StringArray::from(vec![location.to_string()])); + let key_builder = StringBuilder::new(); + let val_builder = StringBuilder::new(); + let names = MapFieldNames { + entry: "key_value".to_string(), + key: "key".to_string(), + value: "value".to_string(), + }; + let mut builder = MapBuilder::new(Some(names), key_builder, val_builder); + for (k, v) in partition_values { + builder.keys().append_value(k); + builder.values().append_value(v); + } + builder.append(true).unwrap(); + let partitions = Arc::new(builder.finish()); + // this means max size we can write is i64::MAX (~8EB) + let size: i64 = (*size) + .try_into() + .map_err(|_| Error::generic("Failed to convert parquet metadata 'size' to i64"))?; + let size = Arc::new(Int64Array::from(vec![size])); + let data_change = Arc::new(BooleanArray::from(vec![data_change])); + let modification_time = Arc::new(Int64Array::from(vec![*last_modified])); + Ok(Box::new(ArrowEngineData::new(RecordBatch::try_new( + Arc::new(write_metadata_schema.as_ref().try_into()?), + vec![path, partitions, size, modification_time, data_change], + )?))) + } +} + impl DefaultParquetHandler { pub fn new(store: Arc, task_executor: Arc) -> Self { Self { @@ -43,6 +108,68 @@ impl DefaultParquetHandler { self.readahead = readahead; self } + + // Write `data` to `path`/.parquet as parquet using ArrowWriter and return the parquet + // metadata (where is a generated UUIDv4). 
+ // + // Note: after encoding the data as parquet, this issues a PUT followed by a HEAD to storage in + // order to obtain metadata about the object just written. + async fn write_parquet( + &self, + path: &url::Url, + data: Box, + ) -> DeltaResult { + let batch: Box<_> = ArrowEngineData::try_from_engine_data(data)?; + let record_batch = batch.record_batch(); + + let mut buffer = vec![]; + let mut writer = ArrowWriter::try_new(&mut buffer, record_batch.schema(), None)?; + writer.write(record_batch)?; + writer.close()?; // writer must be closed to write footer + + let size = buffer.len(); + let name: String = format!("{}.parquet", Uuid::new_v4()); + // fail if path does not end with a trailing slash + if !path.path().ends_with('/') { + return Err(Error::generic(format!( + "Path must end with a trailing slash: {}", + path + ))); + } + let path = path.join(&name)?; + + self.store + .put(&Path::from(path.path()), buffer.into()) + .await?; + + let metadata = self.store.head(&Path::from(path.path())).await?; + let modification_time = metadata.last_modified.timestamp_millis(); + if size != metadata.size { + return Err(Error::generic(format!( + "Size mismatch after writing parquet file: expected {}, got {}", + size, metadata.size + ))); + } + + let file_meta = FileMeta::new(path, modification_time, size); + Ok(DataFileMetadata::new(file_meta)) + } + + /// Write `data` to `path`/.parquet as parquet using ArrowWriter and return the parquet + /// metadata as an EngineData batch which matches the [write metadata] schema (where is + /// a generated UUIDv4). + /// + /// [write metadata]: crate::transaction::get_write_metadata_schema + pub async fn write_parquet_file( + &self, + path: &url::Url, + data: Box, + partition_values: HashMap, + data_change: bool, + ) -> DeltaResult> { + let parquet_metadata = self.write_parquet(path, data).await?; + parquet_metadata.as_record_batch(&partition_values, data_change) + } } impl ParquetHandler for DefaultParquetHandler { @@ -242,9 +369,12 @@ impl FileOpener for PresignedUrlOpener { #[cfg(test)] mod tests { use std::path::PathBuf; + use std::time::{SystemTime, UNIX_EPOCH}; + use arrow_array::array::Array; use arrow_array::RecordBatch; - use object_store::{local::LocalFileSystem, ObjectStore}; + use object_store::{local::LocalFileSystem, memory::InMemory, ObjectStore}; + use url::Url; use crate::engine::arrow_data::ArrowEngineData; use crate::engine::default::executor::tokio::TokioBackgroundExecutor; @@ -297,4 +427,142 @@ mod tests { assert_eq!(data.len(), 1); assert_eq!(data[0].num_rows(), 10); } + + #[test] + fn test_as_record_batch() { + let location = Url::parse("file:///test_url").unwrap(); + let size = 1_000_000; + let last_modified = 10000000000; + let file_metadata = FileMeta::new(location.clone(), last_modified, size as usize); + let data_file_metadata = DataFileMetadata::new(file_metadata); + let partition_values = HashMap::from([("partition1".to_string(), "a".to_string())]); + let data_change = true; + let actual = data_file_metadata + .as_record_batch(&partition_values, data_change) + .unwrap(); + let actual = ArrowEngineData::try_from_engine_data(actual).unwrap(); + + let schema = Arc::new( + crate::transaction::get_write_metadata_schema() + .as_ref() + .try_into() + .unwrap(), + ); + let key_builder = StringBuilder::new(); + let val_builder = StringBuilder::new(); + let mut partition_values_builder = MapBuilder::new( + Some(MapFieldNames { + entry: "key_value".to_string(), + key: "key".to_string(), + value: "value".to_string(), + }), + key_builder, + 
val_builder, + ); + partition_values_builder.keys().append_value("partition1"); + partition_values_builder.values().append_value("a"); + partition_values_builder.append(true).unwrap(); + let partition_values = partition_values_builder.finish(); + let expected = RecordBatch::try_new( + schema, + vec![ + Arc::new(StringArray::from(vec![location.to_string()])), + Arc::new(partition_values), + Arc::new(Int64Array::from(vec![size])), + Arc::new(Int64Array::from(vec![last_modified])), + Arc::new(BooleanArray::from(vec![data_change])), + ], + ) + .unwrap(); + + assert_eq!(actual.record_batch(), &expected); + } + + #[tokio::test] + async fn test_write_parquet() { + let store = Arc::new(InMemory::new()); + let parquet_handler = + DefaultParquetHandler::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + + let data = Box::new(ArrowEngineData::new( + RecordBatch::try_from_iter(vec![( + "a", + Arc::new(Int64Array::from(vec![1, 2, 3])) as Arc, + )]) + .unwrap(), + )); + + let write_metadata = parquet_handler + .write_parquet(&Url::parse("memory:///data/").unwrap(), data) + .await + .unwrap(); + + let DataFileMetadata { + file_meta: + ref parquet_file @ FileMeta { + ref location, + last_modified, + size, + }, + } = write_metadata; + let expected_location = Url::parse("memory:///data/").unwrap(); + let expected_size = 497; + + // check that last_modified is within 10s of now + let now: i64 = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() + .try_into() + .unwrap(); + + let filename = location.path().split('/').last().unwrap(); + assert_eq!(&expected_location.join(filename).unwrap(), location); + assert_eq!(expected_size, size); + assert!(now - last_modified < 10_000); + + // check we can read back + let path = Path::from(location.path()); + let meta = store.head(&path).await.unwrap(); + let reader = ParquetObjectReader::new(store.clone(), meta.clone()); + let physical_schema = ParquetRecordBatchStreamBuilder::new(reader) + .await + .unwrap() + .schema() + .clone(); + + let data: Vec = parquet_handler + .read_parquet_files( + &[parquet_file.clone()], + Arc::new(physical_schema.try_into().unwrap()), + None, + ) + .unwrap() + .map(into_record_batch) + .try_collect() + .unwrap(); + + assert_eq!(data.len(), 1); + assert_eq!(data[0].num_rows(), 3); + } + + #[tokio::test] + async fn test_disallow_non_trailing_slash() { + let store = Arc::new(InMemory::new()); + let parquet_handler = + DefaultParquetHandler::new(store.clone(), Arc::new(TokioBackgroundExecutor::new())); + + let data = Box::new(ArrowEngineData::new( + RecordBatch::try_from_iter(vec![( + "a", + Arc::new(Int64Array::from(vec![1, 2, 3])) as Arc, + )]) + .unwrap(), + )); + + assert!(parquet_handler + .write_parquet(&Url::parse("memory:///data").unwrap(), data) + .await + .is_err()); + } } diff --git a/kernel/src/engine/sync/json.rs b/kernel/src/engine/sync/json.rs index 016fb2658..3d33b1025 100644 --- a/kernel/src/engine/sync/json.rs +++ b/kernel/src/engine/sync/json.rs @@ -52,7 +52,7 @@ impl JsonHandler for SyncJsonHandler { fn write_json_file( &self, path: &Url, - data: Box> + Send>, + data: Box>> + Send + '_>, _overwrite: bool, ) -> DeltaResult<()> { let path = path @@ -120,10 +120,10 @@ mod tests { let url = Url::from_file_path(path.clone()).unwrap(); handler - .write_json_file(&url, Box::new(std::iter::once(data)), false) + .write_json_file(&url, Box::new(std::iter::once(Ok(data))), false) .expect("write json file"); assert!(matches!( - handler.write_json_file(&url, Box::new(std::iter::once(empty)), false), 
+ handler.write_json_file(&url, Box::new(std::iter::once(Ok(empty))), false), Err(Error::FileAlreadyExists(_)) )); diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs index 2f686a3ad..40fa360f5 100644 --- a/kernel/src/lib.rs +++ b/kernel/src/lib.rs @@ -107,7 +107,7 @@ pub type FileDataReadResultIterator = pub struct FileMeta { /// The fully qualified path to the object pub location: Url, - /// The last modified time + /// The last modified time as milliseconds since unix epoch pub last_modified: i64, /// The size in bytes of the object pub size: usize, @@ -125,6 +125,17 @@ impl PartialOrd for FileMeta { } } +impl FileMeta { + /// Create a new instance of `FileMeta` + pub fn new(location: Url, last_modified: i64, size: usize) -> Self { + Self { + location, + last_modified, + size, + } + } +} + /// Trait for implementing an Expression evaluator. /// /// It contains one Expression which can be evaluated on multiple ColumnarBatches. @@ -234,7 +245,7 @@ pub trait JsonHandler: Send + Sync { fn write_json_file( &self, path: &Url, - data: Box> + Send>, + data: Box>> + Send + '_>, overwrite: bool, ) -> DeltaResult<()>; } diff --git a/kernel/src/transaction.rs b/kernel/src/transaction.rs index 81b0f31f8..db6ba0e44 100644 --- a/kernel/src/transaction.rs +++ b/kernel/src/transaction.rs @@ -1,19 +1,41 @@ +use std::collections::HashMap; use std::iter; -use std::sync::Arc; +use std::sync::{Arc, LazyLock}; use std::time::{SystemTime, UNIX_EPOCH}; -use crate::actions::get_log_commit_info_schema; +use crate::actions::schemas::{GetNullableContainerStructField, GetStructField}; use crate::actions::COMMIT_INFO_NAME; +use crate::actions::{get_log_add_schema, get_log_commit_info_schema}; use crate::error::Error; -use crate::expressions::{column_expr, Scalar, StructData}; +use crate::expressions::{column_expr, ColumnName, Scalar, StructData}; use crate::path::ParsedLogPath; -use crate::schema::{StructField, StructType}; +use crate::schema::{SchemaRef, StructField, StructType}; use crate::snapshot::Snapshot; use crate::{DataType, DeltaResult, Engine, EngineData, Expression, Version}; +use itertools::chain; +use url::Url; + const KERNEL_VERSION: &str = env!("CARGO_PKG_VERSION"); const UNKNOWN_OPERATION: &str = "UNKNOWN"; +pub(crate) static WRITE_METADATA_SCHEMA: LazyLock = LazyLock::new(|| { + Arc::new(StructType::new(vec![ + ::get_struct_field("path"), + >::get_nullable_container_struct_field("partitionValues"), + ::get_struct_field("size"), + ::get_struct_field("modificationTime"), + ::get_struct_field("dataChange"), + ])) +}); + +/// Get the expected schema for [`write_metadata`]. +/// +/// [`write_metadata`]: crate::transaction::Transaction::write_metadata +pub fn get_write_metadata_schema() -> &'static SchemaRef { + &WRITE_METADATA_SCHEMA +} + /// A transaction represents an in-progress write to a table. After creating a transaction, changes /// to the table may be staged via the transaction methods before calling `commit` to commit the /// changes to the table. @@ -32,6 +54,7 @@ pub struct Transaction { read_snapshot: Arc, operation: Option, commit_info: Option>, + write_metadata: Vec>, } impl std::fmt::Debug for Transaction { @@ -56,6 +79,7 @@ impl Transaction { read_snapshot: snapshot.into(), operation: None, commit_info: None, + write_metadata: vec![], } } @@ -63,16 +87,17 @@ impl Transaction { /// will include the failed transaction in case of a conflict so the user can retry. 
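// Illustrative only (distilled from the `write.rs` integration tests later in
// this patch): each row of engine-supplied write metadata, e.g.
//
//     { path: "<uuid>.parquet", partitionValues: {}, size: 483,
//       modificationTime: <epoch millis>, dataChange: true }
//
// is wrapped by `generate_adds` (below) into a single-field struct so that it
// lands under the `add` key of the log schema, yielding a commit line like
//
//     {"add":{"path":"<uuid>.parquet","partitionValues":{},"size":483,
//             "modificationTime":<epoch millis>,"dataChange":true}}
//
// These Add actions are chained after the (required) commitInfo action in `commit`.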
pub fn commit(self, engine: &dyn Engine) -> DeltaResult { // step one: construct the iterator of actions we want to commit - // note: only support commit_info right now (and it's required) let engine_commit_info = self .commit_info .as_ref() .ok_or_else(|| Error::MissingCommitInfo)?; - let actions = Box::new(iter::once(generate_commit_info( + let commit_info = generate_commit_info( engine, self.operation.as_deref(), engine_commit_info.as_ref(), - )?)); + ); + let adds = generate_adds(engine, self.write_metadata.iter().map(|a| a.as_ref())); + let actions = chain(iter::once(commit_info), adds); // step two: set new commit version (current_version + 1) and path to write let commit_version = self.read_snapshot.version() + 1; @@ -112,6 +137,104 @@ impl Transaction { self.commit_info = Some(commit_info.into()); self } + + // Generate the logical-to-physical transform expression which must be evaluated on every data + // chunk before writing. At the moment, this is a transaction-wide expression. + fn generate_logical_to_physical(&self) -> Expression { + // for now, we just pass through all the columns except partition columns. + // note this is _incorrect_ if table config deems we need partition columns. + let partition_columns = self.read_snapshot.metadata().partition_columns.clone(); + let fields = self.read_snapshot.schema().fields(); + let fields = fields.filter_map(|f| { + if partition_columns.contains(f.name()) { + None + } else { + Some(ColumnName::new([f.name()]).into()) + } + }); + Expression::struct_from(fields) + } + + /// Get the write context for this transaction. At the moment, this is constant for the whole + /// transaction. + // Note: after we introduce metadata updates (modify table schema, etc.), we need to make sure + // that engines cannot call this method after a metadata change, since the write context could + // have invalid metadata. + pub fn get_write_context(&self) -> WriteContext { + let target_dir = self.read_snapshot.table_root(); + let snapshot_schema = self.read_snapshot.schema(); + let logical_to_physical = self.generate_logical_to_physical(); + WriteContext::new( + target_dir.clone(), + Arc::new(snapshot_schema.clone()), + logical_to_physical, + ) + } + + /// Add write metadata about files to include in the transaction. This API can be called + /// multiple times to add multiple batches. + /// + /// The expected schema for `write_metadata` is given by [`get_write_metadata_schema`]. + pub fn add_write_metadata(&mut self, write_metadata: Box) { + self.write_metadata.push(write_metadata); + } +} + +// convert write_metadata into add actions using an expression to transform the data in a single +// pass +fn generate_adds<'a>( + engine: &dyn Engine, + write_metadata: impl Iterator + Send + 'a, +) -> impl Iterator>> + Send + 'a { + let expression_handler = engine.get_expression_handler(); + let write_metadata_schema = get_write_metadata_schema(); + let log_schema = get_log_add_schema(); + + write_metadata.map(move |write_metadata_batch| { + let adds_expr = Expression::struct_from([Expression::struct_from( + write_metadata_schema + .fields() + .map(|f| ColumnName::new([f.name()]).into()), + )]); + let adds_evaluator = expression_handler.get_evaluator( + write_metadata_schema.clone(), + adds_expr, + log_schema.clone().into(), + ); + adds_evaluator.evaluate(write_metadata_batch) + }) +} + +/// WriteContext is data derived from a [`Transaction`] that can be provided to writers in order to +/// write table data. 
+/// +/// [`Transaction`]: struct.Transaction.html +pub struct WriteContext { + target_dir: Url, + schema: SchemaRef, + logical_to_physical: Expression, +} + +impl WriteContext { + fn new(target_dir: Url, schema: SchemaRef, logical_to_physical: Expression) -> Self { + WriteContext { + target_dir, + schema, + logical_to_physical, + } + } + + pub fn target_dir(&self) -> &Url { + &self.target_dir + } + + pub fn schema(&self) -> &SchemaRef { + &self.schema + } + + pub fn logical_to_physical(&self) -> &Expression { + &self.logical_to_physical + } } /// Result after committing a transaction. If 'committed', the version is the new version written @@ -208,6 +331,7 @@ mod tests { use crate::engine::arrow_data::ArrowEngineData; use crate::engine::arrow_expression::ArrowExpressionHandler; + use crate::schema::MapType; use crate::{ExpressionHandler, FileSystemClient, JsonHandler, ParquetHandler}; use arrow::json::writer::LineDelimitedWriter; @@ -541,4 +665,21 @@ mod tests { } Ok(()) } + + #[test] + fn test_write_metadata_schema() { + let schema = get_write_metadata_schema(); + let expected = StructType::new(vec![ + StructField::new("path", DataType::STRING, false), + StructField::new( + "partitionValues", + MapType::new(DataType::STRING, DataType::STRING, true), + false, + ), + StructField::new("size", DataType::LONG, false), + StructField::new("modificationTime", DataType::LONG, false), + StructField::new("dataChange", DataType::BOOLEAN, false), + ]); + assert_eq!(*schema, expected.into()); + } } diff --git a/kernel/tests/common/mod.rs b/kernel/tests/common/mod.rs index c43a7df5e..c219efd61 100644 --- a/kernel/tests/common/mod.rs +++ b/kernel/tests/common/mod.rs @@ -1,7 +1,11 @@ -use crate::ArrowEngineData; +use arrow::compute::filter_record_batch; use arrow::record_batch::RecordBatch; -use delta_kernel::DeltaResult; -use delta_kernel::EngineData; +use arrow::util::pretty::pretty_format_batches; +use itertools::Itertools; + +use crate::ArrowEngineData; +use delta_kernel::scan::Scan; +use delta_kernel::{DeltaResult, Engine, EngineData, Table}; pub(crate) fn to_arrow(data: Box) -> DeltaResult { Ok(data @@ -10,3 +14,45 @@ pub(crate) fn to_arrow(data: Box) -> DeltaResult { .map_err(|_| delta_kernel::Error::EngineDataType("ArrowEngineData".to_string()))? .into()) } + +// TODO (zach): this is listed as unused for acceptance crate +#[allow(unused)] +pub(crate) fn test_read( + expected: &ArrowEngineData, + table: &Table, + engine: &impl Engine, +) -> Result<(), Box> { + let snapshot = table.snapshot(engine, None)?; + let scan = snapshot.into_scan_builder().build()?; + let batches = read_scan(&scan, engine)?; + let formatted = pretty_format_batches(&batches).unwrap().to_string(); + + let expected = pretty_format_batches(&[expected.record_batch().clone()]) + .unwrap() + .to_string(); + + println!("actual:\n{formatted}"); + println!("expected:\n{expected}"); + assert_eq!(formatted, expected); + + Ok(()) +} + +// TODO (zach): this is listed as unused for acceptance crate +#[allow(unused)] +pub(crate) fn read_scan(scan: &Scan, engine: &dyn Engine) -> DeltaResult> { + let scan_results = scan.execute(engine)?; + scan_results + .map(|scan_result| -> DeltaResult<_> { + let scan_result = scan_result?; + let mask = scan_result.full_mask(); + let data = scan_result.raw_data?; + let record_batch = to_arrow(data)?; + if let Some(mask) = mask { + Ok(filter_record_batch(&record_batch, &mask.into())?) 
+ } else { + Ok(record_batch) + } + }) + .try_collect() +} diff --git a/kernel/tests/read.rs b/kernel/tests/read.rs index 4d2c977e0..259b7c457 100644 --- a/kernel/tests/read.rs +++ b/kernel/tests/read.rs @@ -17,15 +17,14 @@ use delta_kernel::expressions::{column_expr, BinaryOperator, Expression}; use delta_kernel::scan::state::{visit_scan_files, DvInfo, Stats}; use delta_kernel::scan::{transform_to_logical, Scan}; use delta_kernel::schema::Schema; -use delta_kernel::{DeltaResult, Engine, EngineData, FileMeta, Table}; -use itertools::Itertools; +use delta_kernel::{Engine, EngineData, FileMeta, Table}; use object_store::{memory::InMemory, path::Path, ObjectStore}; use parquet::arrow::arrow_writer::ArrowWriter; use parquet::file::properties::WriterProperties; use url::Url; mod common; -use common::to_arrow; +use common::{read_scan, to_arrow}; const PARQUET_FILE1: &str = "part-00000-a72b1fb3-f2df-41fe-a8f0-e65b746382dd-c000.snappy.parquet"; const PARQUET_FILE2: &str = "part-00001-c506e79a-0bf8-4e2b-a42b-9731b2e490ae-c000.snappy.parquet"; @@ -393,20 +392,7 @@ fn read_with_execute( expected: &[String], ) -> Result<(), Box> { let result_schema: ArrowSchemaRef = Arc::new(scan.schema().as_ref().try_into()?); - let scan_results = scan.execute(engine)?; - let batches: Vec = scan_results - .map(|scan_result| -> DeltaResult<_> { - let scan_result = scan_result?; - let mask = scan_result.full_mask(); - let data = scan_result.raw_data?; - let record_batch = to_arrow(data)?; - if let Some(mask) = mask { - Ok(filter_record_batch(&record_batch, &mask.into())?) - } else { - Ok(record_batch) - } - }) - .try_collect()?; + let batches = read_scan(scan, engine)?; if expected.is_empty() { assert_eq!(batches.len(), 0); diff --git a/kernel/tests/write.rs b/kernel/tests/write.rs index 212b06cae..0fc2a209f 100644 --- a/kernel/tests/write.rs +++ b/kernel/tests/write.rs @@ -1,12 +1,16 @@ +use std::collections::HashMap; use std::sync::Arc; -use arrow::array::StringArray; +use arrow::array::{Int32Array, StringArray}; use arrow::record_batch::RecordBatch; use arrow_schema::Schema as ArrowSchema; use arrow_schema::{DataType as ArrowDataType, Field}; +use itertools::Itertools; +use object_store::local::LocalFileSystem; use object_store::memory::InMemory; use object_store::path::Path; use object_store::ObjectStore; +use serde_json::Deserializer; use serde_json::{json, to_vec}; use url::Url; @@ -14,28 +18,37 @@ use delta_kernel::engine::arrow_data::ArrowEngineData; use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor; use delta_kernel::engine::default::DefaultEngine; use delta_kernel::schema::{DataType, SchemaRef, StructField, StructType}; -use delta_kernel::{Error as KernelError, Table}; +use delta_kernel::Error as KernelError; +use delta_kernel::{DeltaResult, Table}; -// setup default engine with in-memory object store. +mod common; +use common::test_read; + +// setup default engine with in-memory (=true) or local fs (=false) object store. 
fn setup( table_name: &str, + in_memory: bool, ) -> ( Arc, DefaultEngine, Url, ) { - let table_root_path = Path::from(format!("/{table_name}")); - let url = Url::parse(&format!("memory:///{}/", table_root_path)).unwrap(); - let storage = Arc::new(InMemory::new()); - ( - storage.clone(), - DefaultEngine::new( - storage, - table_root_path, - Arc::new(TokioBackgroundExecutor::new()), - ), - url, - ) + let (storage, base_path, base_url): (Arc, &str, &str) = if in_memory { + (Arc::new(InMemory::new()), "/", "memory:///") + } else { + ( + Arc::new(LocalFileSystem::new()), + "./kernel_write_tests/", + "file://", + ) + }; + + let table_root_path = Path::from(format!("{base_path}{table_name}")); + let url = Url::parse(&format!("{base_url}{table_root_path}/")).unwrap(); + let executor = Arc::new(TokioBackgroundExecutor::new()); + let engine = DefaultEngine::new(Arc::clone(&storage), table_root_path, executor); + + (storage, engine, url) } // we provide this table creation function since we only do appends to existing tables for now. @@ -85,21 +98,8 @@ async fn create_table( Ok(Table::new(table_path)) } -#[tokio::test] -async fn test_commit_info() -> Result<(), Box> { - // setup tracing - let _ = tracing_subscriber::fmt::try_init(); - // setup in-memory object store and default engine - let (store, engine, table_location) = setup("test_table"); - - // create a simple table: one int column named 'number' - let schema = Arc::new(StructType::new(vec![StructField::new( - "number", - DataType::INTEGER, - true, - )])); - let table = create_table(store.clone(), table_location, schema, &[]).await?; - +// create commit info in arrow of the form {engineInfo: "default engine"} +fn new_commit_info() -> DeltaResult> { // create commit info of the form {engineCommitInfo: Map { "engineInfo": "default engine" } } let commit_info_schema = Arc::new(ArrowSchema::new(vec![Field::new( "engineCommitInfo", @@ -136,11 +136,30 @@ async fn test_commit_info() -> Result<(), Box> { let commit_info_batch = RecordBatch::try_new(commit_info_schema.clone(), vec![Arc::new(array)])?; + Ok(Box::new(ArrowEngineData::new(commit_info_batch))) +} + +#[tokio::test] +async fn test_commit_info() -> Result<(), Box> { + // setup tracing + let _ = tracing_subscriber::fmt::try_init(); + // setup in-memory object store and default engine + let (store, engine, table_location) = setup("test_table", true); + + // create a simple table: one int column named 'number' + let schema = Arc::new(StructType::new(vec![StructField::new( + "number", + DataType::INTEGER, + true, + )])); + let table = create_table(store.clone(), table_location, schema, &[]).await?; + + let commit_info = new_commit_info()?; // create a transaction let txn = table .new_transaction(&engine)? - .with_commit_info(Box::new(ArrowEngineData::new(commit_info_batch))); + .with_commit_info(commit_info); // commit! 
txn.commit(&engine)?; @@ -179,7 +198,7 @@ async fn test_empty_commit() -> Result<(), Box> { // setup tracing let _ = tracing_subscriber::fmt::try_init(); // setup in-memory object store and default engine - let (store, engine, table_location) = setup("test_table"); + let (store, engine, table_location) = setup("test_table", true); // create a simple table: one int column named 'number' let schema = Arc::new(StructType::new(vec![StructField::new( @@ -202,7 +221,7 @@ async fn test_invalid_commit_info() -> Result<(), Box> { // setup tracing let _ = tracing_subscriber::fmt::try_init(); // setup in-memory object store and default engine - let (store, engine, table_location) = setup("test_table"); + let (store, engine, table_location) = setup("test_table", true); // create a simple table: one int column named 'number' let schema = Arc::new(StructType::new(vec![StructField::new( @@ -251,3 +270,382 @@ async fn test_invalid_commit_info() -> Result<(), Box> { )); Ok(()) } + +// check that the timestamps in commit_info and add actions are within 10s of SystemTime::now() +fn check_action_timestamps<'a>( + parsed_commits: impl Iterator, +) -> Result<(), Box> { + let now: i64 = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH)? + .as_millis() + .try_into() + .unwrap(); + + parsed_commits.for_each(|commit| { + if let Some(commit_info_ts) = &commit.pointer("/commitInfo/timestamp") { + assert!((now - commit_info_ts.as_i64().unwrap()).abs() < 10_000); + } + if let Some(add_ts) = &commit.pointer("/add/modificationTime") { + assert!((now - add_ts.as_i64().unwrap()).abs() < 10_000); + } + }); + + Ok(()) +} + +// update `value` at (.-separated) `path` to `new_value` +fn set_value( + value: &mut serde_json::Value, + path: &str, + new_value: serde_json::Value, +) -> Result<(), Box> { + let mut path_string = path.replace(".", "/"); + path_string.insert(0, '/'); + let v = value + .pointer_mut(&path_string) + .ok_or_else(|| format!("key '{path}' not found"))?; + *v = new_value; + Ok(()) +} + +#[tokio::test] +async fn test_append() -> Result<(), Box> { + // setup tracing + let _ = tracing_subscriber::fmt::try_init(); + // setup in-memory object store and default engine + let (store, engine, table_location) = setup("test_table", true); + + // create a simple table: one int column named 'number' + let schema = Arc::new(StructType::new(vec![StructField::new( + "number", + DataType::INTEGER, + true, + )])); + let table = create_table(store.clone(), table_location, schema.clone(), &[]).await?; + + let commit_info = new_commit_info()?; + + let mut txn = table + .new_transaction(&engine)? 
+ .with_commit_info(commit_info); + + // create two new arrow record batches to append + let append_data = [[1, 2, 3], [4, 5, 6]].map(|data| -> DeltaResult<_> { + let data = RecordBatch::try_new( + Arc::new(schema.as_ref().try_into()?), + vec![Arc::new(arrow::array::Int32Array::from(data.to_vec()))], + )?; + Ok(Box::new(ArrowEngineData::new(data))) + }); + + // write data out by spawning async tasks to simulate executors + let engine = Arc::new(engine); + let write_context = Arc::new(txn.get_write_context()); + let tasks = append_data.into_iter().map(|data| { + // arc clones + let engine = engine.clone(); + let write_context = write_context.clone(); + tokio::task::spawn(async move { + engine + .write_parquet( + data.as_ref().unwrap(), + write_context.as_ref(), + HashMap::new(), + true, + ) + .await + }) + }); + + let write_metadata = futures::future::join_all(tasks).await.into_iter().flatten(); + for meta in write_metadata { + txn.add_write_metadata(meta?); + } + + // commit! + txn.commit(engine.as_ref())?; + + let commit1 = store + .get(&Path::from( + "/test_table/_delta_log/00000000000000000001.json", + )) + .await?; + + let mut parsed_commits: Vec<_> = Deserializer::from_slice(&commit1.bytes().await?) + .into_iter::() + .try_collect()?; + + // check that the timestamps in commit_info and add actions are within 10s of SystemTime::now() + // before we clear them for comparison + check_action_timestamps(parsed_commits.iter())?; + + // set timestamps to 0 and paths to known string values for comparison + // (otherwise timestamps are non-deterministic and paths are random UUIDs) + set_value(&mut parsed_commits[0], "commitInfo.timestamp", json!(0))?; + set_value(&mut parsed_commits[1], "add.modificationTime", json!(0))?; + set_value(&mut parsed_commits[1], "add.path", json!("first.parquet"))?; + set_value(&mut parsed_commits[2], "add.modificationTime", json!(0))?; + set_value(&mut parsed_commits[2], "add.path", json!("second.parquet"))?; + + let expected_commit = vec![ + json!({ + "commitInfo": { + "timestamp": 0, + "operation": "UNKNOWN", + "kernelVersion": format!("v{}", env!("CARGO_PKG_VERSION")), + "operationParameters": {}, + "engineCommitInfo": { + "engineInfo": "default engine" + } + } + }), + json!({ + "add": { + "path": "first.parquet", + "partitionValues": {}, + "size": 483, + "modificationTime": 0, + "dataChange": true + } + }), + json!({ + "add": { + "path": "second.parquet", + "partitionValues": {}, + "size": 483, + "modificationTime": 0, + "dataChange": true + } + }), + ]; + + assert_eq!(parsed_commits, expected_commit); + + test_read( + &ArrowEngineData::new(RecordBatch::try_new( + Arc::new(schema.as_ref().try_into()?), + vec![Arc::new(arrow::array::Int32Array::from(vec![ + 1, 2, 3, 4, 5, 6, + ]))], + )?), + &table, + engine.as_ref(), + )?; + Ok(()) +} + +#[tokio::test] +async fn test_append_partitioned() -> Result<(), Box> { + // setup tracing + let _ = tracing_subscriber::fmt::try_init(); + // setup in-memory object store and default engine + let (store, engine, table_location) = setup("test_table", true); + let partition_col = "partition"; + + // create a simple partitioned table: one int column named 'number', partitioned by string + // column named 'partition' + let table_schema = Arc::new(StructType::new(vec![ + StructField::new("number", DataType::INTEGER, true), + StructField::new("partition", DataType::STRING, true), + ])); + let data_schema = Arc::new(StructType::new(vec![StructField::new( + "number", + DataType::INTEGER, + true, + )])); + let table = create_table( + 
store.clone(), + table_location, + table_schema.clone(), + &[partition_col], + ) + .await?; + + let commit_info = new_commit_info()?; + + let mut txn = table + .new_transaction(&engine)? + .with_commit_info(commit_info); + + // create two new arrow record batches to append + let append_data = [[1, 2, 3], [4, 5, 6]].map(|data| -> DeltaResult<_> { + let data = RecordBatch::try_new( + Arc::new(data_schema.as_ref().try_into()?), + vec![Arc::new(arrow::array::Int32Array::from(data.to_vec()))], + )?; + Ok(Box::new(ArrowEngineData::new(data))) + }); + let partition_vals = vec!["a", "b"]; + + // write data out by spawning async tasks to simulate executors + let engine = Arc::new(engine); + let write_context = Arc::new(txn.get_write_context()); + let tasks = append_data + .into_iter() + .zip(partition_vals) + .map(|(data, partition_val)| { + // arc clones + let engine = engine.clone(); + let write_context = write_context.clone(); + tokio::task::spawn(async move { + engine + .write_parquet( + data.as_ref().unwrap(), + write_context.as_ref(), + HashMap::from([(partition_col.to_string(), partition_val.to_string())]), + true, + ) + .await + }) + }); + + let write_metadata = futures::future::join_all(tasks).await.into_iter().flatten(); + for meta in write_metadata { + txn.add_write_metadata(meta?); + } + + // commit! + txn.commit(engine.as_ref())?; + + let commit1 = store + .get(&Path::from( + "/test_table/_delta_log/00000000000000000001.json", + )) + .await?; + + let mut parsed_commits: Vec<_> = Deserializer::from_slice(&commit1.bytes().await?) + .into_iter::() + .try_collect()?; + + // check that the timestamps in commit_info and add actions are within 10s of SystemTime::now() + // before we clear them for comparison + check_action_timestamps(parsed_commits.iter())?; + + // set timestamps to 0 and paths to known string values for comparison + // (otherwise timestamps are non-deterministic and paths are random UUIDs) + set_value(&mut parsed_commits[0], "commitInfo.timestamp", json!(0))?; + set_value(&mut parsed_commits[1], "add.modificationTime", json!(0))?; + set_value(&mut parsed_commits[1], "add.path", json!("first.parquet"))?; + set_value(&mut parsed_commits[2], "add.modificationTime", json!(0))?; + set_value(&mut parsed_commits[2], "add.path", json!("second.parquet"))?; + + let expected_commit = vec![ + json!({ + "commitInfo": { + "timestamp": 0, + "operation": "UNKNOWN", + "kernelVersion": format!("v{}", env!("CARGO_PKG_VERSION")), + "operationParameters": {}, + "engineCommitInfo": { + "engineInfo": "default engine" + } + } + }), + json!({ + "add": { + "path": "first.parquet", + "partitionValues": { + "partition": "a" + }, + "size": 483, + "modificationTime": 0, + "dataChange": true + } + }), + json!({ + "add": { + "path": "second.parquet", + "partitionValues": { + "partition": "b" + }, + "size": 483, + "modificationTime": 0, + "dataChange": true + } + }), + ]; + + assert_eq!(parsed_commits, expected_commit); + + test_read( + &ArrowEngineData::new(RecordBatch::try_new( + Arc::new(table_schema.as_ref().try_into()?), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6])), + Arc::new(StringArray::from(vec!["a", "a", "a", "b", "b", "b"])), + ], + )?), + &table, + engine.as_ref(), + )?; + Ok(()) +} + +#[tokio::test] +async fn test_append_invalid_schema() -> Result<(), Box> { + // setup tracing + let _ = tracing_subscriber::fmt::try_init(); + // setup in-memory object store and default engine + let (store, engine, table_location) = setup("test_table", true); + + // create a simple table: one int 
column named 'number' + let table_schema = Arc::new(StructType::new(vec![StructField::new( + "number", + DataType::INTEGER, + true, + )])); + // incompatible data schema: one string column named 'string' + let data_schema = Arc::new(StructType::new(vec![StructField::new( + "string", + DataType::STRING, + true, + )])); + let table = create_table(store.clone(), table_location, table_schema.clone(), &[]).await?; + + let commit_info = new_commit_info()?; + + let txn = table + .new_transaction(&engine)? + .with_commit_info(commit_info); + + // create two new arrow record batches to append + let append_data = [["a", "b"], ["c", "d"]].map(|data| -> DeltaResult<_> { + let data = RecordBatch::try_new( + Arc::new(data_schema.as_ref().try_into()?), + vec![Arc::new(arrow::array::StringArray::from(data.to_vec()))], + )?; + Ok(Box::new(ArrowEngineData::new(data))) + }); + + // write data out by spawning async tasks to simulate executors + let engine = Arc::new(engine); + let write_context = Arc::new(txn.get_write_context()); + let tasks = append_data.into_iter().map(|data| { + // arc clones + let engine = engine.clone(); + let write_context = write_context.clone(); + tokio::task::spawn(async move { + engine + .write_parquet( + data.as_ref().unwrap(), + write_context.as_ref(), + HashMap::new(), + true, + ) + .await + }) + }); + + let mut write_metadata = futures::future::join_all(tasks).await.into_iter().flatten(); + assert!(write_metadata.all(|res| match res { + Err(KernelError::Arrow(arrow_schema::ArrowError::SchemaError(_))) => true, + Err(KernelError::Backtraced { source, .. }) + if matches!( + &*source, + KernelError::Arrow(arrow_schema::ArrowError::SchemaError(_)) + ) => + true, + _ => false, + })); + Ok(()) +}
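Taken together, the pieces above compose into a short blind-append flow. The following is a condensed, illustrative sketch of what the `test_append` integration test does, not an additional API: it assumes an async context, a `table: Table`, an `engine: Arc<DefaultEngine<TokioBackgroundExecutor>>`, a `commit_info: Box<dyn EngineData>` batch, and a `data: ArrowEngineData` batch whose schema matches the table, and it omits the task spawning, partition values, and error handling that the real test exercises.

use std::collections::HashMap;

// 1. Open a transaction and attach the (required) commit info.
let mut txn = table
    .new_transaction(engine.as_ref())?
    .with_commit_info(commit_info);

// 2. Get the write context: target directory, snapshot schema, and the
//    logical-to-physical transform expression.
let write_context = txn.get_write_context();

// 3. Write the data as parquet. The default engine evaluates the transform and
//    returns write metadata matching `transaction::get_write_metadata_schema()`.
let write_metadata = engine
    .write_parquet(&data, &write_context, HashMap::new(), /* data_change */ true)
    .await?;

// 4. Stage the write metadata on the transaction (may be called once per file) ...
txn.add_write_metadata(write_metadata);

// 5. ... and commit: one commitInfo action plus one Add action per metadata row.
txn.commit(engine.as_ref())?;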