diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 845f7ac3..b6b9db4d 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -31,7 +31,7 @@ geoparquet-compression = [ "parquet/lz4", "parquet/zstd", ] -object-store = ["dep:object_store"] +object-store = ["dep:object_store", "dep:tokio"] object-store-aws = ["object-store", "object_store/aws"] object-store-azure = ["object-store", "object_store/azure"] object-store-gcp = ["object-store", "object_store/gcp"] diff --git a/crates/core/src/error.rs b/crates/core/src/error.rs index 4834ed50..500d65ac 100644 --- a/crates/core/src/error.rs +++ b/crates/core/src/error.rs @@ -90,7 +90,7 @@ pub enum Error { /// [tokio::task::JoinError] #[error(transparent)] - #[cfg(feature = "validate")] + #[cfg(any(feature = "validate", feature = "object-store"))] TokioJoin(#[from] tokio::task::JoinError), /// [std::num::TryFromIntError] diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index e60f7ea1..34da8cc1 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -169,6 +169,8 @@ mod item_collection; mod json; mod ndjson; mod node; +#[cfg(feature = "object-store")] +mod resolver; mod statistics; #[cfg(feature = "validate")] mod validate; @@ -176,6 +178,8 @@ mod value; use std::fmt::Display; +#[cfg(feature = "object-store")] +pub use resolver::Resolver; pub use stac_types::{mime, Fields, Href, Link, Links, Migrate, SelfHref, Version, STAC_VERSION}; #[cfg(feature = "validate-blocking")] pub use validate::ValidateBlocking; @@ -197,7 +201,7 @@ pub use { item_collection::ItemCollection, json::{FromJson, ToJson}, ndjson::{FromNdjson, ToNdjson}, - node::Node, + node::{Container, Node}, statistics::Statistics, value::Value, }; diff --git a/crates/core/src/node.rs b/crates/core/src/node.rs index 971de02a..9a3fdada 100644 --- a/crates/core/src/node.rs +++ b/crates/core/src/node.rs @@ -32,41 +32,26 @@ pub struct IntoValues { items: VecDeque, } +/// Methods for resolving a node's links and iterating over its contents. 
impl Node { /// Resolves all child and item links in this node. /// + /// This method uses [crate::Resolver] to resolve links. + /// /// # Examples /// /// ``` /// use stac::{Catalog, Node}; /// /// let mut node: Node = stac::read::("examples/catalog.json").unwrap().into(); - /// node.resolve().unwrap(); + /// # tokio_test::block_on(async { + /// let node = node.resolve().await.unwrap(); + /// }); /// ``` - pub fn resolve(&mut self) -> Result<()> { - let links = std::mem::take(self.value.links_mut()); - let href = self.value.self_href().cloned(); - for mut link in links { - if link.is_child() { - if let Some(href) = &href { - link.make_absolute(href)?; - } - // TODO enable object store - tracing::debug!("resolving child: {}", link.href); - let child: Container = crate::read::(link.href)?.try_into()?; - self.children.push_back(child.into()); - } else if link.is_item() { - if let Some(href) = &href { - link.make_absolute(href)?; - } - tracing::debug!("resolving item: {}", link.href); - let item = crate::read::(link.href)?; - self.items.push_back(item); - } else { - self.value.links_mut().push(link); - } - } - Ok(()) + #[cfg(feature = "object-store")] + pub async fn resolve(self) -> Result { + let resolver = crate::Resolver::default(); + resolver.resolve(self).await } /// Creates a consuming iterator over this node and its children and items. 
@@ -205,7 +190,7 @@ impl SelfHref for Container { #[cfg(test)] mod tests { use super::Node; - use crate::{Catalog, Collection, Links}; + use crate::{Catalog, Collection}; #[test] fn into_node() { @@ -213,12 +198,15 @@ mod tests { let _ = Node::from(Collection::new("an-id", "a description")); } - #[test] - fn resolve() { - let mut node: Node = crate::read::("examples/catalog.json") + #[tokio::test] + #[cfg(feature = "object-store")] + async fn resolve() { + use crate::Links; + + let node: Node = crate::read::("examples/catalog.json") .unwrap() .into(); - node.resolve().unwrap(); + let node = node.resolve().await.unwrap(); assert_eq!(node.children.len(), 3); assert_eq!(node.items.len(), 1); assert_eq!(node.value.links().len(), 2); diff --git a/crates/core/src/resolver.rs b/crates/core/src/resolver.rs new file mode 100644 index 00000000..b31e0efd --- /dev/null +++ b/crates/core/src/resolver.rs @@ -0,0 +1,66 @@ +use crate::{Container, Links, Node, Result, SelfHref, Value}; +use std::{future::Future, pin::Pin}; +use tokio::task::JoinSet; +use url::Url; + +/// An object that uses object store to resolve links. +#[derive(Debug, Default)] +#[cfg(feature = "object-store")] +pub struct Resolver { + recursive: bool, + use_items_endpoint: bool, +} + +impl Resolver { + /// Resolves the links of a node. 
+ pub fn resolve(&self, mut node: Node) -> Pin> + '_>> { + Box::pin(async { + let links = std::mem::take(node.value.links_mut()); + let href = node.value.self_href().cloned(); + let mut join_set = JoinSet::new(); + for mut link in links { + if link.is_child() { + if let Some(href) = &href { + link.make_absolute(href)?; + } + let _ = join_set + .spawn(async move { (crate::io::get::(link.href).await, true) }); + } else if !self.use_items_endpoint && link.is_item() { + if let Some(href) = &href { + link.make_absolute(href)?; + } + let _ = join_set.spawn(async move { (crate::io::get(link.href).await, false) }); + } else if self.use_items_endpoint && link.rel == "items" { + let mut url: Url = link.href.try_into()?; + // TODO make this configurable + let _ = url + .query_pairs_mut() + .append_pair("limit", "1") + .append_pair("sortby", "-properties.datetime"); + let _ = join_set.spawn(async move { (crate::io::get(url).await, false) }); + } else { + node.value.links_mut().push(link); + } + } + while let Some(result) = join_set.join_next().await { + let (result, is_child) = result?; + let value = result?; + if is_child { + let child = Container::try_from(value)?.into(); + node.children.push_back(child); + } else if let Value::ItemCollection(item_collection) = value { + node.items.extend(item_collection.into_iter()); + } else { + node.items.push_back(value.try_into()?); + } + } + if self.recursive { + let children = std::mem::take(&mut node.children); + for child in children { + node.children.push_back(self.resolve(child).await?); + } + } + Ok(node) + }) + } +} diff --git a/crates/types/src/href.rs b/crates/types/src/href.rs index dec2eaa8..469027b6 100644 --- a/crates/types/src/href.rs +++ b/crates/types/src/href.rs @@ -214,10 +214,20 @@ impl From for Href { } } +impl TryFrom for Url { + type Error = Error; + fn try_from(value: Href) -> Result { + match value { + Href::Url(url) => Ok(url), + Href::String(s) => s.parse().map_err(Error::from), + } + } +} + #[cfg(feature = 
"reqwest")] impl From for Href { fn from(value: reqwest::Url) -> Self { - Href::Url(url::Url::from(value)) + Href::Url(value) } } @@ -241,7 +251,7 @@ fn make_absolute(href: &str, base: &str) -> String { } else { let (base, _) = base.split_at(base.rfind('/').unwrap_or(0)); if base.is_empty() { - normalize_path(&href) + normalize_path(href) } else { normalize_path(&format!("{}/{}", base, href)) }