Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 84 additions & 3 deletions crates/iceberg/src/spec/table_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use _serde::TableMetadataEnum;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_repr::{Deserialize_repr, Serialize_repr};
use typed_builder::TypedBuilder;
use uuid::Uuid;

use super::snapshot::SnapshotReference;
Expand All @@ -49,7 +50,10 @@ pub(crate) static INITIAL_SEQUENCE_NUMBER: i64 = 0;
/// Reference to [`TableMetadata`].
pub type TableMetadataRef = Arc<TableMetadata>;

#[derive(Debug, PartialEq, Deserialize, Eq, Clone)]
#[derive(Debug, PartialEq, Deserialize, Eq, Clone, TypedBuilder)]
#[builder(builder_method(name=declarative_builder, doc="Build a new [`TableMetadata`] in a declarative way. For imperative operations (e.g. `add_snapshot`) and creating new TableMetadata, use [`TableMetadataBuilder`] instead."))]
#[builder(build_method(name=build_unchecked, doc="Build a new [`TableMetadata`] without validation. Consider running `try_normalize` after building to ensure validity."))]
#[builder(builder_type(name=TableMetadataConstructor))]
#[serde(try_from = "TableMetadataEnum")]
/// Fields for the version 2 of the table metadata.
///
Expand All @@ -69,12 +73,15 @@ pub struct TableMetadata {
/// An integer; the highest assigned column ID for the table.
pub(crate) last_column_id: i32,
/// A list of schemas, stored as objects with schema-id.
#[builder(setter(transform = |schemas: Vec<SchemaRef>| schemas.into_iter().map(|s| (s.schema_id(), s)).collect()))]
pub(crate) schemas: HashMap<i32, SchemaRef>,
/// ID of the table’s current schema.
pub(crate) current_schema_id: i32,
/// A list of partition specs, stored as full partition spec objects.
#[builder(setter(transform = |specs: Vec<PartitionSpecRef>| specs.into_iter().map(|s| (s.spec_id(), s)).collect()))]
pub(crate) partition_specs: HashMap<i32, PartitionSpecRef>,
/// ID of the “current” spec that writers should use by default.
#[builder(setter(into))]
pub(crate) default_spec: PartitionSpecRef,
/// Partition type of the default partition spec.
pub(crate) default_partition_type: StructType,
Expand All @@ -83,21 +90,25 @@ pub struct TableMetadata {
///A string to string map of table properties. This is used to control settings that
/// affect reading and writing and is not intended to be used for arbitrary metadata.
/// For example, commit.retry.num-retries is used to control the number of commit retries.
#[builder(default)]
pub(crate) properties: HashMap<String, String>,
/// long ID of the current table snapshot; must be the same as the current
/// ID of the main branch in refs.
#[builder(default)]
pub(crate) current_snapshot_id: Option<i64>,
///A list of valid snapshots. Valid snapshots are snapshots for which all
/// data files exist in the file system. A data file must not be deleted
/// from the file system until the last snapshot in which it was listed is
/// garbage collected.
#[builder(default, setter(transform = |snapshots: Vec<SnapshotRef>| snapshots.into_iter().map(|s| (s.snapshot_id(), s)).collect()))]
pub(crate) snapshots: HashMap<i64, SnapshotRef>,
/// A list (optional) of timestamp and snapshot ID pairs that encodes changes
/// to the current snapshot for the table. Each time the current-snapshot-id
/// is changed, a new entry should be added with the last-updated-ms
/// and the new current-snapshot-id. When snapshots are expired from
/// the list of valid snapshots, all entries before a snapshot that has
/// expired should be removed.
#[builder(default)]
pub(crate) snapshot_log: Vec<SnapshotLog>,

/// A list (optional) of timestamp and metadata file location pairs
Expand All @@ -106,9 +117,11 @@ pub struct TableMetadata {
/// previous metadata file location should be added to the list.
/// Tables can be configured to remove the oldest metadata log entries and
/// keep a fixed-size log of the most recent entries after a commit.
#[builder(default)]
pub(crate) metadata_log: Vec<MetadataLog>,

/// A list of sort orders, stored as full sort order objects.
#[builder(setter(transform = |sort_orders: Vec<SortOrderRef>| sort_orders.into_iter().map(|s| (s.order_id, s)).collect()))]
pub(crate) sort_orders: HashMap<i64, SortOrderRef>,
/// Default sort order id of the table. Note that this could be used by
/// writers, but is not used when reading because reads use the specs
Expand All @@ -120,10 +133,17 @@ pub struct TableMetadata {
/// even if the refs map is null.
pub(crate) refs: HashMap<String, SnapshotReference>,
/// Mapping of snapshot ids to statistics files.
#[builder(default, setter(transform = |stats: Vec<StatisticsFile>| {
stats.into_iter().map(|s| (s.snapshot_id, s)).collect()
Comment on lines +136 to +137
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why also default? How is this different from snapshots?
Accepting Vec<StatisticsFile> should be enough.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its not - I missed the default for snapshots and current_snapshot_id and a few others.
My basic reasoning is that all fields that must be set in order to produce valid metadata should not have a default, even if the Rust type would allow it. An example for a non-default field would be schemas, as we require at least one entry in the HashMap to be able to produce valid Metadata (current_schema_id must be set and be present).
Valid TableMetadata must not contain snapshots though, and current_snapshot_id can be None, so those should have defaults.

}))]
pub(crate) statistics: HashMap<i64, StatisticsFile>,
/// Mapping of snapshot ids to partition statistics files.
#[builder(default, setter(transform = |stats: Vec<PartitionStatisticsFile>| {
stats.into_iter().map(|s| (s.snapshot_id, s)).collect()
}))]
pub(crate) partition_statistics: HashMap<i64, PartitionStatisticsFile>,
/// Encryption Keys
#[builder(default)]
pub(crate) encryption_keys: HashMap<String, String>,
}

Expand Down Expand Up @@ -438,7 +458,7 @@ impl TableMetadata {
/// We run this method after json deserialization.
/// All constructors for `TableMetadata` which are part of `iceberg-rust`
/// should return normalized `TableMetadata`.
pub(super) fn try_normalize(&mut self) -> Result<&mut Self> {
pub fn try_normalize(&mut self) -> Result<&mut Self> {
self.validate_current_schema()?;
self.normalize_current_snapshot()?;
self.construct_refs();
Expand Down Expand Up @@ -1332,7 +1352,8 @@ mod tests {
use crate::spec::{
BlobMetadata, NestedField, NullOrder, Operation, PartitionSpec, PartitionStatisticsFile,
PrimitiveType, Schema, Snapshot, SnapshotReference, SnapshotRetention, SortDirection,
SortField, SortOrder, StatisticsFile, Summary, Transform, Type, UnboundPartitionField,
SortField, SortOrder, StatisticsFile, StructType, Summary, Transform, Type,
UnboundPartitionField,
};

fn check_table_metadata_serde(json: &str, expected_type: TableMetadata) {
Expand Down Expand Up @@ -3020,6 +3041,66 @@ mod tests {
);
}

#[test]
fn test_build_declarative_table_metadata() {
// This test mainly ensures that new fields added to the `TableMetadata` have
// a default value set in the declarative builder, unless they are required, which
// would be a breaking change.
let mut table_metadata = TableMetadata::declarative_builder()
.format_version(FormatVersion::V2)
.table_uuid(Uuid::nil())
.location("s3://db/table".to_string())
.last_sequence_number(1)
.last_updated_ms(1515100955770)
.last_column_id(0)
.schemas(vec![Arc::new(Schema::builder().build().unwrap())])
.current_schema_id(0)
.partition_specs(vec![Arc::new(PartitionSpec::unpartition_spec())])
.sort_orders(vec![Arc::new(SortOrder::unsorted_order())])
.default_spec(PartitionSpec::unpartition_spec())
.default_sort_order_id(0)
.last_partition_id(0)
.default_partition_type(StructType::new(vec![]))
.properties(HashMap::new())
.snapshots(vec![])
.current_snapshot_id(None)
.snapshot_log(vec![])
.metadata_log(vec![])
.refs(HashMap::new())
.build_unchecked();
table_metadata.try_normalize().unwrap();
assert_eq!(table_metadata.format_version, FormatVersion::V2);
assert_eq!(table_metadata.table_uuid, Uuid::nil());
assert_eq!(table_metadata.location, "s3://db/table");
assert_eq!(table_metadata.last_sequence_number, 1);
assert_eq!(table_metadata.last_updated_ms, 1515100955770);
assert_eq!(table_metadata.last_column_id, 0);
assert_eq!(table_metadata.schemas.len(), 1);
assert_eq!(
*table_metadata.schemas.values().next().unwrap(),
Arc::new(Schema::builder().build().unwrap())
);
assert_eq!(table_metadata.current_schema_id, 0);
assert_eq!(table_metadata.partition_specs.len(), 1);
assert_eq!(
table_metadata.partition_specs.get(&0),
Some(&Arc::new(PartitionSpec::unpartition_spec()))
);
assert_eq!(table_metadata.sort_orders.len(), 1);
assert_eq!(table_metadata.default_sort_order_id, 0);
assert_eq!(table_metadata.last_partition_id, 0);
assert_eq!(table_metadata.properties.len(), 0);
assert_eq!(table_metadata.snapshots.len(), 0);
assert_eq!(table_metadata.current_snapshot_id, None);
assert_eq!(table_metadata.snapshot_log.len(), 0);
assert_eq!(table_metadata.metadata_log.len(), 0);
assert_eq!(table_metadata.refs.len(), 0);
assert_eq!(table_metadata.encryption_keys.len(), 0);
assert_eq!(table_metadata.statistics.len(), 0);
assert_eq!(table_metadata.partition_statistics.len(), 0);
assert_eq!(table_metadata.encryption_keys.len(), 0);
}

#[tokio::test]
async fn test_table_metadata_read_write() {
// Create a temporary directory for our test
Expand Down
Loading