16 changes: 0 additions & 16 deletions crates/core/src/geoarrow/mod.rs
@@ -16,12 +16,6 @@ use geoarrow_schema::{GeoArrowType, GeometryType, Metadata};
use serde_json::{Value, json};
use std::{io::Cursor, sync::Arc};

/// The stac-geoparquet version metadata key.
pub const VERSION_KEY: &str = "stac:geoparquet_version";

/// The stac-geoparquet version.
pub const VERSION: &str = "1.0.0";

/// Datetime columns.
pub const DATETIME_COLUMNS: [&str; 8] = [
"datetime",
@@ -215,9 +209,6 @@ impl Writer {
columns.push(Arc::new(proj_geometry_array));
schema_builder.push(Field::new("proj:geometry", data_type, true));
}
let _ = schema_builder
.metadata_mut()
.insert(VERSION_KEY.to_string(), VERSION.into());
let schema = Arc::new(schema_builder.finish());
let record_batch = RecordBatch::try_new(schema, columns)?;
Ok(record_batch)
@@ -400,13 +391,6 @@ mod tests {
use crate::{Item, ItemCollection};
use arrow_array::RecordBatchIterator;

#[test]
fn encode() {
let item: Item = crate::read("examples/simple-item.json").unwrap();
let (_, schema) = super::encode(vec![item]).unwrap();
assert_eq!(schema.metadata["stac:geoparquet_version"], "1.0.0");
}

#[test]
fn has_type() {
let item: Item = crate::read("examples/simple-item.json").unwrap();
113 changes: 106 additions & 7 deletions crates/core/src/geoparquet.rs
@@ -2,22 +2,22 @@

use crate::{
Catalog, Collection, Error, Item, ItemCollection, Result, Value,
geoarrow::{Encoder, Options, VERSION, VERSION_KEY},
geoarrow::{Encoder, Options},
};
use arrow_array::RecordBatch;
use bytes::Bytes;
use geoparquet::{
reader::{GeoParquetReaderBuilder, GeoParquetRecordBatchReader},
writer::{GeoParquetRecordBatchEncoder, GeoParquetWriterOptionsBuilder},
};
pub use parquet::basic::{BrotliLevel, Compression, GzipLevel, ZstdLevel};
use parquet::{
arrow::{ArrowWriter, arrow_reader::ParquetRecordBatchReaderBuilder},
file::{properties::WriterProperties, reader::ChunkReader},
format::KeyValue,
};
use std::io::Write;

pub use parquet::basic::{BrotliLevel, Compression, GzipLevel, ZstdLevel};
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, io::Write};

/// Default stac-geoparquet compression
pub fn default_compression() -> Compression {
@@ -27,11 +27,18 @@ pub fn default_compression() -> Compression {
/// Default stac-geoparquet max row group size
pub const DEFAULT_STAC_MAX_ROW_GROUP_SIZE: usize = 150_000;

/// The stac-geoparquet metadata key.
pub const METADATA_KEY: &str = "stac-geoparquet";

/// The stac-geoparquet version.
pub const VERSION: &str = "1.0.0";

/// Options for writing stac-geoparquet files.
#[derive(Debug, Copy, Clone, PartialEq)]
pub struct WriterOptions {
/// Parquet compression codec
pub compression: Option<Compression>,

/// Maximum number of rows in a row group
pub max_row_group_size: usize,
}
@@ -188,6 +195,18 @@ pub struct WriterBuilder<W: Write + Send> {
pub struct Writer<W: Write + Send> {
encoder: WriterEncoder,
arrow_writer: ArrowWriter<W>,
metadata: Metadata,
}

/// stac-geoparquet metadata
#[derive(Debug, Serialize, Deserialize)]
pub struct Metadata {
/// The stac-geoparquet version.
pub version: String,

/// Any STAC collections stored alongside the items.
#[serde(skip_serializing_if = "HashMap::is_empty")]
pub collections: HashMap<String, Collection>,
}

impl<W: Write + Send> WriterBuilder<W> {
@@ -355,6 +374,7 @@ impl<W: Write + Send> Writer<W> {
Ok(Writer {
encoder,
arrow_writer,
metadata: Metadata::default(),
})
}

@@ -381,6 +401,39 @@ impl<W: Write + Send> Writer<W> {
Ok(())
}

/// Adds a collection to this writer's metadata.
///
/// Warns and overwrites if there's already a collection with the same id.
///
/// # Examples
///
/// ```
/// use stac::{Item, Collection, geoparquet::WriterBuilder};
/// use std::io::Cursor;
///
/// let item: Item = stac::read("examples/simple-item.json").unwrap();
/// let cursor = Cursor::new(Vec::new());
/// let writer = WriterBuilder::new(cursor)
/// .build(vec![item])
/// .unwrap()
/// .add_collection(Collection::new("an-id", "a description"))
/// .unwrap();
/// writer.finish().unwrap();
/// ```
pub fn add_collection(mut self, collection: Collection) -> Result<Writer<W>> {
if let Some(previous_collection) = self
.metadata
.collections
.insert(collection.id.clone(), collection)
{
log::warn!(
"Collection with id={} already existed in writer, overwriting",
previous_collection.id
)
}
Ok(self)
}

/// Finishes writing.
///
/// It's an error to call finish twice.
@@ -400,8 +453,8 @@ impl<W: Write + Send> Writer<W> {
self.arrow_writer
.append_key_value_metadata(self.encoder.into_keyvalue()?);
self.arrow_writer.append_key_value_metadata(KeyValue::new(
VERSION_KEY.to_string(),
Some(VERSION.to_string()),
METADATA_KEY.to_string(),
serde_json::to_string(&self.metadata)?,
));
let _ = self.arrow_writer.finish()?;
Ok(())
@@ -554,9 +607,21 @@ impl From<WriterOptions> for WriterProperties {
}
}

impl Default for Metadata {
fn default() -> Self {
Metadata {
version: VERSION.to_string(),
collections: HashMap::new(),
}
}
}

#[cfg(test)]
mod tests {
use crate::{FromGeoparquet, Item, ItemCollection, SelfHref, Value, geoparquet::WriterBuilder};
use crate::{
Collection, FromGeoparquet, Item, ItemCollection, SelfHref, Value,
geoparquet::{METADATA_KEY, Metadata, VERSION, WriterBuilder},
};
use bytes::Bytes;
use parquet::file::reader::{FileReader, SerializedFileReader};
use std::{
@@ -725,4 +790,38 @@ mod tests {
let item_collection = super::from_reader(Bytes::from(writer.into_inner())).unwrap();
assert!(item_collection.items[0].assets.is_empty());
}

#[test]
fn metadata() {
let item: Item = crate::read("examples/simple-item.json").unwrap();
let mut cursor = Cursor::new(Vec::new());
WriterBuilder::new(&mut cursor)
.build(vec![item])
.unwrap()
.add_collection(Collection::new("an-id", "a description"))
.unwrap()
.finish()
.unwrap();
let bytes = Bytes::from(cursor.into_inner());
let reader = SerializedFileReader::new(bytes).unwrap();
let metadata = reader
.metadata()
.file_metadata()
.key_value_metadata()
.unwrap()
.iter()
.find_map(|key_value| {
if key_value.key == METADATA_KEY {
Some(
serde_json::from_str::<Metadata>(key_value.value.as_ref().unwrap())
.unwrap(),
)
} else {
None
}
})
.unwrap();
assert_eq!(metadata.version, VERSION);
assert_eq!(metadata.collections["an-id"].description, "a description");
}
}
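
For reference, the value this PR stores under the `stac-geoparquet` key is just the `Metadata` struct serialized with `serde_json`. Below is a minimal sketch of the resulting JSON, assuming the public exports shown in this diff (`stac::Collection` and `stac::geoparquet::Metadata`) and `serde_json` as a dependency; it is illustrative, not part of the PR:

```rust
use stac::{Collection, geoparquet::Metadata};

fn main() {
    // Mirror what the writer accumulates via add_collection.
    let mut metadata = Metadata::default();
    let _ = metadata
        .collections
        .insert("an-id".to_string(), Collection::new("an-id", "a description"));

    // The stored value is plain JSON, roughly:
    // {"version":"1.0.0","collections":{"an-id":{...}}}
    let json = serde_json::to_string(&metadata).unwrap();
    assert!(json.contains(r#""version":"1.0.0""#));
    assert!(json.contains("an-id"));

    // With no collections, the skip_serializing_if attribute drops the map
    // entirely, so readers should treat a missing "collections" key as empty.
    let empty = serde_json::to_string(&Metadata::default()).unwrap();
    assert_eq!(empty, r#"{"version":"1.0.0"}"#);
}
```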