Skip to content

Commit 3eb4fc5

Browse files
committed
Add Puffin crate and CompressionCodec
1 parent f3a571d commit 3eb4fc5

File tree

4 files changed

+184
-0
lines changed

4 files changed

+184
-0
lines changed

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -23,6 +23,7 @@ members = [
2323
"crates/iceberg",
2424
"crates/integration_tests",
2525
"crates/integrations/*",
26+
"crates/puffin",
2627
"crates/test_utils",
2728
]
2829
exclude = ["bindings/python"]
@@ -98,3 +99,4 @@ uuid = { version = "1.6.1", features = ["v7"] }
9899
volo-thrift = "0.10"
99100
hive_metastore = "0.1"
100101
tera = "1"
102+
zstd = "0.13.2"

crates/puffin/Cargo.toml

Lines changed: 36 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,36 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
[package]
19+
name = "iceberg-puffin"
20+
version = { workspace = true }
21+
edition = { workspace = true }
22+
homepage = { workspace = true }
23+
rust-version = { workspace = true }
24+
25+
categories = ["database"]
26+
description = "Apache Iceberg Puffin"
27+
repository = { workspace = true }
28+
license = { workspace = true }
29+
keywords = ["iceberg", "puffin"]
30+
31+
[dependencies]
32+
iceberg = { workspace = true }
33+
zstd = { workspace = true }
34+
35+
[dev-dependencies]
36+
tokio = { workspace = true }

crates/puffin/src/compression.rs

Lines changed: 121 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,121 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use iceberg::{Error, ErrorKind, Result};
19+
20+
/// Compression formats that can be applied to a block of bytes.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum CompressionCodec {
    /// Bytes are stored as-is, without compression (the default)
    #[default]
    None,
    /// LZ4, written as a single compression frame with content size present
    Lz4,
    /// Zstandard, written as a single compression frame with content size present
    Zstd,
}
32+
33+
impl CompressionCodec {
34+
pub(crate) fn decompress(&self, bytes: Vec<u8>) -> Result<Vec<u8>> {
35+
match self {
36+
CompressionCodec::None => Ok(bytes),
37+
CompressionCodec::Lz4 => Err(Error::new(
38+
ErrorKind::FeatureUnsupported,
39+
"LZ4 decompression is not supported currently",
40+
)),
41+
CompressionCodec::Zstd => {
42+
let decompressed = zstd::stream::decode_all(&bytes[..])?;
43+
Ok(decompressed)
44+
}
45+
}
46+
}
47+
48+
pub(crate) fn compress(&self, bytes: Vec<u8>) -> Result<Vec<u8>> {
49+
match self {
50+
CompressionCodec::None => Ok(bytes),
51+
CompressionCodec::Lz4 => Err(Error::new(
52+
ErrorKind::FeatureUnsupported,
53+
"LZ4 compression is not supported currently",
54+
)),
55+
CompressionCodec::Zstd => {
56+
let writer = Vec::<u8>::new();
57+
let mut encoder = zstd::stream::Encoder::new(writer, 3)?;
58+
encoder.include_checksum(true)?;
59+
encoder.set_pledged_src_size(Some(bytes.len().try_into()?))?;
60+
std::io::copy(&mut &bytes[..], &mut encoder)?;
61+
let compressed = encoder.finish()?;
62+
Ok(compressed)
63+
}
64+
}
65+
}
66+
67+
pub(crate) fn is_none(&self) -> bool {
68+
matches!(self, CompressionCodec::None)
69+
}
70+
}
71+
72+
#[cfg(test)]
mod tests {
    use crate::compression::CompressionCodec;

    // These tests are fully synchronous, so they use the plain `#[test]`
    // harness instead of starting an async runtime for each case.

    /// `None` must hand data back unchanged in both directions.
    #[test]
    fn test_compression_codec_none() {
        let compression_codec = CompressionCodec::None;
        let bytes_vec = [0_u8; 100].to_vec();

        let compressed = compression_codec.compress(bytes_vec.clone()).unwrap();
        assert_eq!(bytes_vec, compressed);

        let decompressed = compression_codec.decompress(compressed.clone()).unwrap();
        assert_eq!(compressed, decompressed)
    }

    /// LZ4 is not implemented yet: both directions must report
    /// `FeatureUnsupported` with the expected message.
    #[test]
    fn test_compression_codec_lz4() {
        let compression_codec = CompressionCodec::Lz4;
        let bytes_vec = [0_u8; 100].to_vec();

        assert_eq!(
            compression_codec
                .compress(bytes_vec.clone())
                .unwrap_err()
                .to_string(),
            "FeatureUnsupported => LZ4 compression is not supported currently",
        );

        // Last use of `bytes_vec`, so it is moved rather than cloned.
        assert_eq!(
            compression_codec
                .decompress(bytes_vec)
                .unwrap_err()
                .to_string(),
            "FeatureUnsupported => LZ4 decompression is not supported currently",
        )
    }

    /// Zstd must shrink a highly repetitive input and round-trip it exactly.
    #[test]
    fn test_compression_codec_zstd() {
        let compression_codec = CompressionCodec::Zstd;
        let bytes_vec = [0_u8; 100].to_vec();

        let compressed = compression_codec.compress(bytes_vec.clone()).unwrap();
        assert!(compressed.len() < bytes_vec.len());

        // `compressed` is not needed afterwards, so it is moved rather than cloned.
        let decompressed = compression_codec.decompress(compressed).unwrap();
        assert_eq!(decompressed, bytes_vec)
    }
}

crates/puffin/src/lib.rs

Lines changed: 25 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,25 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Iceberg Puffin implementation.
19+
20+
#![deny(missing_docs)]
21+
// Temporarily allowing this while crate is under active development
22+
#![allow(dead_code)]
23+
24+
mod compression;
25+
pub use compression::CompressionCodec;

0 commit comments

Comments
 (0)