Skip to content

Commit b00dd80

Browse files
committed
Add Puffin reader and writer
1 parent f3a571d commit b00dd80

17 files changed

+1707
-8
lines changed

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ members = [
2424
"crates/integration_tests",
2525
"crates/integrations/*",
2626
"crates/test_utils",
27+
"crates/puffin",
2728
]
2829
exclude = ["bindings/python"]
2930

@@ -98,3 +99,4 @@ uuid = { version = "1.6.1", features = ["v7"] }
9899
volo-thrift = "0.10"
99100
hive_metastore = "0.1"
100101
tera = "1"
102+
zstd = "0.13.2"

crates/iceberg/src/writer/file_writer/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@ use crate::Result;
2626

2727
mod parquet_writer;
2828
pub use parquet_writer::{ParquetWriter, ParquetWriterBuilder};
29+
2930
mod track_writer;
31+
pub use track_writer::TrackWriter;
3032

3133
pub mod location_generator;
3234

crates/iceberg/src/writer/file_writer/parquet_writer.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
//! The module contains the file writer for parquet file format.
1919
2020
use std::collections::HashMap;
21-
use std::sync::atomic::AtomicI64;
21+
use std::sync::atomic::AtomicU64;
2222
use std::sync::Arc;
2323

2424
use arrow_schema::SchemaRef as ArrowSchemaRef;
@@ -81,7 +81,7 @@ impl<T: LocationGenerator, F: FileNameGenerator> FileWriterBuilder for ParquetWr
8181

8282
async fn build(self) -> crate::Result<Self::R> {
8383
let arrow_schema: ArrowSchemaRef = Arc::new(self.schema.as_ref().try_into()?);
84-
let written_size = Arc::new(AtomicI64::new(0));
84+
let written_size = Arc::new(AtomicU64::new(0));
8585
let out_file = self.file_io.new_output(
8686
self.location_generator
8787
.generate_location(&self.file_name_generator.generate_file_name()),
@@ -214,7 +214,7 @@ pub struct ParquetWriter {
214214
schema: SchemaRef,
215215
out_file: OutputFile,
216216
writer: AsyncArrowWriter<AsyncFileWriter<TrackWriter>>,
217-
written_size: Arc<AtomicI64>,
217+
written_size: Arc<AtomicU64>,
218218
current_row_num: usize,
219219
}
220220

crates/iceberg/src/writer/file_writer/track_writer.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use std::sync::atomic::AtomicI64;
18+
use std::sync::atomic::AtomicU64;
1919
use std::sync::Arc;
2020

2121
use bytes::Bytes;
@@ -24,18 +24,25 @@ use crate::io::FileWrite;
2424
use crate::Result;
2525

2626
/// `TrackWriter` is used to track the written size.
27-
pub(crate) struct TrackWriter {
27+
pub struct TrackWriter {
2828
inner: Box<dyn FileWrite>,
29-
written_size: Arc<AtomicI64>,
29+
/// number of bytes written so far
30+
written_size: Arc<AtomicU64>,
3031
}
3132

3233
impl TrackWriter {
33-
pub fn new(writer: Box<dyn FileWrite>, written_size: Arc<AtomicI64>) -> Self {
34+
/// Create new writer
35+
pub fn new(writer: Box<dyn FileWrite>, written_size: Arc<AtomicU64>) -> Self {
3436
Self {
3537
inner: writer,
3638
written_size,
3739
}
3840
}
41+
42+
/// Number of bytes written so far
43+
pub fn bytes_written(&self) -> u64 {
44+
self.written_size.load(std::sync::atomic::Ordering::SeqCst)
45+
}
3946
}
4047

4148
#[async_trait::async_trait]
@@ -44,7 +51,7 @@ impl FileWrite for TrackWriter {
4451
let size = bs.len();
4552
self.inner.write(bs).await.map(|v| {
4653
self.written_size
47-
.fetch_add(size as i64, std::sync::atomic::Ordering::Relaxed);
54+
.fetch_add(size as u64, std::sync::atomic::Ordering::Relaxed);
4855
v
4956
})
5057
}

crates/puffin/Cargo.toml

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
[package]
19+
name = "iceberg-puffin"
20+
version = { workspace = true }
21+
edition = { workspace = true }
22+
homepage = { workspace = true }
23+
rust-version = { workspace = true }
24+
25+
categories = ["database"]
26+
description = "Apache Iceberg Puffin"
27+
repository = { workspace = true }
28+
license = { workspace = true }
29+
keywords = ["iceberg", "puffin"]
30+
31+
[dependencies]
32+
bytes = { workspace = true }
33+
iceberg = { workspace = true }
34+
serde = { workspace = true }
35+
serde_derive = { workspace = true }
36+
serde_json = { workspace = true }
37+
once_cell = { workspace = true }
38+
zstd = { workspace = true }
39+
40+
[dev-dependencies]
41+
tempfile = { workspace = true }
42+
tokio = { workspace = true }

crates/puffin/src/blob.rs

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::collections::HashMap;
19+
20+
use iceberg::io::{FileRead, InputFile};
21+
use iceberg::Result;
22+
23+
use crate::compression::CompressionCodec;
24+
use crate::metadata::BlobMetadata;
25+
26+
#[allow(dead_code)]
27+
pub const APACHE_DATASKETCHES_THETA_V1: &str = "apache-datasketches-theta-v1";
28+
29+
#[derive(Debug)]
30+
pub struct Blob {
31+
pub r#type: String,
32+
pub input_fields: Vec<i32>,
33+
pub snapshot_id: i64,
34+
pub sequence_number: i64,
35+
pub data: Vec<u8>,
36+
pub requested_compression_codec: Option<CompressionCodec>,
37+
pub properties: HashMap<String, String>,
38+
}
39+
40+
impl Blob {
41+
/// Reads blob from a Puffin file
42+
pub async fn read(input_file: &InputFile, blob_metadata: &BlobMetadata) -> Result<Blob> {
43+
let file_read = input_file.reader().await?;
44+
let start = blob_metadata.offset;
45+
let end = start + u64::try_from(blob_metadata.length)?;
46+
let bytes = file_read.read(start..end).await?.to_vec();
47+
let data = blob_metadata.compression_codec.decompress(bytes)?;
48+
Ok(Blob {
49+
r#type: blob_metadata.r#type.clone(),
50+
input_fields: blob_metadata.input_fields.clone(),
51+
snapshot_id: blob_metadata.snapshot_id,
52+
sequence_number: blob_metadata.sequence_number,
53+
data,
54+
requested_compression_codec: Some(blob_metadata.compression_codec),
55+
properties: blob_metadata.properties.clone(),
56+
})
57+
}
58+
}
59+
60+
#[cfg(test)]
61+
mod tests {
62+
63+
use crate::test_utils::{
64+
read_blob_as_utf8_string, read_test_file, METRIC_BLOB_0_DATA, METRIC_BLOB_1_DATA,
65+
};
66+
67+
#[tokio::test]
68+
async fn test_read_rust_generated_uncompressed_metric_data() {
69+
let input_file = read_test_file("v1/rust-generated/sample-metric-data-uncompressed.bin");
70+
assert_eq!(
71+
read_blob_as_utf8_string(&input_file, 0).await,
72+
METRIC_BLOB_0_DATA
73+
);
74+
assert_eq!(
75+
read_blob_as_utf8_string(&input_file, 1).await,
76+
METRIC_BLOB_1_DATA
77+
);
78+
}
79+
80+
#[tokio::test]
81+
async fn test_read_rust_generated_zstd_compressed_metric_data() {
82+
let input_file = read_test_file("v1/rust-generated/sample-metric-data-compressed-zstd.bin");
83+
assert_eq!(
84+
read_blob_as_utf8_string(&input_file, 0).await,
85+
METRIC_BLOB_0_DATA
86+
);
87+
assert_eq!(
88+
read_blob_as_utf8_string(&input_file, 1).await,
89+
METRIC_BLOB_1_DATA
90+
);
91+
}
92+
93+
#[tokio::test]
94+
async fn test_read_java_generated_uncompressed_metric_data() {
95+
let input_file = read_test_file("v1/java-generated/sample-metric-data-uncompressed.bin");
96+
assert_eq!(
97+
read_blob_as_utf8_string(&input_file, 0).await,
98+
METRIC_BLOB_0_DATA
99+
);
100+
assert_eq!(
101+
read_blob_as_utf8_string(&input_file, 1).await,
102+
METRIC_BLOB_1_DATA
103+
);
104+
}
105+
106+
#[tokio::test]
107+
async fn test_read_java_generated_zstd_compressed_metric_data() {
108+
let input_file = read_test_file("v1/java-generated/sample-metric-data-compressed-zstd.bin");
109+
assert_eq!(
110+
read_blob_as_utf8_string(&input_file, 0).await,
111+
METRIC_BLOB_0_DATA
112+
);
113+
assert_eq!(
114+
read_blob_as_utf8_string(&input_file, 1).await,
115+
METRIC_BLOB_1_DATA
116+
);
117+
}
118+
}

crates/puffin/src/compression.rs

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use iceberg::{Error, ErrorKind, Result};
19+
use serde_derive::{Deserialize, Serialize};
20+
21+
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone, Copy)]
22+
#[serde(rename_all = "lowercase")]
23+
#[derive(Default)]
24+
pub enum CompressionCodec {
25+
/** No compression */
26+
#[default]
27+
None,
28+
/** LZ4 single compression frame with content size present */
29+
Lz4,
30+
/** Zstandard single compression frame with content size present */
31+
Zstd,
32+
}
33+
34+
impl CompressionCodec {
35+
pub(crate) fn decompress(&self, bytes: Vec<u8>) -> Result<Vec<u8>> {
36+
match self {
37+
CompressionCodec::None => Ok(bytes),
38+
CompressionCodec::Lz4 => Err(Error::new(
39+
ErrorKind::FeatureUnsupported,
40+
"LZ4 decompression is not supported currently",
41+
)),
42+
CompressionCodec::Zstd => {
43+
let decompressed = zstd::stream::decode_all(&bytes[..])?;
44+
Ok(decompressed)
45+
}
46+
}
47+
}
48+
49+
pub(crate) fn compress(&self, bytes: Vec<u8>) -> Result<Vec<u8>> {
50+
match self {
51+
CompressionCodec::None => Ok(bytes),
52+
CompressionCodec::Lz4 => Err(Error::new(
53+
ErrorKind::FeatureUnsupported,
54+
"LZ4 compression is not supported currently",
55+
)),
56+
CompressionCodec::Zstd => {
57+
let writer = Vec::<u8>::new();
58+
let mut encoder = zstd::stream::Encoder::new(writer, 3)?;
59+
encoder.include_checksum(true)?;
60+
encoder.set_pledged_src_size(Some(bytes.len().try_into()?))?;
61+
std::io::copy(&mut &bytes[..], &mut encoder)?;
62+
let compressed = encoder.finish()?;
63+
Ok(compressed)
64+
}
65+
}
66+
}
67+
68+
pub(crate) fn is_none(&self) -> bool {
69+
matches!(self, CompressionCodec::None)
70+
}
71+
}
72+
73+
#[cfg(test)]
74+
mod tests {
75+
use crate::compression::CompressionCodec;
76+
77+
#[tokio::test]
78+
async fn test_compression_codec_none() {
79+
let compression_codec = CompressionCodec::None;
80+
let bytes_vec = [0_u8; 100].to_vec();
81+
82+
let compressed = compression_codec.compress(bytes_vec.clone()).unwrap();
83+
assert_eq!(bytes_vec, compressed);
84+
85+
let decompressed = compression_codec.decompress(compressed.clone()).unwrap();
86+
assert_eq!(compressed, decompressed)
87+
}
88+
89+
#[tokio::test]
90+
async fn test_compression_codec_lz4() {
91+
let compression_codec = CompressionCodec::Lz4;
92+
let bytes_vec = [0_u8; 100].to_vec();
93+
94+
assert_eq!(
95+
compression_codec
96+
.compress(bytes_vec.clone())
97+
.unwrap_err()
98+
.to_string(),
99+
"FeatureUnsupported => LZ4 compression is not supported currently",
100+
);
101+
102+
assert_eq!(
103+
compression_codec
104+
.decompress(bytes_vec.clone())
105+
.unwrap_err()
106+
.to_string(),
107+
"FeatureUnsupported => LZ4 decompression is not supported currently",
108+
)
109+
}
110+
111+
#[tokio::test]
112+
async fn test_compression_codec_zstd() {
113+
let compression_codec = CompressionCodec::Zstd;
114+
let bytes_vec = [0_u8; 100].to_vec();
115+
116+
let compressed = compression_codec.compress(bytes_vec.clone()).unwrap();
117+
assert!(compressed.len() < bytes_vec.len());
118+
119+
let decompressed = compression_codec.decompress(compressed.clone()).unwrap();
120+
assert_eq!(decompressed, bytes_vec)
121+
}
122+
}

0 commit comments

Comments
 (0)