
Commit 4118757

feat!: write collections to stac-geoparquet metadata (#927)
Closes #428, though we'll need to propagate this change up to **rustac-py**. This is a breaking change, because we remove the version constants from the **geoarrow** module and move them into **geoparquet**.
1 parent 683734a commit 4118757
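
For orientation, here's a minimal end-to-end sketch of the new writer API, assembled from the doc example added in this diff (the item path and collection id are the ones used there):

```rust
use stac::{Collection, Item, geoparquet::WriterBuilder};
use std::io::Cursor;

// Read a single item, then write it out as stac-geoparquet with a
// collection embedded in the file's key-value metadata (new in this commit).
let item: Item = stac::read("examples/simple-item.json").unwrap();
let cursor = Cursor::new(Vec::new());
WriterBuilder::new(cursor)
    .build(vec![item])
    .unwrap()
    .add_collection(Collection::new("an-id", "a description"))
    .unwrap()
    .finish()
    .unwrap();
```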

File tree

2 files changed (+106 −23 lines changed)


crates/core/src/geoarrow/mod.rs

Lines changed: 0 additions & 16 deletions
```diff
@@ -16,12 +16,6 @@ use geoarrow_schema::{GeoArrowType, GeometryType, Metadata};
 use serde_json::{Value, json};
 use std::{io::Cursor, sync::Arc};
 
-/// The stac-geoparquet version metadata key.
-pub const VERSION_KEY: &str = "stac:geoparquet_version";
-
-/// The stac-geoparquet version.
-pub const VERSION: &str = "1.0.0";
-
 /// Datetime columns.
 pub const DATETIME_COLUMNS: [&str; 8] = [
     "datetime",
@@ -215,9 +209,6 @@ impl Writer {
             columns.push(Arc::new(proj_geometry_array));
             schema_builder.push(Field::new("proj:geometry", data_type, true));
         }
-        let _ = schema_builder
-            .metadata_mut()
-            .insert(VERSION_KEY.to_string(), VERSION.into());
         let schema = Arc::new(schema_builder.finish());
         let record_batch = RecordBatch::try_new(schema, columns)?;
         Ok(record_batch)
@@ -400,13 +391,6 @@ mod tests {
     use crate::{Item, ItemCollection};
     use arrow_array::RecordBatchIterator;
 
-    #[test]
-    fn encode() {
-        let item: Item = crate::read("examples/simple-item.json").unwrap();
-        let (_, schema) = super::encode(vec![item]).unwrap();
-        assert_eq!(schema.metadata["stac:geoparquet_version"], "1.0.0");
-    }
-
     #[test]
     fn has_type() {
         let item: Item = crate::read("examples/simple-item.json").unwrap();
```

crates/core/src/geoparquet.rs

Lines changed: 106 additions & 7 deletions
```diff
@@ -2,22 +2,22 @@
 
 use crate::{
     Catalog, Collection, Error, Item, ItemCollection, Result, Value,
-    geoarrow::{Encoder, Options, VERSION, VERSION_KEY},
+    geoarrow::{Encoder, Options},
 };
 use arrow_array::RecordBatch;
 use bytes::Bytes;
 use geoparquet::{
     reader::{GeoParquetReaderBuilder, GeoParquetRecordBatchReader},
     writer::{GeoParquetRecordBatchEncoder, GeoParquetWriterOptionsBuilder},
 };
+pub use parquet::basic::{BrotliLevel, Compression, GzipLevel, ZstdLevel};
 use parquet::{
     arrow::{ArrowWriter, arrow_reader::ParquetRecordBatchReaderBuilder},
     file::{properties::WriterProperties, reader::ChunkReader},
     format::KeyValue,
 };
-use std::io::Write;
-
-pub use parquet::basic::{BrotliLevel, Compression, GzipLevel, ZstdLevel};
+use serde::{Deserialize, Serialize};
+use std::{collections::HashMap, io::Write};
 
 /// Default stac-geoparquet compression
 pub fn default_compression() -> Compression {
```
```diff
@@ -27,11 +27,18 @@ pub fn default_compression() -> Compression {
 /// Default stac-geoparquet max row group size
 pub const DEFAULT_STAC_MAX_ROW_GROUP_SIZE: usize = 150_000;
 
+/// The stac-geoparquet metadata key.
+pub const METADATA_KEY: &str = "stac-geoparquet";
+
+/// The stac-geoparquet version.
+pub const VERSION: &str = "1.0.0";
+
 /// Options for writing stac-geoparquet files.
 #[derive(Debug, Copy, Clone, PartialEq)]
 pub struct WriterOptions {
     /// Parquet compression codec
     pub compression: Option<Compression>,
+
     /// Maximum number of rows in a row group
     pub max_row_group_size: usize,
 }
```
```diff
@@ -188,6 +195,18 @@ pub struct WriterBuilder<W: Write + Send> {
 pub struct Writer<W: Write + Send> {
     encoder: WriterEncoder,
     arrow_writer: ArrowWriter<W>,
+    metadata: Metadata,
+}
+
+/// stac-geoparquet metadata
+#[derive(Debug, Serialize, Deserialize)]
+pub struct Metadata {
+    /// The stac-geoparquet version.
+    pub version: String,
+
+    /// Any STAC collections stored alongside the items.
+    #[serde(skip_serializing_if = "HashMap::is_empty")]
+    pub collections: HashMap<String, Collection>,
 }
 
 impl<W: Write + Send> WriterBuilder<W> {
```
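
Given the serde attributes above, an empty collections map is dropped from the serialized output entirely. A small sketch of the expected round-trip; the exact JSON string is inferred from the derive attributes rather than taken from the diff:

```rust
use stac::geoparquet::Metadata;

// `Metadata::default()` (implemented later in this diff) carries the
// current version and an empty collections map.
let metadata = Metadata::default();

// `skip_serializing_if = "HashMap::is_empty"` means the empty `collections`
// map is omitted, so only the version survives serialization.
let json = serde_json::to_string(&metadata).unwrap();
assert_eq!(json, r#"{"version":"1.0.0"}"#);
```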
```diff
@@ -355,6 +374,7 @@ impl<W: Write + Send> Writer<W> {
         Ok(Writer {
             encoder,
             arrow_writer,
+            metadata: Metadata::default(),
         })
     }
 
```
```diff
@@ -381,6 +401,39 @@ impl<W: Write + Send> Writer<W> {
         Ok(())
     }
 
+    /// Adds a collection to this writer's metadata.
+    ///
+    /// Warns and overwrites if there's already a collection with the same id.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use stac::{Item, Collection, geoparquet::WriterBuilder};
+    /// use std::io::Cursor;
+    ///
+    /// let item: Item = stac::read("examples/simple-item.json").unwrap();
+    /// let cursor = Cursor::new(Vec::new());
+    /// let writer = WriterBuilder::new(cursor)
+    ///     .build(vec![item])
+    ///     .unwrap()
+    ///     .add_collection(Collection::new("an-id", "a description"))
+    ///     .unwrap();
+    /// writer.finish().unwrap();
+    /// ```
+    pub fn add_collection(mut self, collection: Collection) -> Result<Writer<W>> {
+        if let Some(previous_collection) = self
+            .metadata
+            .collections
+            .insert(collection.id.clone(), collection)
+        {
+            log::warn!(
+                "Collection with id={} already existed in writer, overwriting",
+                previous_collection.id
+            )
+        }
+        Ok(self)
+    }
+
     /// Finishes writing.
     ///
     /// It's an error to call finish twice.
```
```diff
@@ -400,8 +453,8 @@ impl<W: Write + Send> Writer<W> {
         self.arrow_writer
             .append_key_value_metadata(self.encoder.into_keyvalue()?);
         self.arrow_writer.append_key_value_metadata(KeyValue::new(
-            VERSION_KEY.to_string(),
-            Some(VERSION.to_string()),
+            METADATA_KEY.to_string(),
+            serde_json::to_string(&self.metadata)?,
         ));
         let _ = self.arrow_writer.finish()?;
         Ok(())
```
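
On the read side, the metadata comes back out of the Parquet footer's key-value pairs. A condensed sketch of the lookup, following the same pattern as the `metadata` test added below; the `bytes` variable is assumed to already hold a fully written stac-geoparquet file:

```rust
use parquet::file::reader::{FileReader, SerializedFileReader};
use stac::geoparquet::{METADATA_KEY, Metadata};

// `bytes` is assumed to be a complete stac-geoparquet file, e.g. the
// buffer produced by the writer example above.
let reader = SerializedFileReader::new(bytes).unwrap();
let metadata: Metadata = reader
    .metadata()
    .file_metadata()
    .key_value_metadata()
    .unwrap()
    .iter()
    .find(|key_value| key_value.key == METADATA_KEY)
    .and_then(|key_value| key_value.value.as_deref())
    .map(|value| serde_json::from_str(value).unwrap())
    .unwrap();
```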
```diff
@@ -554,9 +607,21 @@ impl From<WriterOptions> for WriterProperties {
     }
 }
 
+impl Default for Metadata {
+    fn default() -> Self {
+        Metadata {
+            version: VERSION.to_string(),
+            collections: HashMap::new(),
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
-    use crate::{FromGeoparquet, Item, ItemCollection, SelfHref, Value, geoparquet::WriterBuilder};
+    use crate::{
+        Collection, FromGeoparquet, Item, ItemCollection, SelfHref, Value,
+        geoparquet::{METADATA_KEY, Metadata, VERSION, WriterBuilder},
+    };
     use bytes::Bytes;
     use parquet::file::reader::{FileReader, SerializedFileReader};
     use std::{
```
```diff
@@ -725,4 +790,38 @@ mod tests {
         let item_collection = super::from_reader(Bytes::from(writer.into_inner())).unwrap();
         assert!(item_collection.items[0].assets.is_empty());
     }
+
+    #[test]
+    fn metadata() {
+        let item: Item = crate::read("examples/simple-item.json").unwrap();
+        let mut cursor = Cursor::new(Vec::new());
+        WriterBuilder::new(&mut cursor)
+            .build(vec![item])
+            .unwrap()
+            .add_collection(Collection::new("an-id", "a description"))
+            .unwrap()
+            .finish()
+            .unwrap();
+        let bytes = Bytes::from(cursor.into_inner());
+        let reader = SerializedFileReader::new(bytes).unwrap();
+        let metadata = reader
+            .metadata()
+            .file_metadata()
+            .key_value_metadata()
+            .unwrap()
+            .iter()
+            .find_map(|key_value| {
+                if key_value.key == METADATA_KEY {
+                    Some(
+                        serde_json::from_str::<Metadata>(key_value.value.as_ref().unwrap())
+                            .unwrap(),
+                    )
+                } else {
+                    None
+                }
+            })
+            .unwrap();
+        assert_eq!(metadata.version, VERSION);
+        assert_eq!(metadata.collections["an-id"].description, "a description");
+    }
 }
```
