Skip to content

Commit 64fc441

Browse files
committed
Add Puffin reader and writer
1 parent 697a200 commit 64fc441

File tree

10 files changed

+1339
-0
lines changed

10 files changed

+1339
-0
lines changed

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ members = [
2323
"crates/iceberg",
2424
"crates/integrations/*",
2525
"crates/test_utils",
26+
"crates/puffin",
2627
]
2728
exclude = ["bindings/python"]
2829

@@ -97,3 +98,4 @@ uuid = { version = "1.6.1", features = ["v7"] }
9798
volo-thrift = "0.10"
9899
hive_metastore = "0.1"
99100
tera = "1"
101+
zstd = "0.13.2"

crates/puffin/Cargo.toml

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
[package]
19+
name = "iceberg-puffin"
20+
version = { workspace = true }
21+
edition = { workspace = true }
22+
homepage = { workspace = true }
23+
rust-version = { workspace = true }
24+
25+
categories = ["database"]
26+
description = "Apache Iceberg Puffin"
27+
repository = { workspace = true }
28+
license = { workspace = true }
29+
keywords = ["iceberg", "puffin"]
30+
31+
[dependencies]
32+
iceberg = { workspace = true }
33+
serde = { workspace = true }
34+
serde_derive = { workspace = true }
35+
serde_json = { workspace = true }
36+
once_cell = { workspace = true }
37+
zstd = { workspace = true }
38+
39+
[dev-dependencies]
40+
tempfile = { workspace = true }
41+
tokio = { workspace = true }

crates/puffin/src/blob.rs

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::collections::HashMap;
19+
use std::fs::File;
20+
use std::io::{Read, Seek, SeekFrom};
21+
22+
use iceberg::Result;
23+
24+
use crate::compression::CompressionCodec;
25+
use crate::metadata::BlobMetadata;
26+
27+
/// Blob type name `apache-datasketches-theta-v1`.
///
/// NOTE(review): presumably intended as a value for a blob's `type` field
/// (an Apache DataSketches theta sketch, version 1) — confirm against the Puffin spec.
pub const APACHE_DATASKETCHES_THETA_V1: &str = "apache-datasketches-theta-v1";
28+
29+
#[derive(Debug)]
30+
pub struct Blob {
31+
pub(crate) r#type: String,
32+
pub(crate) input_fields: Vec<i32>,
33+
pub(crate) snapshot_id: i64,
34+
pub(crate) sequence_number: i64,
35+
pub(crate) data: Vec<u8>,
36+
pub(crate) requested_compression_codec: Option<CompressionCodec>,
37+
pub(crate) properties: HashMap<String, String>,
38+
}
39+
40+
impl Blob {
41+
/// Reads blob from a Puffin file
42+
pub fn read(file: &mut File, blob_metadata: &BlobMetadata) -> Result<Blob> {
43+
let mut bytes = vec![0; blob_metadata.length];
44+
file.seek(SeekFrom::Start(blob_metadata.offset))?;
45+
file.read(&mut bytes)?;
46+
let data = CompressionCodec::decompress(blob_metadata.compression_codec, bytes)?;
47+
return Ok(Blob {
48+
r#type: blob_metadata.r#type.clone(),
49+
input_fields: blob_metadata.input_fields.clone(),
50+
snapshot_id: blob_metadata.snapshot_id,
51+
sequence_number: blob_metadata.sequence_number,
52+
data,
53+
requested_compression_codec: Some(blob_metadata.compression_codec),
54+
properties: blob_metadata.properties.clone(),
55+
});
56+
}
57+
}
58+
59+
#[cfg(test)]
mod tests {
    use std::fs::File;

    use crate::blob::Blob;
    use crate::metadata::FileMetadata;

    /// Directory holding the Puffin fixture files, relative to this crate's manifest.
    fn test_data_path() -> String {
        env!("CARGO_MANIFEST_DIR").to_owned() + "/testdata/"
    }

    // Expected payloads of the two blobs in each fixture file.
    const EXPECTED_BLOB_0: &str = "abcdefghi";
    const EXPECTED_BLOB_1: &str =
        "some blob \u{0000} binary data 🤯 that is not very very very very very very long, is it?";

    /// Opens the Puffin file at `file_path`, reads the blob at position `idx` of its
    /// metadata, and returns the blob payload decoded as UTF-8.
    fn read_blob_as_utf8_string(file_path: &str, idx: usize) -> String {
        let mut file = File::open(file_path).unwrap();
        let file_metadata = FileMetadata::from_file(&mut file).unwrap();
        let blob_metadata = file_metadata.blobs.get(idx).unwrap();
        let blob = Blob::read(&mut file, blob_metadata).unwrap();
        String::from_utf8(blob.data).unwrap()
    }

    #[tokio::test]
    async fn test_read_java_generated_uncompressed_metric_data() {
        let file_path = test_data_path() + "v1/java-generated/sample-metric-data-uncompressed.bin";
        assert_eq!(read_blob_as_utf8_string(&file_path, 0), EXPECTED_BLOB_0);
        assert_eq!(read_blob_as_utf8_string(&file_path, 1), EXPECTED_BLOB_1);
    }

    #[tokio::test]
    async fn test_read_java_generated_zstd_compressed_metric_data() {
        let file_path =
            test_data_path() + "v1/java-generated/sample-metric-data-compressed-zstd.bin";
        assert_eq!(read_blob_as_utf8_string(&file_path, 0), EXPECTED_BLOB_0);
        assert_eq!(read_blob_as_utf8_string(&file_path, 1), EXPECTED_BLOB_1);
    }
}

crates/puffin/src/compression.rs

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use iceberg::{Error, ErrorKind, Result};
19+
use serde_derive::{Deserialize, Serialize};
20+
21+
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone, Copy)]
22+
#[serde(rename_all = "lowercase")]
23+
pub enum CompressionCodec {
24+
/** No compression */
25+
NONE,
26+
/** LZ4 single compression frame with content size present */
27+
LZ4,
28+
/** Zstandard single compression frame with content size present */
29+
ZSTD,
30+
}
31+
32+
impl Default for CompressionCodec {
33+
fn default() -> Self {
34+
CompressionCodec::NONE
35+
}
36+
}
37+
38+
impl CompressionCodec {
39+
pub(crate) fn decompress(
40+
compression_codec: CompressionCodec,
41+
bytes: Vec<u8>,
42+
) -> Result<Vec<u8>> {
43+
match compression_codec {
44+
CompressionCodec::NONE => Ok(bytes),
45+
CompressionCodec::LZ4 => Err(Error::new(
46+
ErrorKind::FeatureUnsupported,
47+
"LZ4 decompression is not supported currently",
48+
)),
49+
CompressionCodec::ZSTD => {
50+
let decompressed = zstd::stream::decode_all(&bytes[..])?;
51+
return Ok(decompressed);
52+
}
53+
}
54+
}
55+
56+
pub(crate) fn compress(compression_codec: CompressionCodec, bytes: Vec<u8>) -> Result<Vec<u8>> {
57+
match compression_codec {
58+
CompressionCodec::NONE => Ok(bytes),
59+
CompressionCodec::LZ4 => Err(Error::new(
60+
ErrorKind::FeatureUnsupported,
61+
"LZ4 compression is not supported currently",
62+
)),
63+
CompressionCodec::ZSTD => {
64+
let writer = Vec::<u8>::new();
65+
let mut encoder = zstd::stream::Encoder::new(writer, 3)?;
66+
encoder.include_checksum(true)?;
67+
encoder.set_pledged_src_size(Some(bytes.len().try_into()?))?;
68+
std::io::copy(&mut &bytes[..], &mut encoder)?;
69+
let compressed = encoder.finish()?;
70+
return Ok(compressed);
71+
}
72+
}
73+
}
74+
75+
pub(crate) fn is_none(&self) -> bool {
76+
return match self {
77+
CompressionCodec::NONE => true,
78+
_ => false,
79+
};
80+
}
81+
}

crates/puffin/src/lib.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Iceberg Puffin implementation.
19+
20+
#![deny(missing_docs)]
21+
22+
mod blob;
23+
mod compression;
24+
mod metadata;
25+
mod writer;

0 commit comments

Comments
 (0)