Skip to content

Commit 62a2e6d

Browse files
committed
feat(puffin): Add PuffinReader
1 parent ae04c8a commit 62a2e6d

File tree

4 files changed

+192
-0
lines changed

4 files changed

+192
-0
lines changed

crates/iceberg/src/puffin/blob.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::collections::HashMap;
19+
20+
/// A serialized form of a "compact" Theta sketch produced by the Apache DataSketches library.
21+
pub(crate) const APACHE_DATASKETCHES_THETA_V1: &str = "apache-datasketches-theta-v1";
22+
23+
/// The blob
24+
#[derive(Debug, PartialEq, Clone)]
25+
pub(crate) struct Blob {
26+
/// See blob types: https://iceberg.apache.org/puffin-spec/#blob-types
27+
pub(crate) r#type: String,
28+
/// List of field IDs the blob was computed for; the order of items is used to compute sketches stored in the blob.
29+
pub(crate) fields: Vec<i32>,
30+
/// ID of the Iceberg table's snapshot the blob was computed from
31+
pub(crate) snapshot_id: i64,
32+
/// Sequence number of the Iceberg table's snapshot the blob was computed from
33+
pub(crate) sequence_number: i64,
34+
/// The actual blob data
35+
pub(crate) data: Vec<u8>,
36+
/// Arbitrary meta-information about the blob
37+
pub(crate) properties: HashMap<String, String>,
38+
}

crates/iceberg/src/puffin/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@
2121
// Temporarily allowing this while crate is under active development
2222
#![allow(dead_code)]
2323

24+
mod blob;
2425
mod compression;
2526
mod metadata;
27+
mod reader;
2628

2729
#[cfg(test)]
2830
mod test_utils;
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use crate::io::{FileRead, InputFile};
19+
use crate::puffin::blob::Blob;
20+
use crate::puffin::metadata::{BlobMetadata, FileMetadata};
21+
use crate::Result;
22+
23+
/// Puffin reader
24+
pub(crate) struct PuffinReader {
25+
input_file: InputFile,
26+
file_metadata: Option<FileMetadata>,
27+
}
28+
29+
impl PuffinReader {
30+
/// Returns a new Puffin reader
31+
pub(crate) fn new(input_file: InputFile) -> Self {
32+
Self {
33+
input_file,
34+
file_metadata: None,
35+
}
36+
}
37+
38+
/// Returns file metadata
39+
pub(crate) async fn file_metadata(&mut self) -> Result<&FileMetadata> {
40+
if let Some(ref file_metadata) = self.file_metadata {
41+
Ok(file_metadata)
42+
} else {
43+
let file_metadata = FileMetadata::read(&self.input_file).await?;
44+
Ok(self.file_metadata.insert(file_metadata))
45+
}
46+
}
47+
48+
/// Returns blob
49+
pub(crate) async fn blob(&self, blob_metadata: BlobMetadata) -> Result<Blob> {
50+
let file_read = self.input_file.reader().await?;
51+
let start = blob_metadata.offset;
52+
let end = start + blob_metadata.length;
53+
let bytes = file_read.read(start..end).await?.to_vec();
54+
let data = blob_metadata.compression_codec.decompress(bytes)?;
55+
56+
Ok(Blob {
57+
r#type: blob_metadata.r#type,
58+
fields: blob_metadata.fields,
59+
snapshot_id: blob_metadata.snapshot_id,
60+
sequence_number: blob_metadata.sequence_number,
61+
data,
62+
properties: blob_metadata.properties,
63+
})
64+
}
65+
}
66+
67+
#[cfg(test)]
68+
mod tests {
69+
70+
use crate::puffin::reader::PuffinReader;
71+
use crate::puffin::test_utils::{
72+
blob_0, blob_1, java_uncompressed_metric_input_file,
73+
java_zstd_compressed_metric_input_file, uncompressed_metric_file_metadata,
74+
zstd_compressed_metric_file_metadata,
75+
};
76+
77+
#[tokio::test]
78+
async fn test_puffin_reader_uncompressed_metric_data() {
79+
let input_file = java_uncompressed_metric_input_file();
80+
let mut puffin_reader = PuffinReader::new(input_file);
81+
82+
let file_metadata = puffin_reader.file_metadata().await.unwrap().clone();
83+
assert_eq!(file_metadata, uncompressed_metric_file_metadata());
84+
85+
assert_eq!(
86+
puffin_reader
87+
.blob(file_metadata.blobs.first().unwrap().clone())
88+
.await
89+
.unwrap(),
90+
blob_0()
91+
);
92+
93+
assert_eq!(
94+
puffin_reader
95+
.blob(file_metadata.blobs.get(1).unwrap().clone())
96+
.await
97+
.unwrap(),
98+
blob_1(),
99+
)
100+
}
101+
102+
#[tokio::test]
103+
async fn test_puffin_reader_zstd_compressed_metric_data() {
104+
let input_file = java_zstd_compressed_metric_input_file();
105+
let mut puffin_reader = PuffinReader::new(input_file);
106+
107+
let file_metadata = puffin_reader.file_metadata().await.unwrap().clone();
108+
assert_eq!(file_metadata, zstd_compressed_metric_file_metadata());
109+
110+
assert_eq!(
111+
puffin_reader
112+
.blob(file_metadata.blobs.first().unwrap().clone())
113+
.await
114+
.unwrap(),
115+
blob_0()
116+
);
117+
118+
assert_eq!(
119+
puffin_reader
120+
.blob(file_metadata.blobs.get(1).unwrap().clone())
121+
.await
122+
.unwrap(),
123+
blob_1(),
124+
)
125+
}
126+
}

crates/iceberg/src/puffin/test_utils.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
use std::collections::HashMap;
1919

20+
use super::blob::Blob;
2021
use crate::io::{FileIOBuilder, InputFile};
2122
use crate::puffin::compression::CompressionCodec;
2223
use crate::puffin::metadata::{BlobMetadata, FileMetadata, CREATED_BY_PROPERTY};
@@ -68,6 +69,7 @@ pub(crate) const METRIC_BLOB_0_TYPE: &str = "some-blob";
6869
pub(crate) const METRIC_BLOB_0_INPUT_FIELDS: [i32; 1] = [1];
6970
pub(crate) const METRIC_BLOB_0_SNAPSHOT_ID: i64 = 2;
7071
pub(crate) const METRIC_BLOB_0_SEQUENCE_NUMBER: i64 = 1;
72+
pub(crate) const METRIC_BLOB_0_DATA: &str = "abcdefghi";
7173

7274
pub(crate) fn zstd_compressed_metric_blob_0_metadata() -> BlobMetadata {
7375
BlobMetadata {
@@ -95,10 +97,23 @@ pub(crate) fn uncompressed_metric_blob_0_metadata() -> BlobMetadata {
9597
}
9698
}
9799

100+
pub(crate) fn blob_0() -> Blob {
101+
Blob {
102+
r#type: METRIC_BLOB_0_TYPE.to_string(),
103+
fields: METRIC_BLOB_0_INPUT_FIELDS.to_vec(),
104+
snapshot_id: METRIC_BLOB_0_SNAPSHOT_ID,
105+
sequence_number: METRIC_BLOB_0_SEQUENCE_NUMBER,
106+
data: METRIC_BLOB_0_DATA.as_bytes().to_vec(),
107+
properties: HashMap::new(),
108+
}
109+
}
110+
98111
pub(crate) const METRIC_BLOB_1_TYPE: &str = "some-other-blob";
99112
pub(crate) const METRIC_BLOB_1_INPUT_FIELDS: [i32; 1] = [2];
100113
pub(crate) const METRIC_BLOB_1_SNAPSHOT_ID: i64 = 2;
101114
pub(crate) const METRIC_BLOB_1_SEQUENCE_NUMBER: i64 = 1;
115+
pub(crate) const METRIC_BLOB_1_DATA: &str =
116+
"some blob \u{0000} binary data 🤯 that is not very very very very very very long, is it?";
102117

103118
pub(crate) fn uncompressed_metric_blob_1_metadata() -> BlobMetadata {
104119
BlobMetadata {
@@ -126,6 +141,17 @@ pub(crate) fn zstd_compressed_metric_blob_1_metadata() -> BlobMetadata {
126141
}
127142
}
128143

144+
pub(crate) fn blob_1() -> Blob {
145+
Blob {
146+
r#type: METRIC_BLOB_1_TYPE.to_string(),
147+
fields: METRIC_BLOB_1_INPUT_FIELDS.to_vec(),
148+
snapshot_id: METRIC_BLOB_1_SNAPSHOT_ID,
149+
sequence_number: METRIC_BLOB_1_SEQUENCE_NUMBER,
150+
data: METRIC_BLOB_1_DATA.as_bytes().to_vec(),
151+
properties: HashMap::new(),
152+
}
153+
}
154+
129155
pub(crate) const CREATED_BY_PROPERTY_VALUE: &str = "Test 1234";
130156

131157
pub(crate) fn file_properties() -> HashMap<String, String> {

0 commit comments

Comments
 (0)