
Commit 7d1d1ac

ZENOTME authored and shaeqahmed committed on Dec 9, 2024

feat: Add integration test and support append DataFile (apache#349)

* support append data file and add e2e test
* fix typos
* refine append action
* fix cargo sort
* add consistent check for partition value
* generate unique snapshot id
* avoid setting snapshot id for v2
* refine test
* fix unit test
* export ports
* fix None case for parent_snapshot_id
* fix parquet schema check
* refactor append action of transaction
* refine
* refine e2e test
* refine commit uuid
* fix file format field to uppercase in manifest
* refine SnapshotProduceAction
* rename e2e_test to integration_tests
* fix
* use docker-compose.yaml from rest catalog
* fix check

Co-authored-by: ZENOTME <st810918843@gmail.com>
1 parent bf802dc commit 7d1d1ac

File tree

17 files changed: +1130 -53 lines
 

Cargo.toml

Lines changed: 6 additions & 5 deletions

@@ -18,11 +18,12 @@
 [workspace]
 resolver = "2"
 members = [
-  "crates/catalog/*",
-  "crates/examples",
-  "crates/iceberg",
-  "crates/integrations/*",
-  "crates/test_utils",
+  "crates/catalog/*",
+  "crates/examples",
+  "crates/iceberg",
+  "crates/integration_tests",
+  "crates/integrations/*",
+  "crates/test_utils",
 ]
 exclude = ["bindings/python"]

crates/catalog/rest/src/catalog.rs

Lines changed: 1 addition & 1 deletion

@@ -1376,7 +1376,7 @@ mod tests {
         .with_schema_id(0)
         .with_summary(Summary {
             operation: Operation::Append,
-            other: HashMap::from_iter([
+            additional_properties: HashMap::from_iter([
                 ("spark.app.id", "local-1646787004168"),
                 ("added-data-files", "1"),
                 ("added-records", "1"),

crates/iceberg/src/catalog/mod.rs

Lines changed: 2 additions & 2 deletions

@@ -1423,7 +1423,7 @@ mod tests {
         .with_schema_id(1)
         .with_summary(Summary {
             operation: Operation::Append,
-            other: HashMap::default(),
+            additional_properties: HashMap::default(),
         })
         .build(),
 };
@@ -1457,7 +1457,7 @@ mod tests {
         .with_manifest_list("s3://a/b/2.avro")
         .with_summary(Summary {
             operation: Operation::Append,
-            other: HashMap::default(),
+            additional_properties: HashMap::default(),
         })
         .build(),
 };

crates/iceberg/src/io/object_cache.rs

Lines changed: 2 additions & 4 deletions

@@ -185,7 +185,7 @@ mod tests {
     use crate::spec::{
         DataContentType, DataFileBuilder, DataFileFormat, FormatVersion, Literal, Manifest,
         ManifestContentType, ManifestEntry, ManifestListWriter, ManifestMetadata, ManifestStatus,
-        ManifestWriter, Struct, TableMetadata, EMPTY_SNAPSHOT_ID,
+        ManifestWriter, Struct, TableMetadata,
     };
     use crate::table::Table;
     use crate::TableIdent;
@@ -293,9 +293,7 @@ mod tests {
             .new_output(current_snapshot.manifest_list())
             .unwrap(),
         current_snapshot.snapshot_id(),
-        current_snapshot
-            .parent_snapshot_id()
-            .unwrap_or(EMPTY_SNAPSHOT_ID),
+        current_snapshot.parent_snapshot_id(),
         current_snapshot.sequence_number(),
     );
     manifest_list_write

crates/iceberg/src/scan.rs

Lines changed: 1 addition & 4 deletions

@@ -977,7 +977,6 @@ mod tests {
         DataContentType, DataFileBuilder, DataFileFormat, Datum, FormatVersion, Literal, Manifest,
         ManifestContentType, ManifestEntry, ManifestListWriter, ManifestMetadata, ManifestStatus,
         ManifestWriter, NestedField, PrimitiveType, Schema, Struct, TableMetadata, Type,
-        EMPTY_SNAPSHOT_ID,
     };
     use crate::table::Table;
     use crate::TableIdent;
@@ -1124,9 +1123,7 @@ mod tests {
            .new_output(current_snapshot.manifest_list())
            .unwrap(),
        current_snapshot.snapshot_id(),
-       current_snapshot
-           .parent_snapshot_id()
-           .unwrap_or(EMPTY_SNAPSHOT_ID),
+       current_snapshot.parent_snapshot_id(),
        current_snapshot.sequence_number(),
    );
    manifest_list_write

crates/iceberg/src/spec/manifest.rs

Lines changed: 7 additions & 1 deletion

@@ -900,6 +900,12 @@ impl ManifestEntry {
        }
    }

+    /// Snapshot id
+    #[inline]
+    pub fn snapshot_id(&self) -> Option<i64> {
+        self.snapshot_id
+    }
+
    /// Data sequence number.
    #[inline]
    pub fn sequence_number(&self) -> Option<i64> {
@@ -1328,7 +1334,7 @@ mod _serde {
    Ok(Self {
        content: value.content as i32,
        file_path: value.file_path,
-       file_format: value.file_format.to_string(),
+       file_format: value.file_format.to_string().to_ascii_uppercase(),
        partition: RawLiteral::try_from(
            Literal::Struct(value.partition),
            &Type::Struct(partition_type.clone()),
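Why the uppercase matters: the manifest's Avro file_format field is expected to hold an uppercase string such as "PARQUET", as other Iceberg implementations write it, while DataFileFormat's Display output is lowercase. A minimal standalone sketch of the conversion, not using the crate's internal types:

fn to_manifest_file_format(display_value: &str) -> String {
    // "parquet" (Display output) -> "PARQUET" (value stored in the manifest)
    display_value.to_ascii_uppercase()
}

fn main() {
    assert_eq!(to_manifest_file_format("parquet"), "PARQUET");
    assert_eq!(to_manifest_file_format("avro"), "AVRO");
}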

crates/iceberg/src/spec/manifest_list.rs

Lines changed: 33 additions & 17 deletions

@@ -106,34 +106,38 @@ impl std::fmt::Debug for ManifestListWriter {

 impl ManifestListWriter {
     /// Construct a v1 [`ManifestListWriter`] that writes to a provided [`OutputFile`].
-    pub fn v1(output_file: OutputFile, snapshot_id: i64, parent_snapshot_id: i64) -> Self {
-        let metadata = HashMap::from_iter([
+    pub fn v1(output_file: OutputFile, snapshot_id: i64, parent_snapshot_id: Option<i64>) -> Self {
+        let mut metadata = HashMap::from_iter([
             ("snapshot-id".to_string(), snapshot_id.to_string()),
-            (
-                "parent-snapshot-id".to_string(),
-                parent_snapshot_id.to_string(),
-            ),
             ("format-version".to_string(), "1".to_string()),
         ]);
+        if let Some(parent_snapshot_id) = parent_snapshot_id {
+            metadata.insert(
+                "parent-snapshot-id".to_string(),
+                parent_snapshot_id.to_string(),
+            );
+        }
         Self::new(FormatVersion::V1, output_file, metadata, 0, snapshot_id)
     }

     /// Construct a v2 [`ManifestListWriter`] that writes to a provided [`OutputFile`].
     pub fn v2(
         output_file: OutputFile,
         snapshot_id: i64,
-        parent_snapshot_id: i64,
+        parent_snapshot_id: Option<i64>,
         sequence_number: i64,
     ) -> Self {
-        let metadata = HashMap::from_iter([
+        let mut metadata = HashMap::from_iter([
             ("snapshot-id".to_string(), snapshot_id.to_string()),
-            (
-                "parent-snapshot-id".to_string(),
-                parent_snapshot_id.to_string(),
-            ),
             ("sequence-number".to_string(), sequence_number.to_string()),
             ("format-version".to_string(), "2".to_string()),
         ]);
+        metadata.insert(
+            "parent-snapshot-id".to_string(),
+            parent_snapshot_id
+                .map(|v| v.to_string())
+                .unwrap_or("null".to_string()),
+        );
         Self::new(
             FormatVersion::V2,
             output_file,
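With parent_snapshot_id now an Option, a table's very first snapshot no longer needs a sentinel id. A hedged usage sketch (file paths and snapshot ids are illustrative; FileIOBuilder and new_output are used the same way in the tests below):

use iceberg::io::FileIOBuilder;
use iceberg::spec::ManifestListWriter;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let io = FileIOBuilder::new_fs_io().build()?;

    // First snapshot of the table: there is no parent, so pass None.
    let out = io.new_output("/tmp/snap-1-manifest-list.avro")?;
    let _first = ManifestListWriter::v2(out, 3051729675574597004, None, 1);

    // A later snapshot records its parent snapshot's id.
    let out = io.new_output("/tmp/snap-2-manifest-list.avro")?;
    let _next = ManifestListWriter::v2(out, 3055729675574597004, Some(3051729675574597004), 2);
    Ok(())
}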
@@ -580,6 +584,18 @@ pub struct ManifestFile {
     pub key_metadata: Vec<u8>,
 }

+impl ManifestFile {
+    /// Checks if the manifest file has any added files.
+    pub fn has_added_files(&self) -> bool {
+        self.added_files_count.is_none() || self.added_files_count.unwrap() > 0
+    }
+
+    /// Checks if the manifest file has any existing files.
+    pub fn has_existing_files(&self) -> bool {
+        self.existing_files_count.is_none() || self.existing_files_count.unwrap() > 0
+    }
+}
+
 /// The type of files tracked by the manifest, either data or delete files; Data(0) for all v1 manifests
 #[derive(Debug, PartialEq, Clone, Eq)]
 pub enum ManifestContentType {
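Both helpers err on the side of keeping a manifest: the count fields are optional in older metadata, and an absent count is treated as "possibly non-empty". A standalone sketch of the predicate:

fn has_added_files(added_files_count: Option<u32>) -> bool {
    // None means the count is unknown, so conservatively report "has files".
    added_files_count.is_none() || added_files_count.unwrap() > 0
}

fn main() {
    assert!(has_added_files(None));     // unknown count: keep the manifest
    assert!(has_added_files(Some(3)));  // known additions: keep it
    assert!(!has_added_files(Some(0))); // provably empty: safe to skip
}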
@@ -1146,7 +1162,7 @@ mod test {
     let mut writer = ManifestListWriter::v1(
         file_io.new_output(full_path.clone()).unwrap(),
         1646658105718557341,
-        1646658105718557341,
+        Some(1646658105718557341),
     );

     writer
@@ -1213,7 +1229,7 @@ mod test {
     let mut writer = ManifestListWriter::v2(
         file_io.new_output(full_path.clone()).unwrap(),
         1646658105718557341,
-        1646658105718557341,
+        Some(1646658105718557341),
         1,
     );

@@ -1335,7 +1351,7 @@ mod test {
     let io = FileIOBuilder::new_fs_io().build().unwrap();
     let output_file = io.new_output(path.to_str().unwrap()).unwrap();

-    let mut writer = ManifestListWriter::v1(output_file, 1646658105718557341, 0);
+    let mut writer = ManifestListWriter::v1(output_file, 1646658105718557341, Some(0));
     writer
         .add_manifests(expected_manifest_list.entries.clone().into_iter())
         .unwrap();
@@ -1391,7 +1407,7 @@ mod test {
     let io = FileIOBuilder::new_fs_io().build().unwrap();
     let output_file = io.new_output(path.to_str().unwrap()).unwrap();

-    let mut writer = ManifestListWriter::v2(output_file, snapshot_id, 0, seq_num);
+    let mut writer = ManifestListWriter::v2(output_file, snapshot_id, Some(0), seq_num);
     writer
         .add_manifests(expected_manifest_list.entries.clone().into_iter())
         .unwrap();
@@ -1445,7 +1461,7 @@ mod test {
     let io = FileIOBuilder::new_fs_io().build().unwrap();
     let output_file = io.new_output(path.to_str().unwrap()).unwrap();

-    let mut writer = ManifestListWriter::v2(output_file, 1646658105718557341, 0, 1);
+    let mut writer = ManifestListWriter::v2(output_file, 1646658105718557341, Some(0), 1);
     writer
         .add_manifests(expected_manifest_list.entries.clone().into_iter())
         .unwrap();

crates/iceberg/src/spec/snapshot.rs

Lines changed: 18 additions & 3 deletions

@@ -59,7 +59,7 @@ pub struct Summary {
     pub operation: Operation,
     /// Other summary data.
     #[serde(flatten)]
-    pub other: HashMap<String, String>,
+    pub additional_properties: HashMap<String, String>,
 }

 impl Default for Operation {
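The rename only changes the Rust field name; because of #[serde(flatten)], the map's entries still serialize as top-level summary keys next to "operation". A short construction sketch (assuming Summary and Operation are re-exported from iceberg::spec):

use std::collections::HashMap;

use iceberg::spec::{Operation, Summary};

fn main() {
    let summary = Summary {
        operation: Operation::Append,
        // Flattened on serialization: {"operation": "append", "added-data-files": "1"}
        additional_properties: HashMap::from([(
            "added-data-files".to_string(),
            "1".to_string(),
        )]),
    };
    assert_eq!(summary.additional_properties["added-data-files"], "1");
}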
@@ -291,7 +291,7 @@ pub(super) mod _serde {
     },
     summary: v1.summary.unwrap_or(Summary {
         operation: Operation::default(),
-        other: HashMap::new(),
+        additional_properties: HashMap::new(),
     }),
     schema_id: v1.schema_id,
 })
@@ -372,6 +372,21 @@ pub enum SnapshotRetention {
     },
 }

+impl SnapshotRetention {
+    /// Create a new branch retention policy
+    pub fn branch(
+        min_snapshots_to_keep: Option<i32>,
+        max_snapshot_age_ms: Option<i64>,
+        max_ref_age_ms: Option<i64>,
+    ) -> Self {
+        SnapshotRetention::Branch {
+            min_snapshots_to_keep,
+            max_snapshot_age_ms,
+            max_ref_age_ms,
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::collections::HashMap;
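A usage sketch for the new constructor; per the Iceberg spec, a None here means the corresponding table default applies:

use iceberg::spec::SnapshotRetention;

fn main() {
    // Keep at least 10 snapshots, expire snapshots older than 7 days,
    // and leave the branch's max ref age at the table default.
    let _retention = SnapshotRetention::branch(
        Some(10),                      // min_snapshots_to_keep
        Some(7 * 24 * 60 * 60 * 1000), // max_snapshot_age_ms
        None,                          // max_ref_age_ms
    );
}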
@@ -408,7 +423,7 @@ mod tests {
     assert_eq!(
         Summary {
             operation: Operation::Append,
-            other: HashMap::new()
+            additional_properties: HashMap::new()
         },
         *result.summary()
     );

crates/iceberg/src/spec/table_metadata.rs

Lines changed: 15 additions & 3 deletions

@@ -201,6 +201,18 @@ impl TableMetadata {
        self.last_sequence_number
    }

+    /// Returns the next sequence number for the table.
+    ///
+    /// For format version 1, it always returns the initial sequence number.
+    /// For other versions, it returns the last sequence number incremented by 1.
+    #[inline]
+    pub fn next_sequence_number(&self) -> i64 {
+        match self.format_version {
+            FormatVersion::V1 => INITIAL_SEQUENCE_NUMBER,
+            _ => self.last_sequence_number + 1,
+        }
+    }
+
    /// Returns last updated time.
    #[inline]
    pub fn last_updated_timestamp(&self) -> Result<DateTime<Utc>> {
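This encodes the spec's rule that v1 tables do not track sequence numbers (every snapshot stays at the initial value, 0), while v2 tables advance by one per commit. A standalone sketch of the same rule:

const INITIAL_SEQUENCE_NUMBER: i64 = 0;

enum FormatVersion { V1, V2 }

fn next_sequence_number(version: &FormatVersion, last_sequence_number: i64) -> i64 {
    match version {
        FormatVersion::V1 => INITIAL_SEQUENCE_NUMBER,  // v1: not tracked
        FormatVersion::V2 => last_sequence_number + 1, // v2: one per commit
    }
}

fn main() {
    assert_eq!(next_sequence_number(&FormatVersion::V1, 41), 0);
    assert_eq!(next_sequence_number(&FormatVersion::V2, 41), 42);
}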
@@ -1476,7 +1488,7 @@ mod tests {
     .with_sequence_number(0)
     .with_schema_id(0)
     .with_manifest_list("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro")
-    .with_summary(Summary { operation: Operation::Append, other: HashMap::from_iter(vec![("spark.app.id".to_string(), "local-1662532784305".to_string()), ("added-data-files".to_string(), "4".to_string()), ("added-records".to_string(), "4".to_string()), ("added-files-size".to_string(), "6001".to_string())]) })
+    .with_summary(Summary { operation: Operation::Append, additional_properties: HashMap::from_iter(vec![("spark.app.id".to_string(), "local-1662532784305".to_string()), ("added-data-files".to_string(), "4".to_string()), ("added-records".to_string(), "4".to_string()), ("added-files-size".to_string(), "6001".to_string())]) })
     .build();

 let expected = TableMetadata {
@@ -1895,7 +1907,7 @@ mod tests {
     .with_manifest_list("s3://a/b/1.avro")
     .with_summary(Summary {
         operation: Operation::Append,
-        other: HashMap::new(),
+        additional_properties: HashMap::new(),
     })
     .build();

@@ -1908,7 +1920,7 @@ mod tests {
     .with_manifest_list("s3://a/b/2.avro")
     .with_summary(Summary {
         operation: Operation::Append,
-        other: HashMap::new(),
+        additional_properties: HashMap::new(),
     })
     .build();

crates/iceberg/src/spec/table_metadata_builder.rs

Lines changed: 7 additions & 7 deletions

@@ -1818,7 +1818,7 @@ mod tests {
     .with_manifest_list("/snap-1.avro")
     .with_summary(Summary {
         operation: Operation::Append,
-        other: HashMap::from_iter(vec![
+        additional_properties: HashMap::from_iter(vec![
             (
                 "spark.app.id".to_string(),
                 "local-1662532784305".to_string(),
@@ -1881,7 +1881,7 @@ mod tests {
     .with_manifest_list("/snap-1.avro")
     .with_summary(Summary {
         operation: Operation::Append,
-        other: HashMap::from_iter(vec![
+        additional_properties: HashMap::from_iter(vec![
             (
                 "spark.app.id".to_string(),
                 "local-1662532784305".to_string(),
@@ -1901,7 +1901,7 @@ mod tests {
     .with_manifest_list("/snap-1.avro")
     .with_summary(Summary {
         operation: Operation::Append,
-        other: HashMap::from_iter(vec![
+        additional_properties: HashMap::from_iter(vec![
             (
                 "spark.app.id".to_string(),
                 "local-1662532784305".to_string(),
@@ -1949,7 +1949,7 @@ mod tests {
     .with_manifest_list("/snap-1.avro")
     .with_summary(Summary {
         operation: Operation::Append,
-        other: HashMap::new(),
+        additional_properties: HashMap::new(),
     })
     .build();
@@ -1994,7 +1994,7 @@ mod tests {
     .with_manifest_list("/snap-1.avro")
     .with_summary(Summary {
         operation: Operation::Append,
-        other: HashMap::from_iter(vec![
+        additional_properties: HashMap::from_iter(vec![
             (
                 "spark.app.id".to_string(),
                 "local-1662532784305".to_string(),
@@ -2114,7 +2114,7 @@ mod tests {
     .with_manifest_list("/snap-1")
     .with_summary(Summary {
         operation: Operation::Append,
-        other: HashMap::new(),
+        additional_properties: HashMap::new(),
     })
     .build();
@@ -2140,7 +2140,7 @@ mod tests {
     .with_parent_snapshot_id(Some(1))
     .with_summary(Summary {
         operation: Operation::Append,
-        other: HashMap::new(),
+        additional_properties: HashMap::new(),
     })
     .build();

crates/iceberg/src/spec/values.rs

Lines changed: 13 additions & 0 deletions

@@ -1537,6 +1537,14 @@ impl Literal {
        })?;
        Ok(Self::decimal(decimal.mantissa()))
    }
+
+    /// Attempts to convert the Literal to a PrimitiveLiteral
+    pub fn as_primitive_literal(&self) -> Option<PrimitiveLiteral> {
+        match self {
+            Literal::Primitive(primitive) => Some(primitive.clone()),
+            _ => None,
+        }
+    }
 }

 /// The partition struct stores the tuple of partition values for each file.
@@ -1576,6 +1584,11 @@ impl Struct {
    pub fn is_null_at_index(&self, index: usize) -> bool {
        self.null_bitmap[index]
    }
+
+    /// Return fields in the struct.
+    pub fn fields(&self) -> &[Literal] {
+        &self.fields
+    }
 }

 impl Index<usize> for Struct {
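These accessors let the append path validate partition values without matching on Literal internals, e.g. checking that each partition field is a primitive of the type the partition spec expects. A hedged sketch (assuming Literal, PrimitiveLiteral, and Struct are re-exported from iceberg::spec and Struct implements FromIterator<Option<Literal>>, as the crate's tests use it):

use iceberg::spec::{Literal, PrimitiveLiteral, Struct};

fn main() {
    // A partition tuple with one int field, e.g. "bucket=7".
    let partition = Struct::from_iter([Some(Literal::int(7))]);

    // fields() exposes the tuple; as_primitive_literal() unwraps each value.
    for literal in partition.fields() {
        match literal.as_primitive_literal() {
            Some(PrimitiveLiteral::Int(v)) => println!("int partition value: {v}"),
            Some(_) => println!("primitive of another type"),
            None => println!("nested (struct/list/map) value"),
        }
    }
}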

crates/iceberg/src/transaction.rs

Lines changed: 519 additions & 6 deletions
(Large diffs are not rendered by default.)
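The transaction diff is collapsed, but the new append surface is visible in the integration tests later in this commit: fast_append creates the action, add_data_files registers the files, and apply/commit write the snapshot. A condensed sketch of that flow, with table, catalog, and data_files assumed to come from surrounding code as in those tests:

use iceberg::spec::DataFile;
use iceberg::table::Table;
use iceberg::transaction::Transaction;
use iceberg::{Catalog, Result};

async fn append_files(
    table: &Table,
    catalog: &impl Catalog,
    data_files: Vec<DataFile>,
) -> Result<Table> {
    let tx = Transaction::new(table);
    // None lets the action pick a fresh commit UUID; vec![] is empty key metadata.
    let mut append_action = tx.fast_append(None, vec![])?;
    append_action.add_data_files(data_files)?;
    let tx = append_action.apply().await?;
    tx.commit(catalog).await // writes the manifest list and the new snapshot
}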

crates/integration_tests/Cargo.toml (new file)

Lines changed: 37 additions & 0 deletions

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements; see the NOTICE file. Licensed under the
# Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0

[package]
name = "iceberg-integration-tests"
version = { workspace = true }
edition = { workspace = true }
homepage = { workspace = true }
repository = { workspace = true }
license = { workspace = true }
rust-version = { workspace = true }

[dependencies]
arrow-array = { workspace = true }
arrow-schema = { workspace = true }
futures = { workspace = true }
iceberg = { workspace = true }
iceberg-catalog-rest = { workspace = true }
iceberg_test_utils = { path = "../test_utils", features = ["tests"] }
log = { workspace = true }
parquet = { workspace = true }
port_scanner = { workspace = true }
tokio = { workspace = true }

crates/integration_tests/src/lib.rs (new file)

Lines changed: 74 additions & 0 deletions

// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements; see the NOTICE file. Licensed under the
// Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0

use std::collections::HashMap;

use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};
use iceberg_test_utils::docker::DockerCompose;
use iceberg_test_utils::{normalize_test_name, set_up};
use port_scanner::scan_port_addr;
use tokio::time::sleep;

const REST_CATALOG_PORT: u16 = 8181;

pub struct TestFixture {
    pub _docker_compose: DockerCompose,
    pub rest_catalog: RestCatalog,
}

pub async fn set_test_fixture(func: &str) -> TestFixture {
    set_up();
    let docker_compose = DockerCompose::new(
        normalize_test_name(format!("{}_{func}", module_path!())),
        format!("{}/testdata", env!("CARGO_MANIFEST_DIR")),
    );

    // Start docker compose
    docker_compose.run();

    let rest_catalog_ip = docker_compose.get_container_ip("rest");

    // Poll until the REST catalog port is reachable
    let read_port = format!("{}:{}", rest_catalog_ip, REST_CATALOG_PORT);
    loop {
        if !scan_port_addr(&read_port) {
            log::info!("Waiting 1s for the REST catalog to be ready...");
            sleep(std::time::Duration::from_millis(1000)).await;
        } else {
            break;
        }
    }

    let container_ip = docker_compose.get_container_ip("minio");
    let read_port = format!("{}:{}", container_ip, 9000);

    let config = RestCatalogConfig::builder()
        .uri(format!("http://{}:{}", rest_catalog_ip, REST_CATALOG_PORT))
        .props(HashMap::from([
            (S3_ENDPOINT.to_string(), format!("http://{}", read_port)),
            (S3_ACCESS_KEY_ID.to_string(), "admin".to_string()),
            (S3_SECRET_ACCESS_KEY.to_string(), "password".to_string()),
            (S3_REGION.to_string(), "us-east-1".to_string()),
        ]))
        .build();
    let rest_catalog = RestCatalog::new(config);

    TestFixture {
        _docker_compose: docker_compose,
        rest_catalog,
    }
}
crates/integration_tests/testdata/docker-compose.yaml (new file)

Lines changed: 72 additions & 0 deletions

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements; see the NOTICE file. Licensed under the
# Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0

networks:
  rest_bridge:

services:
  rest:
    image: tabulario/iceberg-rest:0.10.0
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
      - CATALOG_CATOLOG__IMPL=org.apache.iceberg.jdbc.JdbcCatalog
      - CATALOG_URI=jdbc:sqlite:file:/tmp/iceberg_rest_mode=memory
      - CATALOG_WAREHOUSE=s3://icebergdata/demo
      - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
      - CATALOG_S3_ENDPOINT=http://minio:9000
    depends_on:
      - minio
    networks:
      rest_bridge:
        aliases:
          - icebergdata.minio
    ports:
      - 8181:8181
    expose:
      - 8181

  minio:
    image: minio/minio:RELEASE.2024-03-07T00-43-48Z
    environment:
      - MINIO_ROOT_USER=admin
      - MINIO_ROOT_PASSWORD=password
      - MINIO_DOMAIN=minio
      - MINIO_DEFAULT_BUCKETS=icebergdata
    hostname: icebergdata.minio
    networks:
      rest_bridge:
    ports:
      - 9001:9001
    expose:
      - 9001
      - 9000
    command: ["server", "/data", "--console-address", ":9001"]

  mc:
    depends_on:
      - minio
    image: minio/mc:RELEASE.2024-03-07T00-31-49Z
    environment:
      - AWS_ACCESS_KEY_ID=admin
      - AWS_SECRET_ACCESS_KEY=password
      - AWS_REGION=us-east-1
    entrypoint: >
      /bin/sh -c " until (/usr/bin/mc config host add minio http://minio:9000 admin password) do echo '...waiting...' && sleep 1; done; /usr/bin/mc rm -r --force minio/icebergdata; /usr/bin/mc mb minio/icebergdata; /usr/bin/mc policy set public minio/icebergdata; tail -f /dev/null "
    networks:
      rest_bridge:
crates/integration_tests/tests/append_data_file_test.rs (new file)

Lines changed: 178 additions & 0 deletions

// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements; see the NOTICE file. Licensed under the
// Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0

//! Integration tests for rest catalog.

use std::collections::HashMap;
use std::sync::Arc;

use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray};
use futures::TryStreamExt;
use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
use iceberg::transaction::Transaction;
use iceberg::writer::base_writer::data_file_writer::{DataFileWriterBuilder, DataFileWriterConfig};
use iceberg::writer::file_writer::location_generator::{
    DefaultFileNameGenerator, DefaultLocationGenerator,
};
use iceberg::writer::file_writer::ParquetWriterBuilder;
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation};
use iceberg_integration_tests::set_test_fixture;
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::file::properties::WriterProperties;

#[tokio::test]
async fn test_append_data_file() {
    let fixture = set_test_fixture("test_create_table").await;

    let ns = Namespace::with_properties(
        NamespaceIdent::from_strs(["apple", "ios"]).unwrap(),
        HashMap::from([
            ("owner".to_string(), "ray".to_string()),
            ("community".to_string(), "apache".to_string()),
        ]),
    );

    fixture
        .rest_catalog
        .create_namespace(ns.name(), ns.properties().clone())
        .await
        .unwrap();

    let schema = Schema::builder()
        .with_schema_id(1)
        .with_identifier_field_ids(vec![2])
        .with_fields(vec![
            NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
            NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
            NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
        ])
        .build()
        .unwrap();

    let table_creation = TableCreation::builder()
        .name("t1".to_string())
        .schema(schema.clone())
        .build();

    let table = fixture
        .rest_catalog
        .create_table(ns.name(), table_creation)
        .await
        .unwrap();

    // Create the writer and write the data
    let schema: Arc<arrow_schema::Schema> = Arc::new(
        table
            .metadata()
            .current_schema()
            .as_ref()
            .try_into()
            .unwrap(),
    );
    let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
    let file_name_generator = DefaultFileNameGenerator::new(
        "test".to_string(),
        None,
        iceberg::spec::DataFileFormat::Parquet,
    );
    let parquet_writer_builder = ParquetWriterBuilder::new(
        WriterProperties::default(),
        table.metadata().current_schema().clone(),
        table.file_io().clone(),
        location_generator.clone(),
        file_name_generator.clone(),
    );
    let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder);
    let mut data_file_writer = data_file_writer_builder
        .build(DataFileWriterConfig::new(None))
        .await
        .unwrap();
    let col1 = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]);
    let col2 = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]);
    let col3 = BooleanArray::from(vec![Some(true), Some(false), None, Some(false)]);
    let batch = RecordBatch::try_new(schema.clone(), vec![
        Arc::new(col1) as ArrayRef,
        Arc::new(col2) as ArrayRef,
        Arc::new(col3) as ArrayRef,
    ])
    .unwrap();
    data_file_writer.write(batch.clone()).await.unwrap();
    let data_file = data_file_writer.close().await.unwrap();

    // Check the parquet file schema
    let content = table
        .file_io()
        .new_input(data_file[0].file_path())
        .unwrap()
        .read()
        .await
        .unwrap();
    let parquet_reader = parquet::arrow::arrow_reader::ArrowReaderMetadata::load(
        &content,
        ArrowReaderOptions::default(),
    )
    .unwrap();
    let field_ids: Vec<i32> = parquet_reader
        .parquet_schema()
        .columns()
        .iter()
        .map(|col| col.self_type().get_basic_info().id())
        .collect();
    assert_eq!(field_ids, vec![1, 2, 3]);

    // Commit the result
    let tx = Transaction::new(&table);
    let mut append_action = tx.fast_append(None, vec![]).unwrap();
    append_action.add_data_files(data_file.clone()).unwrap();
    let tx = append_action.apply().await.unwrap();
    let table = tx.commit(&fixture.rest_catalog).await.unwrap();

    // Check the result
    let batch_stream = table
        .scan()
        .select_all()
        .build()
        .unwrap()
        .to_arrow()
        .await
        .unwrap();
    let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
    assert_eq!(batches.len(), 1);
    assert_eq!(batches[0], batch);

    // Commit the same data file again
    let tx = Transaction::new(&table);
    let mut append_action = tx.fast_append(None, vec![]).unwrap();
    append_action.add_data_files(data_file.clone()).unwrap();
    let tx = append_action.apply().await.unwrap();
    let table = tx.commit(&fixture.rest_catalog).await.unwrap();

    // Check the result again
    let batch_stream = table
        .scan()
        .select_all()
        .build()
        .unwrap()
        .to_arrow()
        .await
        .unwrap();
    let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
    assert_eq!(batches.len(), 2);
    assert_eq!(batches[0], batch);
    assert_eq!(batches[1], batch);
}
crates/integration_tests/tests/conflict_commit_test.rs (new file)

Lines changed: 145 additions & 0 deletions

// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements; see the NOTICE file. Licensed under the
// Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0

//! Integration tests for rest catalog.

use std::collections::HashMap;
use std::sync::Arc;

use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray};
use futures::TryStreamExt;
use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
use iceberg::transaction::Transaction;
use iceberg::writer::base_writer::data_file_writer::{DataFileWriterBuilder, DataFileWriterConfig};
use iceberg::writer::file_writer::location_generator::{
    DefaultFileNameGenerator, DefaultLocationGenerator,
};
use iceberg::writer::file_writer::ParquetWriterBuilder;
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation};
use iceberg_integration_tests::set_test_fixture;
use parquet::file::properties::WriterProperties;

#[tokio::test]
async fn test_append_data_file_conflict() {
    let fixture = set_test_fixture("test_create_table").await;

    let ns = Namespace::with_properties(
        NamespaceIdent::from_strs(["apple", "ios"]).unwrap(),
        HashMap::from([
            ("owner".to_string(), "ray".to_string()),
            ("community".to_string(), "apache".to_string()),
        ]),
    );

    fixture
        .rest_catalog
        .create_namespace(ns.name(), ns.properties().clone())
        .await
        .unwrap();

    let schema = Schema::builder()
        .with_schema_id(1)
        .with_identifier_field_ids(vec![2])
        .with_fields(vec![
            NestedField::optional(1, "foo", Type::Primitive(PrimitiveType::String)).into(),
            NestedField::required(2, "bar", Type::Primitive(PrimitiveType::Int)).into(),
            NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
        ])
        .build()
        .unwrap();

    let table_creation = TableCreation::builder()
        .name("t1".to_string())
        .schema(schema.clone())
        .build();

    let table = fixture
        .rest_catalog
        .create_table(ns.name(), table_creation)
        .await
        .unwrap();

    // Create the writer and write the data
    let schema: Arc<arrow_schema::Schema> = Arc::new(
        table
            .metadata()
            .current_schema()
            .as_ref()
            .try_into()
            .unwrap(),
    );
    let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
    let file_name_generator = DefaultFileNameGenerator::new(
        "test".to_string(),
        None,
        iceberg::spec::DataFileFormat::Parquet,
    );
    let parquet_writer_builder = ParquetWriterBuilder::new(
        WriterProperties::default(),
        table.metadata().current_schema().clone(),
        table.file_io().clone(),
        location_generator.clone(),
        file_name_generator.clone(),
    );
    let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder);
    let mut data_file_writer = data_file_writer_builder
        .build(DataFileWriterConfig::new(None))
        .await
        .unwrap();
    let col1 = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]);
    let col2 = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]);
    let col3 = BooleanArray::from(vec![Some(true), Some(false), None, Some(false)]);
    let batch = RecordBatch::try_new(schema.clone(), vec![
        Arc::new(col1) as ArrayRef,
        Arc::new(col2) as ArrayRef,
        Arc::new(col3) as ArrayRef,
    ])
    .unwrap();
    data_file_writer.write(batch.clone()).await.unwrap();
    let data_file = data_file_writer.close().await.unwrap();

    // Start two transactions and commit one of them
    let tx1 = Transaction::new(&table);
    let mut append_action = tx1.fast_append(None, vec![]).unwrap();
    append_action.add_data_files(data_file.clone()).unwrap();
    let tx1 = append_action.apply().await.unwrap();
    let tx2 = Transaction::new(&table);
    let mut append_action = tx2.fast_append(None, vec![]).unwrap();
    append_action.add_data_files(data_file.clone()).unwrap();
    let tx2 = append_action.apply().await.unwrap();
    let table = tx2
        .commit(&fixture.rest_catalog)
        .await
        .expect("The first commit should not fail.");

    // Check the result
    let batch_stream = table
        .scan()
        .select_all()
        .build()
        .unwrap()
        .to_arrow()
        .await
        .unwrap();
    let batches: Vec<_> = batch_stream.try_collect().await.unwrap();
    assert_eq!(batches.len(), 1);
    assert_eq!(batches[0], batch);

    // The other transaction's commit should now fail
    assert!(tx1.commit(&fixture.rest_catalog).await.is_err());
}
