[write] Transaction append data API (#393)
This PR is the second of two major pieces for supporting simple blind appends. It implements:
1. **New `Transaction` APIs** for appending data to Delta tables (a sketch of the end-to-end flow follows this list):
   a. `get_write_context()` returns a `WriteContext` to hand to the data path; it carries everything needed to write: the target directory, the snapshot schema, the transformation expression, and (in the future) the columns to collect stats on.
   b. `add_write_metadata(impl EngineData)` adds metadata about a write to the transaction, along with a new static method `transaction::get_write_metadata_schema` that provides the expected schema of this engine data.
   c. New machinery in the `commit` method commits a new `Add` action for each row of write metadata supplied via the API above.
2. **New default-engine capabilities** for writing parquet data (to append to tables):
   a. The parquet handler can now `write_parquet_file(EngineData)`.
   b. A usage example lives in the `write.rs` tests for now.
3. **New append tests** in the `write.rs` integration test suite.
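
Put together, a blind append using these APIs looks roughly like the sketch below. This is a minimal illustration based on the description above: how the `Transaction` is obtained, the exact `commit` signature, and the concrete engine/executor types are assumptions and may differ from the real API.

```rust
// Minimal sketch of a blind append. Names follow the PR description; exact
// signatures and module paths are assumptions, not taken from this diff.
use std::collections::HashMap;

use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::engine::default::{executor::tokio::TokioBackgroundExecutor, DefaultEngine};
use delta_kernel::transaction::Transaction;
use delta_kernel::DeltaResult;

async fn blind_append(
    engine: &DefaultEngine<TokioBackgroundExecutor>,
    mut txn: Transaction,  // assumed to come from the table / snapshot
    data: ArrowEngineData, // logical data matching the table schema
) -> DeltaResult<()> {
    // 1a. Everything the data path needs: target directory, snapshot schema,
    //     and the logical-to-physical transformation expression.
    let write_context = txn.get_write_context();

    // 2a. Write the physical parquet file; the returned engine data describes
    //     the written file in the shape of `transaction::get_write_metadata_schema()`.
    let write_metadata = engine
        .write_parquet(&data, &write_context, HashMap::new(), true)
        .await?;

    // 1b. Register that metadata with the transaction ...
    txn.add_write_metadata(write_metadata);

    // 1c. ... and commit: each row of write metadata becomes an `Add` action.
    txn.commit(engine)?;
    Ok(())
}
```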

Details and some follow-ups:
- The parquet writing (like the JSON writing) currently buffers everything into memory before issuing one big PUT. We should make this smarter: a single PUT for small data and a multipart upload for larger data. Tracked in #418.
- Schema enforcement is done at the data layer: it is up to the engine to invoke the expression evaluation, and we expect that evaluation to fail if the output schema is incorrect (see `test_append_invalid_schema` in the `write.rs` integration test, and the sketch below). We may want to change this in the future to error eagerly, based on the engine providing a schema up front at metadata time (transaction creation time).
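
As a hedged illustration of the schema-enforcement point, the expectation that `test_append_invalid_schema` exercises looks roughly like the snippet below. The column name, crate paths, and setup are illustrative; the exact error variant is not pinned down here.

```rust
// Sketch of data-layer schema enforcement: writing data whose columns don't
// match the snapshot schema should fail when the logical-to-physical
// expression is evaluated inside write_parquet. Paths and names illustrative.
use std::{collections::HashMap, sync::Arc};

use arrow_array::{ArrayRef, RecordBatch, StringArray};
use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::engine::default::{executor::tokio::TokioBackgroundExecutor, DefaultEngine};
use delta_kernel::transaction::WriteContext;

async fn expect_schema_error(
    engine: &DefaultEngine<TokioBackgroundExecutor>,
    write_context: &WriteContext,
) {
    // A column the table schema does not contain (the name is made up).
    let bad_batch = RecordBatch::try_from_iter([(
        "not_a_table_column",
        Arc::new(StringArray::from(vec!["oops"])) as ArrayRef,
    )])
    .unwrap();

    let result = engine
        .write_parquet(&ArrowEngineData::new(bad_batch), write_context, HashMap::new(), true)
        .await;
    assert!(result.is_err(), "writing mismatched data should fail");
}
```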

based on #370
resolves #390
zachschuermann authored Nov 8, 2024
1 parent 4466509 commit d8329ae
Showing 11 changed files with 955 additions and 72 deletions.
2 changes: 2 additions & 0 deletions kernel/Cargo.toml
@@ -90,6 +90,8 @@ default-engine = [
"parquet/object_store",
"reqwest",
"tokio",
"uuid/v4",
"uuid/fast-rng",
]

developer-visibility = []
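The two new `uuid` features support generating unique names for the data files the default engine writes. Below is a minimal illustration of the kind of use they enable; the exact naming scheme is an assumption, not taken from this diff.

```rust
// Why `uuid/v4` (plus `fast-rng`) is needed: each written parquet file gets a
// fresh, collision-free name. The format shown here is illustrative only.
use uuid::Uuid;

fn new_data_file_name() -> String {
    format!("{}.parquet", Uuid::new_v4())
}
```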
6 changes: 3 additions & 3 deletions kernel/src/engine/arrow_utils.rs
@@ -665,11 +665,11 @@ fn parse_json_impl(json_strings: &StringArray, schema: ArrowSchemaRef) -> DeltaR
/// serialize an arrow RecordBatch to a JSON string by appending to a buffer.
// TODO (zach): this should stream data to the JSON writer and output an iterator.
pub(crate) fn to_json_bytes(
- data: impl Iterator<Item = Box<dyn EngineData>> + Send,
+ data: impl Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send,
) -> DeltaResult<Vec<u8>> {
let mut writer = LineDelimitedWriter::new(Vec::new());
for chunk in data.into_iter() {
- let arrow_data = ArrowEngineData::try_from_engine_data(chunk)?;
+ let arrow_data = ArrowEngineData::try_from_engine_data(chunk?)?;
let record_batch = arrow_data.record_batch();
writer.write(record_batch)?;
}
@@ -1436,7 +1436,7 @@ mod tests {
vec![Arc::new(StringArray::from(vec!["string1", "string2"]))],
)?;
let data: Box<dyn EngineData> = Box::new(ArrowEngineData::new(data));
- let json = to_json_bytes(Box::new(std::iter::once(data)))?;
+ let json = to_json_bytes(Box::new(std::iter::once(Ok(data))))?;
assert_eq!(
json,
"{\"string\":\"string1\"}\n{\"string\":\"string2\"}\n".as_bytes()
2 changes: 1 addition & 1 deletion kernel/src/engine/default/json.rs
@@ -96,7 +96,7 @@ impl<E: TaskExecutor> JsonHandler for DefaultJsonHandler<E> {
fn write_json_file(
&self,
path: &Url,
- data: Box<dyn Iterator<Item = Box<dyn EngineData>> + Send>,
+ data: Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send + '_>,
_overwrite: bool,
) -> DeltaResult<()> {
let buffer = to_json_bytes(data)?;
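With the updated signature, a caller hands `write_json_file` an iterator of results, so a failure while producing any batch propagates out of the write instead of having to be collected up front. The real consumer is the commit path; the helper below is a hypothetical caller for illustration.

```rust
// Hypothetical caller of the new write_json_file signature: fallible batches
// are boxed into the iterator and the first error aborts the write.
use delta_kernel::{DeltaResult, EngineData, JsonHandler};
use url::Url;

fn write_actions(
    handler: &dyn JsonHandler,
    commit_url: &Url,
    actions: Vec<DeltaResult<Box<dyn EngineData>>>,
) -> DeltaResult<()> {
    // `false`: do not overwrite an existing commit file.
    handler.write_json_file(commit_url, Box::new(actions.into_iter()), false)
}
```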
33 changes: 32 additions & 1 deletion kernel/src/engine/default/mod.rs
@@ -6,6 +6,7 @@
//! a separate thread pool, provided by the [`TaskExecutor`] trait. Read more in
//! the [executor] module.
use std::collections::HashMap;
use std::sync::Arc;

use self::storage::parse_url_opts;
@@ -16,9 +17,13 @@ use self::executor::TaskExecutor;
use self::filesystem::ObjectStoreFileSystemClient;
use self::json::DefaultJsonHandler;
use self::parquet::DefaultParquetHandler;
use super::arrow_data::ArrowEngineData;
use super::arrow_expression::ArrowExpressionHandler;
use crate::schema::Schema;
use crate::transaction::WriteContext;
use crate::{
- DeltaResult, Engine, ExpressionHandler, FileSystemClient, JsonHandler, ParquetHandler,
+ DeltaResult, Engine, EngineData, ExpressionHandler, FileSystemClient, JsonHandler,
+ ParquetHandler,
};

pub mod executor;
@@ -108,6 +113,32 @@ impl<E: TaskExecutor> DefaultEngine<E> {
pub fn get_object_store_for_url(&self, _url: &Url) -> Option<Arc<DynObjectStore>> {
Some(self.store.clone())
}

pub async fn write_parquet(
&self,
data: &ArrowEngineData,
write_context: &WriteContext,
partition_values: HashMap<String, String>,
data_change: bool,
) -> DeltaResult<Box<dyn EngineData>> {
let transform = write_context.logical_to_physical();
let input_schema: Schema = data.record_batch().schema().try_into()?;
let output_schema = write_context.schema();
let logical_to_physical_expr = self.get_expression_handler().get_evaluator(
input_schema.into(),
transform.clone(),
output_schema.clone().into(),
);
let physical_data = logical_to_physical_expr.evaluate(data)?;
self.parquet
.write_parquet_file(
write_context.target_dir(),
physical_data,
partition_values,
data_change,
)
.await
}
}

impl<E: TaskExecutor> Engine for DefaultEngine<E> {
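For a concrete happy path through the new method, building a batch and writing it might look like the sketch below. The single `number` column, the unpartitioned table, and the crate paths are assumptions for illustration.

```rust
// Sketch: build a RecordBatch, wrap it as ArrowEngineData, and write it with
// the new DefaultEngine::write_parquet. Schema and column name are assumed.
use std::{collections::HashMap, sync::Arc};

use arrow_array::{ArrayRef, Int64Array, RecordBatch};
use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::engine::default::{executor::tokio::TokioBackgroundExecutor, DefaultEngine};
use delta_kernel::transaction::WriteContext;
use delta_kernel::{DeltaResult, EngineData};

async fn write_numbers(
    engine: &DefaultEngine<TokioBackgroundExecutor>,
    write_context: &WriteContext,
) -> DeltaResult<Box<dyn EngineData>> {
    let batch = RecordBatch::try_from_iter([(
        "number",
        Arc::new(Int64Array::from(vec![1_i64, 2, 3])) as ArrayRef,
    )])?;
    // Unpartitioned table; `true` marks this as a data-changing write.
    engine
        .write_parquet(&ArrowEngineData::new(batch), write_context, HashMap::new(), true)
        .await
}
```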