Skip to content

Commit e26bda3

Browse files
feat: Implement load table api. (apache#89)
* Init commit * Some comments * Done * Fix format * fix clippy * Fix fmt * sort * Use error response * Fix * Fix comments * fmt * fmt * Fix * Rename
1 parent e398b5a commit e26bda3

File tree

12 files changed

+636
-138
lines changed

12 files changed

+636
-138
lines changed

crates/catalog/rest/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,15 @@ keywords = ["iceberg", "rest", "catalog"]
2828

2929
[dependencies]
3030
async-trait = "0.1"
31+
chrono = "0.4"
3132
iceberg = { path = "../../iceberg" }
3233
reqwest = { version = "^0.11", features = ["json"] }
3334
serde = { version = "^1.0", features = ["rc"] }
3435
serde_derive = "^1.0"
3536
serde_json = "^1.0"
3637
typed-builder = "^0.18"
3738
urlencoding = "2"
39+
uuid = { version = "1.5.0", features = ["v4"] }
3840

3941
[dev-dependencies]
4042
mockito = "^1"

crates/catalog/rest/src/catalog.rs

Lines changed: 222 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,17 @@ use serde::de::DeserializeOwned;
2626
use typed_builder::TypedBuilder;
2727
use urlencoding::encode;
2828

29+
use crate::catalog::_serde::LoadTableResponse;
30+
use iceberg::io::{FileIO, FileIOBuilder};
2931
use iceberg::table::Table;
3032
use iceberg::Result;
3133
use iceberg::{
3234
Catalog, Error, ErrorKind, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent,
3335
};
3436

3537
use self::_serde::{
36-
CatalogConfig, ErrorModel, ErrorResponse, ListNamespaceResponse, ListTableResponse,
37-
NamespaceSerde, RenameTableRequest, NO_CONTENT, OK,
38+
CatalogConfig, ErrorResponse, ListNamespaceResponse, ListTableResponse, NamespaceSerde,
39+
RenameTableRequest, NO_CONTENT, OK,
3840
};
3941

4042
const ICEBERG_REST_SPEC_VERSION: &str = "1.14";
@@ -195,7 +197,7 @@ impl Catalog for RestCatalog {
195197

196198
let resp = self
197199
.client
198-
.query::<ListNamespaceResponse, ErrorModel, OK>(request.build()?)
200+
.query::<ListNamespaceResponse, ErrorResponse, OK>(request.build()?)
199201
.await?;
200202

201203
resp.namespaces
@@ -222,7 +224,7 @@ impl Catalog for RestCatalog {
222224

223225
let resp = self
224226
.client
225-
.query::<NamespaceSerde, ErrorModel, OK>(request)
227+
.query::<NamespaceSerde, ErrorResponse, OK>(request)
226228
.await?;
227229

228230
Namespace::try_from(resp)
@@ -238,7 +240,7 @@ impl Catalog for RestCatalog {
238240

239241
let resp = self
240242
.client
241-
.query::<NamespaceSerde, ErrorModel, OK>(request)
243+
.query::<NamespaceSerde, ErrorResponse, OK>(request)
242244
.await?;
243245
Namespace::try_from(resp)
244246
}
@@ -267,7 +269,7 @@ impl Catalog for RestCatalog {
267269
.build()?;
268270

269271
self.client
270-
.execute::<ErrorModel, NO_CONTENT>(request)
272+
.execute::<ErrorResponse, NO_CONTENT>(request)
271273
.await
272274
.map(|_| true)
273275
}
@@ -280,7 +282,9 @@ impl Catalog for RestCatalog {
280282
.delete(self.config.namespace_endpoint(namespace))
281283
.build()?;
282284

283-
self.client.execute::<ErrorModel, NO_CONTENT>(request).await
285+
self.client
286+
.execute::<ErrorResponse, NO_CONTENT>(request)
287+
.await
284288
}
285289

286290
/// List tables from namespace.
@@ -293,7 +297,7 @@ impl Catalog for RestCatalog {
293297

294298
let resp = self
295299
.client
296-
.query::<ListTableResponse, ErrorModel, OK>(request)
300+
.query::<ListTableResponse, ErrorResponse, OK>(request)
297301
.await?;
298302

299303
Ok(resp.identifiers)
@@ -312,11 +316,43 @@ impl Catalog for RestCatalog {
312316
}
313317

314318
/// Load table from the catalog.
315-
async fn load_table(&self, _table: &TableIdent) -> Result<Table> {
316-
Err(Error::new(
317-
ErrorKind::FeatureUnsupported,
318-
"Creating table not supported yet!",
319-
))
319+
async fn load_table(&self, table: &TableIdent) -> Result<Table> {
320+
let request = self
321+
.client
322+
.0
323+
.get(self.config.table_endpoint(table))
324+
.build()?;
325+
326+
let resp = self
327+
.client
328+
.query::<LoadTableResponse, ErrorResponse, OK>(request)
329+
.await?;
330+
331+
let mut props = self.config.props.clone();
332+
if let Some(config) = resp.config {
333+
props.extend(config);
334+
}
335+
336+
let file_io = match self
337+
.config
338+
.warehouse
339+
.as_ref()
340+
.or_else(|| resp.metadata_location.as_ref())
341+
{
342+
Some(url) => FileIO::from_path(url)?.with_props(props).build()?,
343+
None => FileIOBuilder::new("s3").with_props(props).build()?,
344+
};
345+
346+
let table_builder = Table::builder()
347+
.identifier(table.clone())
348+
.file_io(file_io)
349+
.metadata(resp.metadata);
350+
351+
if let Some(metadata_location) = resp.metadata_location {
352+
Ok(table_builder.metadata_location(metadata_location).build())
353+
} else {
354+
Ok(table_builder.build())
355+
}
320356
}
321357

322358
/// Drop a table from the catalog.
@@ -327,7 +363,9 @@ impl Catalog for RestCatalog {
327363
.delete(self.config.table_endpoint(table))
328364
.build()?;
329365

330-
self.client.execute::<ErrorModel, NO_CONTENT>(request).await
366+
self.client
367+
.execute::<ErrorResponse, NO_CONTENT>(request)
368+
.await
331369
}
332370

333371
/// Check if a table exists in the catalog.
@@ -339,7 +377,7 @@ impl Catalog for RestCatalog {
339377
.build()?;
340378

341379
self.client
342-
.execute::<ErrorModel, NO_CONTENT>(request)
380+
.execute::<ErrorResponse, NO_CONTENT>(request)
343381
.await
344382
.map(|_| true)
345383
}
@@ -356,7 +394,9 @@ impl Catalog for RestCatalog {
356394
})
357395
.build()?;
358396

359-
self.client.execute::<ErrorModel, NO_CONTENT>(request).await
397+
self.client
398+
.execute::<ErrorResponse, NO_CONTENT>(request)
399+
.await
360400
}
361401

362402
/// Update a table to the catalog.
@@ -412,10 +452,12 @@ mod _serde {
412452

413453
use serde_derive::{Deserialize, Serialize};
414454

455+
use iceberg::spec::TableMetadata;
415456
use iceberg::{Error, ErrorKind, Namespace, TableIdent};
416457

417458
pub(super) const OK: u16 = 200u16;
418459
pub(super) const NO_CONTENT: u16 = 204u16;
460+
419461
#[derive(Clone, Debug, Serialize, Deserialize)]
420462
pub(super) struct CatalogConfig {
421463
pub(super) overrides: HashMap<String, String>,
@@ -534,11 +576,26 @@ mod _serde {
534576
pub(super) source: TableIdent,
535577
pub(super) destination: TableIdent,
536578
}
579+
580+
#[derive(Debug, Deserialize)]
581+
#[serde(rename_all = "kebab-case")]
582+
pub(super) struct LoadTableResponse {
583+
pub(super) metadata_location: Option<String>,
584+
pub(super) metadata: TableMetadata,
585+
pub(super) config: Option<HashMap<String, String>>,
586+
}
537587
}
538588

539589
#[cfg(test)]
540590
mod tests {
591+
use iceberg::spec::ManifestListLocation::ManifestListFile;
592+
use iceberg::spec::{
593+
FormatVersion, NestedField, Operation, PrimitiveType, Schema, Snapshot, SnapshotLog,
594+
SortOrder, Summary, Type,
595+
};
541596
use mockito::{Mock, Server, ServerGuard};
597+
use std::sync::Arc;
598+
use uuid::uuid;
542599

543600
use super::*;
544601

@@ -884,4 +941,153 @@ mod tests {
884941
config_mock.assert_async().await;
885942
rename_table_mock.assert_async().await;
886943
}
944+
945+
#[tokio::test]
946+
async fn test_load_table() {
947+
let mut server = Server::new_async().await;
948+
949+
let config_mock = create_config_mock(&mut server).await;
950+
951+
let rename_table_mock = server
952+
.mock("GET", "/v1/namespaces/ns1/tables/test1")
953+
.with_status(200)
954+
.with_body_from_file(format!(
955+
"{}/testdata/{}",
956+
env!("CARGO_MANIFEST_DIR"),
957+
"load_table_response.json"
958+
))
959+
.create_async()
960+
.await;
961+
962+
let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build())
963+
.await
964+
.unwrap();
965+
966+
let table = catalog
967+
.load_table(&TableIdent::new(
968+
NamespaceIdent::new("ns1".to_string()),
969+
"test1".to_string(),
970+
))
971+
.await
972+
.unwrap();
973+
974+
assert_eq!(
975+
&TableIdent::from_strs(vec!["ns1", "test1"]).unwrap(),
976+
table.identifier()
977+
);
978+
assert_eq!("s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json", table.metadata_location().unwrap());
979+
assert_eq!(FormatVersion::V1, table.metadata().format_version());
980+
assert_eq!("s3://warehouse/database/table", table.metadata().location());
981+
assert_eq!(
982+
uuid!("b55d9dda-6561-423a-8bfc-787980ce421f"),
983+
table.metadata().uuid()
984+
);
985+
assert_eq!(1646787054459, table.metadata().last_updated_ms());
986+
assert_eq!(
987+
vec![&Arc::new(
988+
Schema::builder()
989+
.with_fields(vec![
990+
NestedField::optional(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
991+
NestedField::optional(2, "data", Type::Primitive(PrimitiveType::String))
992+
.into(),
993+
])
994+
.build()
995+
.unwrap()
996+
)],
997+
table.metadata().schemas_iter().collect::<Vec<_>>()
998+
);
999+
assert_eq!(
1000+
&HashMap::from([
1001+
("owner".to_string(), "bryan".to_string()),
1002+
(
1003+
"write.metadata.compression-codec".to_string(),
1004+
"gzip".to_string()
1005+
)
1006+
]),
1007+
table.metadata().properties()
1008+
);
1009+
assert_eq!(vec![&Arc::new(Snapshot::builder()
1010+
.with_snapshot_id(3497810964824022504)
1011+
.with_timestamp_ms(1646787054459)
1012+
.with_manifest_list(ManifestListFile("s3://warehouse/database/table/metadata/snap-3497810964824022504-1-c4f68204-666b-4e50-a9df-b10c34bf6b82.avro".to_string()))
1013+
.with_sequence_number(0)
1014+
.with_schema_id(0)
1015+
.with_summary(Summary {
1016+
operation: Operation::Append,
1017+
other: HashMap::from_iter([
1018+
("spark.app.id", "local-1646787004168"),
1019+
("added-data-files", "1"),
1020+
("added-records", "1"),
1021+
("added-files-size", "697"),
1022+
("changed-partition-count", "1"),
1023+
("total-records", "1"),
1024+
("total-files-size", "697"),
1025+
("total-data-files", "1"),
1026+
("total-delete-files", "0"),
1027+
("total-position-deletes", "0"),
1028+
("total-equality-deletes", "0")
1029+
].iter().map(|p|(p.0.to_string(), p.1.to_string())))
1030+
}).build().unwrap()
1031+
)], table.metadata().snapshots().collect::<Vec<_>>());
1032+
assert_eq!(
1033+
&[SnapshotLog {
1034+
timestamp_ms: 1646787054459,
1035+
snapshot_id: 3497810964824022504
1036+
}],
1037+
table.metadata().history()
1038+
);
1039+
assert_eq!(
1040+
vec![&Arc::new(SortOrder {
1041+
order_id: 0,
1042+
fields: vec![]
1043+
})],
1044+
table.metadata().sort_orders_iter().collect::<Vec<_>>()
1045+
);
1046+
1047+
config_mock.assert_async().await;
1048+
rename_table_mock.assert_async().await;
1049+
}
1050+
1051+
#[tokio::test]
1052+
async fn test_load_table_404() {
1053+
let mut server = Server::new_async().await;
1054+
1055+
let config_mock = create_config_mock(&mut server).await;
1056+
1057+
let rename_table_mock = server
1058+
.mock("GET", "/v1/namespaces/ns1/tables/test1")
1059+
.with_status(404)
1060+
.with_body(r#"
1061+
{
1062+
"error": {
1063+
"message": "Table does not exist: ns1.test1 in warehouse 8bcb0838-50fc-472d-9ddb-8feb89ef5f1e",
1064+
"type": "NoSuchNamespaceErrorException",
1065+
"code": 404
1066+
}
1067+
}
1068+
"#)
1069+
.create_async()
1070+
.await;
1071+
1072+
let catalog = RestCatalog::new(RestCatalogConfig::builder().uri(server.url()).build())
1073+
.await
1074+
.unwrap();
1075+
1076+
let table = catalog
1077+
.load_table(&TableIdent::new(
1078+
NamespaceIdent::new("ns1".to_string()),
1079+
"test1".to_string(),
1080+
))
1081+
.await;
1082+
1083+
assert!(table.is_err());
1084+
assert!(table
1085+
.err()
1086+
.unwrap()
1087+
.message()
1088+
.contains("Table does not exist"));
1089+
1090+
config_mock.assert_async().await;
1091+
rename_table_mock.assert_async().await;
1092+
}
8871093
}

0 commit comments

Comments
 (0)