From 5779f06c569cdd139c93405a4745b01da1a76451 Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Mon, 7 Oct 2024 16:20:27 -0700 Subject: [PATCH] cleanup --- kernel/src/transaction.rs | 157 ++++++++++++++++++++------------------ 1 file changed, 83 insertions(+), 74 deletions(-) diff --git a/kernel/src/transaction.rs b/kernel/src/transaction.rs index 8e184a69a..6d58e0d31 100644 --- a/kernel/src/transaction.rs +++ b/kernel/src/transaction.rs @@ -7,14 +7,20 @@ use crate::snapshot::Snapshot; use crate::{DataType, Expression}; use crate::{DeltaResult, Engine, EngineData}; +const KERNEL_VERSION: &str = env!("CARGO_PKG_VERSION"); + +/// A transaction represents an in-progress write to a table. pub struct Transaction { read_snapshot: Arc, commit_info: Option, } +// Since the engine can include any commit info it likes, we unify the data/schema pair as a single +// struct with Arc semantics. +#[derive(Clone)] struct EngineCommitInfo { - data: Box, - schema: Schema, + data: Arc, + schema: SchemaRef, } impl std::fmt::Debug for Transaction { @@ -28,6 +34,12 @@ impl std::fmt::Debug for Transaction { } impl Transaction { + /// Create a new transaction from a snapshot. The snapshot will be used to read the current + /// state of the table (e.g. to read the current version). + /// + /// Instead of using this API, the more typical API is + /// [Table::new_transaction](crate::table::Table::new_transaction) to create a transaction from + /// a table automatically backed by the latest snapshot. pub fn new(snapshot: impl Into>) -> Self { Transaction { read_snapshot: snapshot.into(), @@ -35,14 +47,13 @@ impl Transaction { } } + /// Consume the transaction and commit the in-progress write to the table. pub fn commit(self, engine: &dyn Engine) -> DeltaResult { // step one: construct the iterator of actions we want to commit - // - // TODO: enforce single row commit info - // TODO: for now we always require commit info - let (actions, _actions_schema) = generate_commit_info(engine, self.commit_info)?; + // note: only support commit_info right now. + let (actions, _actions_schema) = generate_commit_info(engine, self.commit_info.clone())?; - // step two: figure out the commit version and path to write + // step two: set new commit version (current_version + 1) and path to write let commit_version = &self.read_snapshot.version() + 1; let commit_file_name = format!("{:020}", commit_version) + ".json"; let commit_path = &self @@ -53,9 +64,14 @@ impl Transaction { // step three: commit the actions as a json file in the log let json_handler = engine.get_json_handler(); - - json_handler.write_json_file(commit_path, Box::new(actions), false)?; - Ok(CommitResult::Committed(commit_version)) + match json_handler.write_json_file(commit_path, Box::new(actions), false) { + Ok(()) => Ok(CommitResult::Committed(commit_version)), + Err(crate::error::Error::ObjectStore(object_store::Error::AlreadyExists { + path: _, + source: _, + })) => Ok(CommitResult::Conflict(self, commit_version)), + Err(e) => Err(e), + } } /// Add commit info to the transaction. This is commit-wide metadata that is written as the @@ -63,16 +79,25 @@ impl Transaction { /// require any commit info, pass an empty `EngineData`. pub fn commit_info(&mut self, commit_info: Box, schema: Schema) { self.commit_info = Some(EngineCommitInfo { - data: commit_info, - schema, + data: commit_info.into(), + schema: schema.into(), }); } } +/// Result after committing a transaction. If 'committed', the version is the new version written +/// to the log. If 'conflict', the transaction is returned so the caller can resolve the conflict +/// (along with the version which conflicted). pub enum CommitResult { + /// The transaction was successfully committed at the version. Committed(crate::Version), + /// The transaction conflicted with an existing version (at the version given). + Conflict(Transaction, crate::Version), } +// given the engine's commit info (data and schema as [EngineCommitInfo]) we want to create both: +// (1) the commitInfo action to commit (and append more actions to) and +// (2) the schema for the actions to commit (this is determined by the engine's commit info schema) fn generate_commit_info<'a>( engine: &'a dyn Engine, engine_commit_info: Option, @@ -84,80 +109,64 @@ fn generate_commit_info<'a>( ADD_NAME, COMMIT_INFO_NAME, METADATA_NAME, PROTOCOL_NAME, REMOVE_NAME, TRANSACTION_NAME, }; - let action_schema = - Arc::new(engine_commit_info - .as_ref() - .map_or(get_log_schema().clone(), |commit_info| { - let mut action_fields = get_log_schema().fields().collect::>(); - let commit_info_field = action_fields.pop().unwrap(); - let mut commit_info_fields = - if let DataType::Struct(commit_info_schema) = commit_info_field.data_type() { - commit_info_schema.fields().collect::>() - } else { - unreachable!() - }; - commit_info_fields.extend(commit_info.schema.fields()); - let commit_info_schema = - StructType::new(commit_info_fields.into_iter().map(|f| f.clone()).collect()); - let mut action_fields = action_fields - .into_iter() - .map(|f| f.clone()) - .collect::>(); - action_fields.push(crate::schema::StructField::new( - COMMIT_INFO_NAME, - commit_info_schema, - true, - )); - StructType::new(action_fields) - })); + // TODO: enforce single row commit info + // TODO: for now we always require commit info + let action_schema = Arc::new(engine_commit_info.as_ref().map_or( + get_log_schema().clone(), + |commit_info| { + let mut action_fields = get_log_schema().fields().collect::>(); + let commit_info_field = action_fields.pop().unwrap(); + let mut commit_info_fields = + if let DataType::Struct(commit_info_schema) = commit_info_field.data_type() { + commit_info_schema.fields().collect::>() + } else { + unreachable!() + }; + commit_info_fields.extend(commit_info.schema.fields()); + let commit_info_schema = + StructType::new(commit_info_fields.into_iter().map(|f| f.clone()).collect()); + let mut action_fields = action_fields + .into_iter() + .map(|f| f.clone()) + .collect::>(); + action_fields.push(crate::schema::StructField::new( + COMMIT_INFO_NAME, + commit_info_schema, + true, + )); + StructType::new(action_fields) + }, + )); let action_schema_ref = Arc::clone(&action_schema); let actions = engine_commit_info.into_iter().map(move |commit_info| { - // TODO RENAME let engine_commit_info_data = commit_info.data; let engine_commit_info_schema = commit_info.schema; - // expression to select all the columns - let mut commit_info_expr = vec![Expression::literal("v0.3.1")]; + let mut commit_info_expr = vec![Expression::literal(format!("v{}", KERNEL_VERSION))]; commit_info_expr.extend( engine_commit_info_schema .fields() .map(|f| Expression::column(f.name())) .collect::>(), ); - let commit_info_expr = Expression::Struct(vec![ - Expression::Literal(Scalar::Null( - action_schema_ref.field(ADD_NAME).unwrap().data_type().clone(), - )), - Expression::Literal(Scalar::Null( - action_schema_ref - .field(REMOVE_NAME) - .unwrap() - .data_type() - .clone(), - )), - Expression::Literal(Scalar::Null( - action_schema_ref - .field(METADATA_NAME) - .unwrap() - .data_type() - .clone(), - )), - Expression::Literal(Scalar::Null( - action_schema_ref - .field(PROTOCOL_NAME) - .unwrap() - .data_type() - .clone(), - )), + // generate expression with null for all the fields except the commit_info field, and + // append the commit_info to the end. + let commit_info_expr_fields = [ + ADD_NAME, + REMOVE_NAME, + METADATA_NAME, + PROTOCOL_NAME, + TRANSACTION_NAME, + ] + .iter() + .map(|name| { Expression::Literal(Scalar::Null( - action_schema_ref - .field(TRANSACTION_NAME) - .unwrap() - .data_type() - .clone(), - )), - Expression::Struct(commit_info_expr), - ]); + action_schema_ref.field(name).unwrap().data_type().clone(), + )) + }) + .chain(std::iter::once(Expression::Struct(commit_info_expr))) + .collect::>(); + let commit_info_expr = Expression::Struct(commit_info_expr_fields); // commit info has arbitrary schema ex: {engineInfo: string, operation: string} // we want to bundle it up and put it in the commit_info field of the actions.