Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

parse table metadata.configuration as TableProperties #453

Merged
merged 27 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
ba8309e
checkpoint with serde but think i need to change that
zachschuermann Nov 1, 2024
9d4b599
rough draft serde for table props
zachschuermann Nov 5, 2024
1b7b193
make everything optional
zachschuermann Nov 5, 2024
02d50ee
errors, comments, cleanup
zachschuermann Nov 8, 2024
e4676d6
fix
zachschuermann Nov 8, 2024
9f8afa4
use new col name list parsing
zachschuermann Nov 8, 2024
ed2c10a
Merge remote-tracking branch 'upstream/main' into table-properties
zachschuermann Nov 13, 2024
42e6028
docs
zachschuermann Nov 18, 2024
f1b9a16
Merge remote-tracking branch 'upstream/main' into table-properties
zachschuermann Nov 18, 2024
82370b4
remove derive
zachschuermann Nov 18, 2024
00b9d8e
make deserializer work on hashmap ref
zachschuermann Nov 18, 2024
f748f87
fix column mapping mode check
zachschuermann Nov 19, 2024
af08092
testing, errors, docs, cleanup
zachschuermann Nov 19, 2024
4587794
cleanup
zachschuermann Nov 19, 2024
1e7d286
fix skipping dat test
zachschuermann Nov 20, 2024
bd9ac7a
address feedback, cleanup
zachschuermann Nov 21, 2024
fa48054
Merge branch 'main' into table-properties
zachschuermann Nov 21, 2024
ff78623
remove unused const
zachschuermann Nov 22, 2024
b667a15
no more serde
zachschuermann Nov 22, 2024
b3cdc61
cleanup
zachschuermann Nov 22, 2024
a891b52
Merge remote-tracking branch 'upstream/main' into table-properties
zachschuermann Nov 22, 2024
d8a2933
add back col mapping mode fn
zachschuermann Nov 22, 2024
d1ce73d
address ryan review
zachschuermann Nov 23, 2024
d8af98c
Merge branch 'main' into table-properties
zachschuermann Nov 25, 2024
6d1b466
use NonZero<u64>
zachschuermann Nov 25, 2024
f18b885
Merge remote-tracking branch 'refs/remotes/origin/table-properties' i…
zachschuermann Nov 25, 2024
437b8db
clippy
zachschuermann Nov 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ pub enum KernelError {
InvalidCommitInfo,
FileAlreadyExists,
MissingCommitInfo,
InvalidTableProperties,
}

impl From<Error> for KernelError {
Expand Down Expand Up @@ -431,6 +432,7 @@ impl From<Error> for KernelError {
Error::InvalidCommitInfo(_) => KernelError::InvalidCommitInfo,
Error::FileAlreadyExists(_) => KernelError::FileAlreadyExists,
Error::MissingCommitInfo => KernelError::MissingCommitInfo,
Error::InvalidTableProperties(_) => KernelError::InvalidTableProperties,
}
}
}
Expand Down
10 changes: 8 additions & 2 deletions kernel/src/actions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ use visitors::{AddVisitor, MetadataVisitor, ProtocolVisitor};

use self::deletion_vector::DeletionVectorDescriptor;
use crate::actions::schemas::GetStructField;
use crate::features::{ReaderFeatures, WriterFeatures};
use crate::schema::{SchemaRef, StructType};
use crate::table_features::{ReaderFeatures, WriterFeatures};
use crate::table_properties::TableProperties;
use crate::{DeltaResult, EngineData};

pub mod deletion_vector;
Expand Down Expand Up @@ -99,7 +100,7 @@ pub struct Metadata {
pub partition_columns: Vec<String>,
/// The time when this metadata action is created, in milliseconds since the Unix epoch
pub created_time: Option<i64>,
/// Configuration options for the metadata action
/// Configuration options for the metadata action. These are parsed into `TableProperties`.
zachschuermann marked this conversation as resolved.
Show resolved Hide resolved
pub configuration: HashMap<String, String>,
}

Expand All @@ -113,6 +114,11 @@ impl Metadata {
pub fn schema(&self) -> DeltaResult<StructType> {
Ok(serde_json::from_str(&self.schema_string)?)
}

/// Parse the metadata configuration HashMap<String, String> into a TableProperties struct.
pub fn parse_table_properties(&self) -> DeltaResult<TableProperties> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should just eagerly parse TableProperties instead doing the HashMap<String,String> to TableProperties separately. Do we split it up because of the Schema derive?

#[derive(Debug, Default, Clone, PartialEq, Eq, Schema)]
pub struct Metadata {

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup exactly. for now this seemed like the best way to separate the schema (Metadata struct) from the actual parsing of TableProperties. perhaps in the future we can look into unifying these? could just omit the derive and impl Schema ourselves (or add some new fancy mechanism that lets us annotate fields with [derive(Schema)]

TableProperties::new(self.configuration.clone())
}
}

#[derive(Default, Debug, Clone, PartialEq, Eq, Schema, Serialize, Deserialize)]
Expand Down
2 changes: 1 addition & 1 deletion kernel/src/engine/default/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ mod tests {
},
} = write_metadata;
let expected_location = Url::parse("memory:///data/").unwrap();
let expected_size = 497;
let expected_size = 493;

// check that last_modified is within 10s of now
let now: i64 = SystemTime::now()
Expand Down
7 changes: 7 additions & 0 deletions kernel/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,10 @@ pub enum Error {
/// The file already exists at the path, prohibiting a non-overwrite write
#[error("File already exists: {0}")]
FileAlreadyExists(String),

/// The `metadata` actions's `configuration` field was unable to parse into `TableProperties`
#[error("Invalid table properties: {0}")]
InvalidTableProperties(String),
}

// Convenience constructors for Error types that take a String argument
Expand Down Expand Up @@ -222,6 +226,9 @@ impl Error {
pub(crate) fn invalid_log_path(msg: impl ToString) -> Self {
Self::InvalidLogPath(msg.to_string())
}
pub(crate) fn invalid_table_properties(msg: impl ToString) -> Self {
Self::InvalidTableProperties(msg.to_string())
}

pub fn internal_error(msg: impl ToString) -> Self {
Self::InternalError(msg.to_string()).with_backtrace()
Expand Down
4 changes: 3 additions & 1 deletion kernel/src/expressions/column_names.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ use std::hash::{Hash, Hasher};
use std::iter::Peekable;
use std::ops::Deref;

use serde::Deserialize;

/// A (possibly nested) column name.
#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord)]
#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord, Deserialize)]
pub struct ColumnName {
zachschuermann marked this conversation as resolved.
Show resolved Hide resolved
path: Vec<String>,
}
Expand Down
3 changes: 2 additions & 1 deletion kernel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pub mod actions;
pub mod engine_data;
pub mod error;
pub mod expressions;
pub mod features;
pub mod table_features;

#[cfg(feature = "developer-visibility")]
pub mod path;
Expand All @@ -73,6 +73,7 @@ pub mod scan;
pub mod schema;
pub mod snapshot;
pub mod table;
pub mod table_properties;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only meaningful change is adding pub mod table_properties, other changes are shifting to colocate module declarations

pub mod transaction;
pub(crate) mod utils;

Expand Down
6 changes: 3 additions & 3 deletions kernel/src/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ use url::Url;
use crate::actions::deletion_vector::{split_vector, treemap_to_bools, DeletionVectorDescriptor};
use crate::actions::{get_log_add_schema, get_log_schema, ADD_NAME, REMOVE_NAME};
use crate::expressions::{ColumnName, Expression, ExpressionRef, Scalar};
use crate::features::ColumnMappingMode;
use crate::scan::state::{DvInfo, Stats};
use crate::schema::{DataType, Schema, SchemaRef, StructField, StructType};
use crate::snapshot::Snapshot;
use crate::table_features::ColumnMappingMode;
use crate::{DeltaResult, Engine, EngineData, Error, FileMeta};

use self::log_replay::scan_action_iter;
Expand Down Expand Up @@ -95,7 +95,7 @@ impl ScanBuilder {
let (all_fields, read_fields, have_partition_cols) = get_state_info(
logical_schema.as_ref(),
&self.snapshot.metadata().partition_columns,
self.snapshot.column_mapping_mode,
self.snapshot.table_properties().get_column_mapping_mode(),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't normally use get_ for field accessors?

Suggested change
self.snapshot.table_properties().get_column_mapping_mode(),
self.snapshot.table_properties().column_mapping_mode(),

(rust can handle structs with both foo: Foo and fn foo(&self) -> &Foo -- even if both are public)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious to hear everyone's thoughts on this.. I think we need two things here (and the API above was attempting to answer the second part)

  1. a struct to parse the table properties into (TableProperties struct) which lets users (or engines) examine the table properties that are set
  2. an API for checking table properties/feature enablement. This likely needs to check if table features are enabled in the protocol and check table properties being set and perhaps check on other dependent features. (i.e. we need somewhere that we can embed this logic)

this probably isn't the PR for tackling (2) - but would love to hear some thoughts so we could get started on a follow-up. For now I'll just do column_mapping_mode() to unblock this and we can iterate?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it makes a lot of sense. These are checks that are going to be made for every table feature, so it helps to have all of it in one place.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. Likely we don't need a method for each one and should rather check a struct of some sort. The API design is a bit subtle though, as some features can be set to multiple modes (i.e. column mapping) and others can just be on/off.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just gonna take it as a follow up #508

and as @nicklan suggested: probably just wait a little longer until we have more use cases and then can inform how we want to build something that unifies protocol/table properties

)?;
let physical_schema = Arc::new(StructType::new(read_fields));
Ok(Scan {
Expand Down Expand Up @@ -247,7 +247,7 @@ impl Scan {
partition_columns: self.snapshot.metadata().partition_columns.clone(),
logical_schema: self.logical_schema.clone(),
read_schema: self.physical_schema.clone(),
column_mapping_mode: self.snapshot.column_mapping_mode,
column_mapping_mode: self.snapshot.table_properties().get_column_mapping_mode(),
}
}

Expand Down
2 changes: 1 addition & 1 deletion kernel/src/scan/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use crate::{
visitors::visit_deletion_vector_at,
},
engine_data::{GetData, TypedGetData},
features::ColumnMappingMode,
schema::SchemaRef,
table_features::ColumnMappingMode,
DataVisitor, DeltaResult, Engine, EngineData, Error,
};
use serde::{Deserialize, Serialize};
Expand Down
2 changes: 1 addition & 1 deletion kernel/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use indexmap::IndexMap;
use itertools::Itertools;
use serde::{Deserialize, Serialize};

use crate::features::ColumnMappingMode;
use crate::table_features::ColumnMappingMode;
use crate::utils::require;
use crate::{DeltaResult, Error};

Expand Down
16 changes: 6 additions & 10 deletions kernel/src/snapshot.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
//! In-memory representation of snapshots of tables (snapshot is a table at given point in time, it
//! has schema etc.)
//!

use std::cmp::Ordering;
use std::sync::{Arc, LazyLock};
Expand All @@ -11,10 +10,10 @@ use tracing::{debug, warn};
use url::Url;

use crate::actions::{get_log_schema, Metadata, Protocol, METADATA_NAME, PROTOCOL_NAME};
use crate::features::{ColumnMappingMode, COLUMN_MAPPING_MODE_KEY};
use crate::path::ParsedLogPath;
use crate::scan::ScanBuilder;
use crate::schema::{Schema, SchemaRef};
use crate::table_properties::TableProperties;
use crate::utils::require;
use crate::{DeltaResult, Engine, Error, FileMeta, FileSystemClient, Version};
use crate::{EngineData, Expression, ExpressionRef};
Expand Down Expand Up @@ -133,7 +132,7 @@ pub struct Snapshot {
metadata: Metadata,
protocol: Protocol,
schema: Schema,
pub(crate) column_mapping_mode: ColumnMappingMode,
table_properties: TableProperties,
}

impl Drop for Snapshot {
Expand Down Expand Up @@ -229,18 +228,15 @@ impl Snapshot {
.read_metadata(engine)?
.ok_or(Error::MissingMetadata)?;
let schema = metadata.schema()?;
let column_mapping_mode = match metadata.configuration.get(COLUMN_MAPPING_MODE_KEY) {
Some(mode) if protocol.min_reader_version >= 2 => mode.as_str().try_into(),
_ => Ok(ColumnMappingMode::None),
}?;
let table_properties = metadata.parse_table_properties()?;
Ok(Self {
table_root: location,
log_segment,
version,
metadata,
protocol,
schema,
column_mapping_mode,
table_properties,
})
}

Expand Down Expand Up @@ -277,8 +273,8 @@ impl Snapshot {
/// Get the [column mapping
zachschuermann marked this conversation as resolved.
Show resolved Hide resolved
/// mode](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#column-mapping) at this
/// `Snapshot`s version.
pub fn column_mapping_mode(&self) -> ColumnMappingMode {
self.column_mapping_mode
pub fn table_properties(&self) -> &TableProperties {
&self.table_properties
}

/// Create a [`ScanBuilder`] for an `Arc<Snapshot>`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use serde::{Deserialize, Serialize};
use crate::{DeltaResult, Error};

/// Modes of column mapping a table can be in
#[derive(Serialize, Deserialize, Debug, Copy, Clone, PartialEq)]
#[derive(Serialize, Deserialize, Debug, Copy, Clone, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aside: Do these classes still need serde support?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unfortunately yea, since it is still a field in GlobalScanState (which derives Serialize/Deserialize)

pub enum ColumnMappingMode {
/// No column mapping is applied
Expand All @@ -17,9 +17,6 @@ pub enum ColumnMappingMode {
Name,
}

// key to look in metadata.configuration for to get column mapping mode
pub(crate) const COLUMN_MAPPING_MODE_KEY: &str = "delta.columnMapping.mode";

impl TryFrom<&str> for ColumnMappingMode {
type Error = Error;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use serde::{Deserialize, Serialize};

pub use column_mapping::ColumnMappingMode;
pub(crate) use column_mapping::COLUMN_MAPPING_MODE_KEY;
use strum::{AsRefStr, Display as StrumDisplay, EnumString, VariantNames};

mod column_mapping;
Expand Down
Loading
Loading