Skip to content

Commit 9696ed7

Browse files
committed
Add Puffin reader and writer
1 parent f3a571d commit 9696ed7

17 files changed

+1735
-8
lines changed

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ members = [
2323
"crates/iceberg",
2424
"crates/integration_tests",
2525
"crates/integrations/*",
26+
"crates/puffin",
2627
"crates/test_utils",
2728
]
2829
exclude = ["bindings/python"]
@@ -98,3 +99,4 @@ uuid = { version = "1.6.1", features = ["v7"] }
9899
volo-thrift = "0.10"
99100
hive_metastore = "0.1"
100101
tera = "1"
102+
zstd = "0.13.2"

crates/iceberg/src/writer/file_writer/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@ use crate::Result;
2626

2727
mod parquet_writer;
2828
pub use parquet_writer::{ParquetWriter, ParquetWriterBuilder};
29+
2930
mod track_writer;
31+
pub use track_writer::TrackWriter;
3032

3133
pub mod location_generator;
3234

crates/iceberg/src/writer/file_writer/parquet_writer.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
//! The module contains the file writer for parquet file format.
1919
2020
use std::collections::HashMap;
21-
use std::sync::atomic::AtomicI64;
21+
use std::sync::atomic::AtomicU64;
2222
use std::sync::Arc;
2323

2424
use arrow_schema::SchemaRef as ArrowSchemaRef;
@@ -81,7 +81,7 @@ impl<T: LocationGenerator, F: FileNameGenerator> FileWriterBuilder for ParquetWr
8181

8282
async fn build(self) -> crate::Result<Self::R> {
8383
let arrow_schema: ArrowSchemaRef = Arc::new(self.schema.as_ref().try_into()?);
84-
let written_size = Arc::new(AtomicI64::new(0));
84+
let written_size = Arc::new(AtomicU64::new(0));
8585
let out_file = self.file_io.new_output(
8686
self.location_generator
8787
.generate_location(&self.file_name_generator.generate_file_name()),
@@ -214,7 +214,7 @@ pub struct ParquetWriter {
214214
schema: SchemaRef,
215215
out_file: OutputFile,
216216
writer: AsyncArrowWriter<AsyncFileWriter<TrackWriter>>,
217-
written_size: Arc<AtomicI64>,
217+
written_size: Arc<AtomicU64>,
218218
current_row_num: usize,
219219
}
220220

crates/iceberg/src/writer/file_writer/track_writer.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use std::sync::atomic::AtomicI64;
18+
use std::sync::atomic::AtomicU64;
1919
use std::sync::Arc;
2020

2121
use bytes::Bytes;
@@ -24,18 +24,25 @@ use crate::io::FileWrite;
2424
use crate::Result;
2525

2626
/// `TrackWriter` is used to track the written size.
27-
pub(crate) struct TrackWriter {
27+
pub struct TrackWriter {
2828
inner: Box<dyn FileWrite>,
29-
written_size: Arc<AtomicI64>,
29+
/// number of bytes written so far
30+
written_size: Arc<AtomicU64>,
3031
}
3132

3233
impl TrackWriter {
33-
pub fn new(writer: Box<dyn FileWrite>, written_size: Arc<AtomicI64>) -> Self {
34+
/// Create new writer
35+
pub fn new(writer: Box<dyn FileWrite>, written_size: Arc<AtomicU64>) -> Self {
3436
Self {
3537
inner: writer,
3638
written_size,
3739
}
3840
}
41+
42+
/// Number of bytes written so far
43+
pub fn bytes_written(&self) -> u64 {
44+
self.written_size.load(std::sync::atomic::Ordering::SeqCst)
45+
}
3946
}
4047

4148
#[async_trait::async_trait]
@@ -44,7 +51,7 @@ impl FileWrite for TrackWriter {
4451
let size = bs.len();
4552
self.inner.write(bs).await.map(|v| {
4653
self.written_size
47-
.fetch_add(size as i64, std::sync::atomic::Ordering::Relaxed);
54+
.fetch_add(size as u64, std::sync::atomic::Ordering::Relaxed);
4855
v
4956
})
5057
}

crates/puffin/Cargo.toml

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
[package]
19+
name = "iceberg-puffin"
20+
version = { workspace = true }
21+
edition = { workspace = true }
22+
homepage = { workspace = true }
23+
rust-version = { workspace = true }
24+
25+
categories = ["database"]
26+
description = "Apache Iceberg Puffin"
27+
repository = { workspace = true }
28+
license = { workspace = true }
29+
keywords = ["iceberg", "puffin"]
30+
31+
[dependencies]
32+
bytes = { workspace = true }
33+
iceberg = { workspace = true }
34+
once_cell = { workspace = true }
35+
serde = { workspace = true }
36+
serde_derive = { workspace = true }
37+
serde_json = { workspace = true }
38+
zstd = { workspace = true }
39+
40+
[dev-dependencies]
41+
tempfile = { workspace = true }
42+
tokio = { workspace = true }

crates/puffin/src/blob.rs

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::collections::HashMap;
19+
20+
use iceberg::io::{FileRead, InputFile};
21+
use iceberg::Result;
22+
23+
use crate::compression::CompressionCodec;
24+
use crate::metadata::BlobMetadata;
25+
26+
/// A serialized form of a "compact" Theta sketch produced by the Apache DataSketches library.
27+
pub const APACHE_DATASKETCHES_THETA_V1: &str = "apache-datasketches-theta-v1";
28+
29+
/// The blob
30+
#[derive(Debug)]
31+
pub struct Blob {
32+
/// See Blob types: https://iceberg.apache.org/puffin-spec/#blob-types
33+
pub r#type: String,
34+
/// List of field IDs the blob was computed for; the order of items is used to compute sketches stored in the blob.
35+
pub input_fields: Vec<i32>,
36+
/// ID of the Iceberg table's snapshot the blob was computed from
37+
pub snapshot_id: i64,
38+
/// Sequence number of the Iceberg table's snapshot the blob was computed from
39+
pub sequence_number: i64,
40+
/// The actual blob data
41+
pub data: Vec<u8>,
42+
/// The compression codec to use to compress the blob data at write time
43+
pub requested_compression_codec: Option<CompressionCodec>,
44+
/// Arbitrary meta-information about the blob
45+
pub properties: HashMap<String, String>,
46+
}
47+
48+
impl Blob {
49+
/// Returns blob from a Puffin file
50+
pub async fn read(input_file: &InputFile, blob_metadata: &BlobMetadata) -> Result<Blob> {
51+
let file_read = input_file.reader().await?;
52+
let start = blob_metadata.offset;
53+
let end = start + u64::try_from(blob_metadata.length)?;
54+
let bytes = file_read.read(start..end).await?.to_vec();
55+
let data = blob_metadata.compression_codec.decompress(bytes)?;
56+
Ok(Blob {
57+
r#type: blob_metadata.r#type.clone(),
58+
input_fields: blob_metadata.input_fields.clone(),
59+
snapshot_id: blob_metadata.snapshot_id,
60+
sequence_number: blob_metadata.sequence_number,
61+
data,
62+
requested_compression_codec: Some(blob_metadata.compression_codec),
63+
properties: blob_metadata.properties.clone(),
64+
})
65+
}
66+
}
67+
68+
#[cfg(test)]
mod tests {
    use crate::test_utils::{
        read_blob_as_utf8_string, read_test_file, METRIC_BLOB_0_DATA, METRIC_BLOB_1_DATA,
    };

    /// Opens the test file at `path` and asserts that the blobs selected by `0` and `1`
    /// read back as the expected metric payloads.
    ///
    /// `path` is `&'static str` so the argument has exactly the type of the string
    /// literals the tests pass to `read_test_file`.
    async fn assert_metric_blobs(path: &'static str) {
        let input_file = read_test_file(path);
        assert_eq!(
            read_blob_as_utf8_string(&input_file, 0).await,
            METRIC_BLOB_0_DATA
        );
        assert_eq!(
            read_blob_as_utf8_string(&input_file, 1).await,
            METRIC_BLOB_1_DATA
        );
    }

    #[tokio::test]
    async fn test_read_rust_generated_uncompressed_metric_data() {
        assert_metric_blobs("v1/rust-generated/sample-metric-data-uncompressed.bin").await;
    }

    #[tokio::test]
    async fn test_read_rust_generated_zstd_compressed_metric_data() {
        assert_metric_blobs("v1/rust-generated/sample-metric-data-compressed-zstd.bin").await;
    }

    #[tokio::test]
    async fn test_read_java_generated_uncompressed_metric_data() {
        assert_metric_blobs("v1/java-generated/sample-metric-data-uncompressed.bin").await;
    }

    #[tokio::test]
    async fn test_read_java_generated_zstd_compressed_metric_data() {
        assert_metric_blobs("v1/java-generated/sample-metric-data-compressed-zstd.bin").await;
    }
}

crates/puffin/src/compression.rs

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use iceberg::{Error, ErrorKind, Result};
19+
use serde_derive::{Deserialize, Serialize};
20+
21+
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone, Copy)]
22+
#[serde(rename_all = "lowercase")]
23+
#[derive(Default)]
24+
/// Data compression formats
25+
pub enum CompressionCodec {
26+
/// No compression
27+
#[default]
28+
None,
29+
/// LZ4 single compression frame with content size present
30+
Lz4,
31+
/// Zstandard single compression frame with content size present
32+
Zstd,
33+
}
34+
35+
impl CompressionCodec {
36+
pub(crate) fn decompress(&self, bytes: Vec<u8>) -> Result<Vec<u8>> {
37+
match self {
38+
CompressionCodec::None => Ok(bytes),
39+
CompressionCodec::Lz4 => Err(Error::new(
40+
ErrorKind::FeatureUnsupported,
41+
"LZ4 decompression is not supported currently",
42+
)),
43+
CompressionCodec::Zstd => {
44+
let decompressed = zstd::stream::decode_all(&bytes[..])?;
45+
Ok(decompressed)
46+
}
47+
}
48+
}
49+
50+
pub(crate) fn compress(&self, bytes: Vec<u8>) -> Result<Vec<u8>> {
51+
match self {
52+
CompressionCodec::None => Ok(bytes),
53+
CompressionCodec::Lz4 => Err(Error::new(
54+
ErrorKind::FeatureUnsupported,
55+
"LZ4 compression is not supported currently",
56+
)),
57+
CompressionCodec::Zstd => {
58+
let writer = Vec::<u8>::new();
59+
let mut encoder = zstd::stream::Encoder::new(writer, 3)?;
60+
encoder.include_checksum(true)?;
61+
encoder.set_pledged_src_size(Some(bytes.len().try_into()?))?;
62+
std::io::copy(&mut &bytes[..], &mut encoder)?;
63+
let compressed = encoder.finish()?;
64+
Ok(compressed)
65+
}
66+
}
67+
}
68+
69+
pub(crate) fn is_none(&self) -> bool {
70+
matches!(self, CompressionCodec::None)
71+
}
72+
}
73+
74+
#[cfg(test)]
mod tests {
    use super::CompressionCodec;

    // `compress` and `decompress` are synchronous, so plain `#[test]` functions are
    // enough; no async runtime is needed.

    /// `None` must pass bytes through unchanged in both directions.
    #[test]
    fn test_compression_codec_none() {
        let compression_codec = CompressionCodec::None;
        let bytes_vec = vec![0_u8; 100];

        let compressed = compression_codec.compress(bytes_vec.clone()).unwrap();
        assert_eq!(bytes_vec, compressed);

        let decompressed = compression_codec.decompress(compressed.clone()).unwrap();
        assert_eq!(compressed, decompressed);
    }

    /// `Lz4` must report `FeatureUnsupported` for both compression and decompression.
    #[test]
    fn test_compression_codec_lz4() {
        let compression_codec = CompressionCodec::Lz4;
        let bytes_vec = vec![0_u8; 100];

        assert_eq!(
            compression_codec
                .compress(bytes_vec.clone())
                .unwrap_err()
                .to_string(),
            "FeatureUnsupported => LZ4 compression is not supported currently",
        );

        assert_eq!(
            compression_codec
                .decompress(bytes_vec)
                .unwrap_err()
                .to_string(),
            "FeatureUnsupported => LZ4 decompression is not supported currently",
        );
    }

    /// `Zstd` must shrink a highly compressible input and round-trip it losslessly.
    #[test]
    fn test_compression_codec_zstd() {
        let compression_codec = CompressionCodec::Zstd;
        let bytes_vec = vec![0_u8; 100];

        let compressed = compression_codec.compress(bytes_vec.clone()).unwrap();
        assert!(compressed.len() < bytes_vec.len());

        let decompressed = compression_codec.decompress(compressed).unwrap();
        assert_eq!(decompressed, bytes_vec);
    }
}

0 commit comments

Comments
 (0)