Commit 116aae8
Add new task writer and unpartitioned writer
Parent: d3d3127

File tree: 4 files changed, +733 −0 lines

crates/iceberg/src/writer/partitioning/mod.rs

Lines changed: 1 addition & 0 deletions
@@ -23,6 +23,7 @@
 
 pub mod clustered_writer;
 pub mod fanout_writer;
+pub mod unpartitioned_writer;
 
 use crate::Result;
 use crate::spec::PartitionKey;
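
For orientation: the writers in this partitioning module share the `IcebergWriterBuilder` contract and differ only in how they drive it. Below is a minimal sketch of that dispatch, inferred from this commit rather than quoted from it — the `build(Option<PartitionKey>)` shape is deduced from the `build(None)` call in the new file, and `build_for_partition` is a hypothetical helper:

use iceberg::Result;
use iceberg::spec::PartitionKey;
use iceberg::writer::IcebergWriterBuilder;

// Hypothetical illustration: partition-aware writers (clustered, fanout)
// build one inner writer per partition key; the new unpartitioned writer
// builds a single inner writer with no key.
async fn build_for_partition<B: IcebergWriterBuilder>(
    builder: B,
    key: Option<PartitionKey>,
) -> Result<B::R> {
    // `None` is the unpartitioned path, exactly what
    // `UnpartitionedWriter::write` does on its first call (see below).
    builder.build(key).await
}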
crates/iceberg/src/writer/partitioning/unpartitioned_writer.rs

Lines changed: 198 additions & 0 deletions

@@ -0,0 +1,198 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! This module provides the `UnpartitionedWriter` implementation.

use std::marker::PhantomData;

use crate::Result;
use crate::writer::{DefaultInput, DefaultOutput, IcebergWriter, IcebergWriterBuilder};

/// A simple wrapper around `IcebergWriterBuilder` for unpartitioned tables.
///
/// This writer lazily creates the underlying writer on the first write operation
/// and writes all data to a single file (or set of files if rolling).
///
/// # Type Parameters
///
/// * `B` - The inner writer builder type
/// * `I` - Input type (defaults to `RecordBatch`)
/// * `O` - Output collection type (defaults to `Vec<DataFile>`)
pub struct UnpartitionedWriter<B, I = DefaultInput, O = DefaultOutput>
where
    B: IcebergWriterBuilder<I, O>,
    O: IntoIterator + FromIterator<<O as IntoIterator>::Item>,
    <O as IntoIterator>::Item: Clone,
{
    inner_builder: B,
    writer: Option<B::R>,
    output: Vec<<O as IntoIterator>::Item>,
    _phantom: PhantomData<I>,
}

impl<B, I, O> UnpartitionedWriter<B, I, O>
where
    B: IcebergWriterBuilder<I, O>,
    I: Send + 'static,
    O: IntoIterator + FromIterator<<O as IntoIterator>::Item>,
    <O as IntoIterator>::Item: Send + Clone,
{
    /// Create a new `UnpartitionedWriter`.
    pub fn new(inner_builder: B) -> Self {
        Self {
            inner_builder,
            writer: None,
            output: Vec::new(),
            _phantom: PhantomData,
        }
    }

    /// Write data to the writer.
    ///
    /// The underlying writer is lazily created on the first write operation.
    ///
    /// # Parameters
    ///
    /// * `input` - The input data to write
    ///
    /// # Returns
    ///
    /// `Ok(())` on success, or an error if the write operation fails.
    pub async fn write(&mut self, input: I) -> Result<()> {
        // Lazily create writer on first write
        if self.writer.is_none() {
            self.writer = Some(self.inner_builder.clone().build(None).await?);
        }

        // Write directly to inner writer
        self.writer
            .as_mut()
            .expect("Writer should be initialized")
            .write(input)
            .await
    }

    /// Close the writer and return all written data files.
    ///
    /// This method consumes the writer to prevent further use.
    ///
    /// # Returns
    ///
    /// The accumulated output from all write operations, or an empty collection
    /// if no data was written.
    pub async fn close(mut self) -> Result<O> {
        if let Some(mut writer) = self.writer.take() {
            self.output.extend(writer.close().await?);
        }
        Ok(O::from_iter(self.output))
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use arrow_array::{Int32Array, RecordBatch, StringArray};
    use arrow_schema::{DataType, Field, Schema};
    use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
    use parquet::file::properties::WriterProperties;
    use tempfile::TempDir;

    use super::*;
    use crate::Result;
    use crate::io::FileIOBuilder;
    use crate::spec::{DataFileFormat, NestedField, PrimitiveType, Struct, Type};
    use crate::writer::base_writer::data_file_writer::DataFileWriterBuilder;
    use crate::writer::file_writer::ParquetWriterBuilder;
    use crate::writer::file_writer::location_generator::{
        DefaultFileNameGenerator, DefaultLocationGenerator,
    };
    use crate::writer::file_writer::rolling_writer::RollingFileWriterBuilder;

    #[tokio::test]
    async fn test_unpartitioned_writer() -> Result<()> {
        let temp_dir = TempDir::new()?;

        // Build Iceberg schema
        let schema = Arc::new(
            crate::spec::Schema::builder()
                .with_schema_id(1)
                .with_fields(vec![
                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
                    NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
                ])
                .build()?,
        );

        // Build Arrow schema
        let arrow_schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                "1".to_string(),
            )])),
            Field::new("name", DataType::Utf8, false).with_metadata(HashMap::from([(
                PARQUET_FIELD_ID_META_KEY.to_string(),
                "2".to_string(),
            )])),
        ]));

        // Build writer
        let file_io = FileIOBuilder::new_fs_io().build()?;
        let location_gen = DefaultLocationGenerator::with_data_location(
            temp_dir.path().to_str().unwrap().to_string(),
        );
        let file_name_gen =
            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
        let parquet_writer_builder =
            ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone());
        let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size(
            parquet_writer_builder,
            file_io,
            location_gen,
            file_name_gen,
        );
        let writer_builder = DataFileWriterBuilder::new(rolling_writer_builder);

        let mut writer = UnpartitionedWriter::new(writer_builder);

        // Write two batches
        let batch1 = RecordBatch::try_new(arrow_schema.clone(), vec![
            Arc::new(Int32Array::from(vec![1, 2])),
            Arc::new(StringArray::from(vec!["Alice", "Bob"])),
        ])?;
        let batch2 = RecordBatch::try_new(arrow_schema, vec![
            Arc::new(Int32Array::from(vec![3, 4])),
            Arc::new(StringArray::from(vec!["Charlie", "Dave"])),
        ])?;

        writer.write(batch1).await?;
        writer.write(batch2).await?;

        let data_files = writer.close().await?;

        // Verify files have empty partition and correct format
        assert!(!data_files.is_empty());
        for file in &data_files {
            assert_eq!(file.partition, Struct::empty());
            assert_eq!(file.file_format, DataFileFormat::Parquet);
            assert_eq!(file.record_count, 4);
        }

        Ok(())
    }
}
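
Taken together with the test above, the consumer-side call pattern is small: construct once, write any number of batches, close to collect the files. A minimal sketch under stated assumptions — the helper name `write_all` is illustrative, not from the commit, and the bounds lean on the trait's default type parameters (`I = RecordBatch`, `O = Vec<DataFile>`):

use arrow_array::RecordBatch;
use iceberg::Result;
use iceberg::writer::{DefaultOutput, IcebergWriterBuilder};
use iceberg::writer::partitioning::unpartitioned_writer::UnpartitionedWriter;

// Hypothetical helper: push a sequence of batches through an
// UnpartitionedWriter and collect the resulting data files.
async fn write_all<B>(builder: B, batches: Vec<RecordBatch>) -> Result<DefaultOutput>
where
    B: IcebergWriterBuilder,
{
    let mut writer = UnpartitionedWriter::new(builder);
    for batch in batches {
        // The inner writer is created lazily on the first call.
        writer.write(batch).await?;
    }
    // Consumes the writer; yields an empty Vec if nothing was written.
    writer.close().await
}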

crates/integrations/datafusion/src/lib.rs

Lines changed: 3 additions & 0 deletions
@@ -26,3 +26,6 @@ mod schema;
 pub mod table;
 pub use table::table_provider_factory::IcebergTableProviderFactory;
 pub use table::*;
+
+pub mod task_writer;
+pub use task_writer::*;
