Skip to content

Commit c84b079

Browse files
authored
feat: add classic and uuid parquet checkpoint path generation (#782)
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-incubator/delta-kernel-rs/blob/main/CONTRIBUTING.md 2. Run `cargo t --all-features --all-targets` to get started testing, and run `cargo fmt`. 3. Ensure you have added or run the appropriate tests for your PR. 4. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 5. Be sure to keep the PR description updated to reflect all changes. --> <!-- PR title formatting: This project uses conventional commits: https://www.conventionalcommits.org/ Each PR corresponds to a commit on the `main` branch, with the title of the PR (typically) being used for the commit message on main. In order to ensure proper formatting in the CHANGELOG please ensure your PR title adheres to the conventional commit specification. Examples: - new feature PR: "feat: new API for snapshot.update()" - bugfix PR: "fix: correctly apply DV in read-table example" --> ## What changes are proposed in this pull request? <!-- Please clarify what changes you are proposing and why the changes are needed. The purpose of this section is to outline the changes, why they are needed, and how this PR fixes the issue. If the reason for the change is already explained clearly in an issue, then it does not need to be restated here. 1. If you propose a new API or feature, clarify the use case for a new API or feature. 2. If you fix a bug, you can clarify why it is a bug. --> This PR introduces the helper methods: - `new_uuid_parquet_checkpoint` which creates a new `ParsedCheckpointPath<Url>` for a uuid-named parquet checkpoint file at the specified version. The UUID-naming scheme looks like: `n.checkpoint.u.parquet`, where u is a UUID and n is the snapshot version that this checkpoint represents. - `new_classic_parquet_checkpoint` which creates a new `ParsedCheckpointPath<Url>` for a classic-named parquet checkpoint file at the specified version. The classic-naming scheme looks like: `n.checkpoint.parquet`, where n is the snapshot version that this checkpoint represents. - **Updates the `uuid` dependency to always include `v4` and `fast-rng` features:** - This ensures that `uuid::new_v4()` is always available. - The `fast-rng` feature improves performance when generating UUIDs. For more information on the two checkpoint naming-schemes: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#uuid-named-checkpoint https://github.com/delta-io/delta/blob/master/PROTOCOL.md#classic-checkpoint This PR is part of the on-going effort to implement single-file checkpoint write support. For reference, [[link to write API proposal]](#779) <!-- Uncomment this section if there are any changes affecting public APIs: ### This PR affects the following public APIs If there are breaking changes, please ensure the `breaking-changes` label gets added by CI, and describe why the changes are needed. Note that _new_ public APIs are not considered breaking. --> ## How was this change tested? <!-- Please make sure to add test cases that check the changes thoroughly including negative and positive cases if possible. If it was tested in a way different from regular unit tests, please clarify how you tested, ideally via a reproducible test documented in the PR description. --> - `test_new_uuid_parquet_checkpoint` - verifies UUID-named Parquet checkpoint creation with proper attributes. - `test_new_classic_parquet_checkpoint` - verifies classic-named Parquet checkpoint creation with proper attributes.
1 parent 4ad2bc6 commit c84b079

File tree

2 files changed

+85
-11
lines changed

2 files changed

+85
-11
lines changed

kernel/Cargo.toml

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ thiserror = "1"
4949
# only for structured logging
5050
tracing = { version = "0.1", features = ["log"] }
5151
url = "2"
52-
uuid = "1.10.0"
52+
uuid = { version = "1.10.0", features = ["v4", "fast-rng"] }
5353
z85 = "3.0.5"
5454

5555
# bring in our derive macros
@@ -118,8 +118,6 @@ default-engine-base = [
118118
"need_arrow",
119119
"object_store",
120120
"tokio",
121-
"uuid/v4",
122-
"uuid/fast-rng",
123121
]
124122

125123
# the default-engine use the reqwest crate with default features which uses native-tls. if you want

kernel/src/path.rs

Lines changed: 84 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
33
use std::str::FromStr;
44
use url::Url;
5+
use uuid::Uuid;
56

67
use crate::{DeltaResult, Error, FileMeta, Version};
78

@@ -180,22 +181,59 @@ impl<Location: AsUrl> ParsedLogPath<Location> {
180181
}
181182

182183
impl ParsedLogPath<Url> {
183-
/// Create a new ParsedCommitPath<Url> for a new json commit file at the specified version
184-
pub(crate) fn new_commit(
185-
table_root: &Url,
186-
version: Version,
187-
) -> DeltaResult<ParsedLogPath<Url>> {
184+
const DELTA_LOG_DIR: &'static str = "_delta_log/";
185+
186+
/// Helper method to create a path with the given filename generator
187+
fn create_path(table_root: &Url, filename: String) -> DeltaResult<Self> {
188+
let location = table_root.join(Self::DELTA_LOG_DIR)?.join(&filename)?;
189+
Self::try_from(location)?.ok_or_else(|| {
190+
Error::internal_error(format!("Attempted to create an invalid path: {}", filename))
191+
})
192+
}
193+
194+
/// Create a new ParsedCommitPath<Url> for a new json commit file
195+
pub(crate) fn new_commit(table_root: &Url, version: Version) -> DeltaResult<Self> {
188196
let filename = format!("{:020}.json", version);
189-
let location = table_root.join("_delta_log/")?.join(&filename)?;
190-
let path = Self::try_from(location)?
191-
.ok_or_else(|| Error::internal_error("attempted to create invalid commit path"))?;
197+
let path = Self::create_path(table_root, filename)?;
192198
if !path.is_commit() {
193199
return Err(Error::internal_error(
194200
"ParsedLogPath::new_commit created a non-commit path",
195201
));
196202
}
197203
Ok(path)
198204
}
205+
206+
/// Create a new ParsedCheckpointPath<Url> for a classic parquet checkpoint file
207+
#[allow(dead_code)] // TODO: Remove this once we have a use case for it
208+
pub(crate) fn new_classic_parquet_checkpoint(
209+
table_root: &Url,
210+
version: Version,
211+
) -> DeltaResult<Self> {
212+
let filename = format!("{:020}.checkpoint.parquet", version);
213+
let path = Self::create_path(table_root, filename)?;
214+
if !path.is_checkpoint() {
215+
return Err(Error::internal_error(
216+
"ParsedLogPath::new_classic_parquet_checkpoint created a non-checkpoint path",
217+
));
218+
}
219+
Ok(path)
220+
}
221+
222+
/// Create a new ParsedCheckpointPath<Url> for a UUID-based parquet checkpoint file
223+
#[allow(dead_code)] // TODO: Remove this once we have a use case for it
224+
pub(crate) fn new_uuid_parquet_checkpoint(
225+
table_root: &Url,
226+
version: Version,
227+
) -> DeltaResult<Self> {
228+
let filename = format!("{:020}.checkpoint.{}.parquet", version, Uuid::new_v4());
229+
let path = Self::create_path(table_root, filename)?;
230+
if !path.is_checkpoint() {
231+
return Err(Error::internal_error(
232+
"ParsedLogPath::new_uuid_parquet_checkpoint created a non-checkpoint path",
233+
));
234+
}
235+
Ok(path)
236+
}
199237
}
200238

201239
#[cfg(test)]
@@ -566,4 +604,42 @@ mod tests {
566604
assert!(matches!(log_path.file_type, LogPathFileType::Commit));
567605
assert_eq!(log_path.filename, "00000000000000000010.json");
568606
}
607+
608+
#[test]
609+
fn test_new_uuid_parquet_checkpoint() {
610+
let table_log_dir = table_log_dir_url();
611+
let log_path = ParsedLogPath::new_uuid_parquet_checkpoint(&table_log_dir, 10).unwrap();
612+
613+
assert_eq!(log_path.version, 10);
614+
assert!(log_path.is_checkpoint());
615+
assert_eq!(log_path.extension, "parquet");
616+
if let LogPathFileType::UuidCheckpoint(uuid) = &log_path.file_type {
617+
assert_eq!(uuid.len(), UUID_PART_LEN);
618+
} else {
619+
panic!("Expected UuidCheckpoint file type");
620+
}
621+
622+
let filename = log_path.filename.to_string();
623+
let filename_parts: Vec<&str> = filename.split('.').collect();
624+
assert_eq!(filename_parts.len(), 4);
625+
assert_eq!(filename_parts[0], "00000000000000000010");
626+
assert_eq!(filename_parts[1], "checkpoint");
627+
assert_eq!(filename_parts[2].len(), UUID_PART_LEN);
628+
assert_eq!(filename_parts[3], "parquet");
629+
}
630+
631+
#[test]
632+
fn test_new_classic_parquet_checkpoint() {
633+
let table_log_dir = table_log_dir_url();
634+
let log_path = ParsedLogPath::new_classic_parquet_checkpoint(&table_log_dir, 10).unwrap();
635+
636+
assert_eq!(log_path.version, 10);
637+
assert!(log_path.is_checkpoint());
638+
assert_eq!(log_path.extension, "parquet");
639+
assert!(matches!(
640+
log_path.file_type,
641+
LogPathFileType::SinglePartCheckpoint
642+
));
643+
assert_eq!(log_path.filename, "00000000000000000010.checkpoint.parquet");
644+
}
569645
}

0 commit comments

Comments
 (0)