From 00fd6f4fa535311a084e803388212eb245a5bc29 Mon Sep 17 00:00:00 2001 From: Pete Gadomski Date: Fri, 1 Nov 2024 14:12:07 -0600 Subject: [PATCH 1/5] feat: add an object store resolver --- crates/core/src/error.rs | 2 +- crates/core/src/lib.rs | 2 + crates/core/src/node.rs | 115 ++++++++++++++++++++++++++++----------- crates/types/src/href.rs | 10 ++++ 4 files changed, 97 insertions(+), 32 deletions(-) diff --git a/crates/core/src/error.rs b/crates/core/src/error.rs index 4834ed50..500d65ac 100644 --- a/crates/core/src/error.rs +++ b/crates/core/src/error.rs @@ -90,7 +90,7 @@ pub enum Error { /// [tokio::task::JoinError] #[error(transparent)] - #[cfg(feature = "validate")] + #[cfg(any(feature = "validate", feature = "object-store"))] TokioJoin(#[from] tokio::task::JoinError), /// [std::num::TryFromIntError] diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index e60f7ea1..0f935547 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -176,6 +176,8 @@ mod value; use std::fmt::Display; +#[cfg(feature = "object-store")] +pub use node::Resolver; pub use stac_types::{mime, Fields, Href, Link, Links, Migrate, SelfHref, Version, STAC_VERSION}; #[cfg(feature = "validate-blocking")] pub use validate::ValidateBlocking; diff --git a/crates/core/src/node.rs b/crates/core/src/node.rs index 971de02a..4f73028d 100644 --- a/crates/core/src/node.rs +++ b/crates/core/src/node.rs @@ -1,5 +1,7 @@ use crate::{Catalog, Collection, Error, Href, Item, Link, Links, Result, SelfHref, Value}; -use std::collections::VecDeque; +use std::{collections::VecDeque, future::Future, pin::Pin}; +use tokio::task::JoinSet; +use url::Url; /// A node in a STAC tree. #[derive(Debug)] @@ -32,41 +34,32 @@ pub struct IntoValues { items: VecDeque, } +/// An object that uses objet store to resolve links. +#[derive(Debug, Default)] +#[cfg(feature = "object-store")] +pub struct Resolver { + recursive: bool, + use_items_endpoint: bool, +} + +/// A resolver that uses object store. impl Node { /// Resolves all child and item links in this node. /// + /// This method uses the default [Resolver]. + /// /// # Examples /// /// ``` /// use stac::{Catalog, Node}; /// /// let mut node: Node = stac::read::("examples/catalog.json").unwrap().into(); - /// node.resolve().unwrap(); + /// let node = node.resolve().await.unwrap(); /// ``` - pub fn resolve(&mut self) -> Result<()> { - let links = std::mem::take(self.value.links_mut()); - let href = self.value.self_href().cloned(); - for mut link in links { - if link.is_child() { - if let Some(href) = &href { - link.make_absolute(href)?; - } - // TODO enable object store - tracing::debug!("resolving child: {}", link.href); - let child: Container = crate::read::(link.href)?.try_into()?; - self.children.push_back(child.into()); - } else if link.is_item() { - if let Some(href) = &href { - link.make_absolute(href)?; - } - tracing::debug!("resolving item: {}", link.href); - let item = crate::read::(link.href)?; - self.items.push_back(item); - } else { - self.value.links_mut().push(link); - } - } - Ok(()) + #[cfg(feature = "object-store")] + pub async fn resolve(self) -> Result { + let resolver = Resolver::default(); + resolver.resolve(self).await } /// Creates a consuming iterator over this node and its children and items. @@ -111,6 +104,63 @@ impl Iterator for IntoValues { } } +impl Resolver { + /// Resolves the links of a node. + pub fn resolve<'a>( + &'a self, + mut node: Node, + ) -> Pin> + 'a>> { + Box::pin(async { + let links = std::mem::take(node.value.links_mut()); + let href = node.value.self_href().cloned(); + let mut join_set = JoinSet::new(); + for mut link in links { + if link.is_child() { + if let Some(href) = &href { + link.make_absolute(href)?; + } + let _ = join_set + .spawn(async move { (crate::io::get::(link.href).await, true) }); + } else if !self.use_items_endpoint && link.is_item() { + if let Some(href) = &href { + link.make_absolute(href)?; + } + let _ = join_set.spawn(async move { (crate::io::get(link.href).await, false) }); + } else if self.use_items_endpoint && link.rel == "items" { + let mut url: Url = link.href.try_into()?; + // TODO make this configurable + let _ = url + .query_pairs_mut() + .append_pair("limit", "1") + .append_pair("sortby", "-properties.datetime"); + let _ = join_set.spawn(async move { (crate::io::get(url).await, false) }); + } else { + node.value.links_mut().push(link); + } + } + while let Some(result) = join_set.join_next().await { + let (result, is_child) = result?; + let value = result?; + if is_child { + let child = Container::try_from(value)?.into(); + node.children.push_back(child); + } else if let Value::ItemCollection(item_collection) = value { + node.items.extend(item_collection.into_iter()); + } else { + node.items.push_back(value.try_into()?); + } + } + if self.recursive { + let children = std::mem::take(&mut node.children); + for child in children { + node.children.push_back(self.resolve(child).await?); + } + } + Ok(node) + }) + } +} + impl From for Node { fn from(value: Catalog) -> Self { Container::from(value).into() @@ -205,7 +255,7 @@ impl SelfHref for Container { #[cfg(test)] mod tests { use super::Node; - use crate::{Catalog, Collection, Links}; + use crate::{Catalog, Collection}; #[test] fn into_node() { @@ -213,12 +263,15 @@ mod tests { let _ = Node::from(Collection::new("an-id", "a description")); } - #[test] - fn resolve() { - let mut node: Node = crate::read::("examples/catalog.json") + #[tokio::test] + #[cfg(feature = "object-store")] + async fn resolve() { + use crate::Links; + + let node: Node = crate::read::("examples/catalog.json") .unwrap() .into(); - node.resolve().unwrap(); + let node = node.resolve().await.unwrap(); assert_eq!(node.children.len(), 3); assert_eq!(node.items.len(), 1); assert_eq!(node.value.links().len(), 2); diff --git a/crates/types/src/href.rs b/crates/types/src/href.rs index dec2eaa8..6fcb9123 100644 --- a/crates/types/src/href.rs +++ b/crates/types/src/href.rs @@ -214,6 +214,16 @@ impl From for Href { } } +impl TryFrom for Url { + type Error = Error; + fn try_from(value: Href) -> Result { + match value { + Href::Url(url) => Ok(url), + Href::String(s) => s.parse().map_err(Error::from), + } + } +} + #[cfg(feature = "reqwest")] impl From for Href { fn from(value: reqwest::Url) -> Self { From 283653d55d8c83e3f38e033d21665eb980287a29 Mon Sep 17 00:00:00 2001 From: Pete Gadomski Date: Sun, 3 Nov 2024 05:57:54 -0700 Subject: [PATCH 2/5] refactor: split to own file --- crates/core/src/lib.rs | 6 ++- crates/core/src/node.rs | 73 ++----------------------------------- crates/core/src/resolver.rs | 69 +++++++++++++++++++++++++++++++++++ 3 files changed, 77 insertions(+), 71 deletions(-) create mode 100644 crates/core/src/resolver.rs diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 0f935547..34da8cc1 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -169,6 +169,8 @@ mod item_collection; mod json; mod ndjson; mod node; +#[cfg(feature = "object-store")] +mod resolver; mod statistics; #[cfg(feature = "validate")] mod validate; @@ -177,7 +179,7 @@ mod value; use std::fmt::Display; #[cfg(feature = "object-store")] -pub use node::Resolver; +pub use resolver::Resolver; pub use stac_types::{mime, Fields, Href, Link, Links, Migrate, SelfHref, Version, STAC_VERSION}; #[cfg(feature = "validate-blocking")] pub use validate::ValidateBlocking; @@ -199,7 +201,7 @@ pub use { item_collection::ItemCollection, json::{FromJson, ToJson}, ndjson::{FromNdjson, ToNdjson}, - node::Node, + node::{Container, Node}, statistics::Statistics, value::Value, }; diff --git a/crates/core/src/node.rs b/crates/core/src/node.rs index 4f73028d..d3c478c1 100644 --- a/crates/core/src/node.rs +++ b/crates/core/src/node.rs @@ -1,7 +1,5 @@ use crate::{Catalog, Collection, Error, Href, Item, Link, Links, Result, SelfHref, Value}; -use std::{collections::VecDeque, future::Future, pin::Pin}; -use tokio::task::JoinSet; -use url::Url; +use std::collections::VecDeque; /// A node in a STAC tree. #[derive(Debug)] @@ -34,14 +32,6 @@ pub struct IntoValues { items: VecDeque, } -/// An object that uses objet store to resolve links. -#[derive(Debug, Default)] -#[cfg(feature = "object-store")] -pub struct Resolver { - recursive: bool, - use_items_endpoint: bool, -} - /// A resolver that uses object store. impl Node { /// Resolves all child and item links in this node. @@ -54,11 +44,13 @@ impl Node { /// use stac::{Catalog, Node}; /// /// let mut node: Node = stac::read::("examples/catalog.json").unwrap().into(); + /// # tokio_test::block_on(async { /// let node = node.resolve().await.unwrap(); + /// }); /// ``` #[cfg(feature = "object-store")] pub async fn resolve(self) -> Result { - let resolver = Resolver::default(); + let resolver = crate::Resolver::default(); resolver.resolve(self).await } @@ -104,63 +96,6 @@ impl Iterator for IntoValues { } } -impl Resolver { - /// Resolves the links of a node. - pub fn resolve<'a>( - &'a self, - mut node: Node, - ) -> Pin> + 'a>> { - Box::pin(async { - let links = std::mem::take(node.value.links_mut()); - let href = node.value.self_href().cloned(); - let mut join_set = JoinSet::new(); - for mut link in links { - if link.is_child() { - if let Some(href) = &href { - link.make_absolute(href)?; - } - let _ = join_set - .spawn(async move { (crate::io::get::(link.href).await, true) }); - } else if !self.use_items_endpoint && link.is_item() { - if let Some(href) = &href { - link.make_absolute(href)?; - } - let _ = join_set.spawn(async move { (crate::io::get(link.href).await, false) }); - } else if self.use_items_endpoint && link.rel == "items" { - let mut url: Url = link.href.try_into()?; - // TODO make this configurable - let _ = url - .query_pairs_mut() - .append_pair("limit", "1") - .append_pair("sortby", "-properties.datetime"); - let _ = join_set.spawn(async move { (crate::io::get(url).await, false) }); - } else { - node.value.links_mut().push(link); - } - } - while let Some(result) = join_set.join_next().await { - let (result, is_child) = result?; - let value = result?; - if is_child { - let child = Container::try_from(value)?.into(); - node.children.push_back(child); - } else if let Value::ItemCollection(item_collection) = value { - node.items.extend(item_collection.into_iter()); - } else { - node.items.push_back(value.try_into()?); - } - } - if self.recursive { - let children = std::mem::take(&mut node.children); - for child in children { - node.children.push_back(self.resolve(child).await?); - } - } - Ok(node) - }) - } -} - impl From for Node { fn from(value: Catalog) -> Self { Container::from(value).into() diff --git a/crates/core/src/resolver.rs b/crates/core/src/resolver.rs new file mode 100644 index 00000000..8974e8f5 --- /dev/null +++ b/crates/core/src/resolver.rs @@ -0,0 +1,69 @@ +use crate::{Container, Links, Node, Result, SelfHref, Value}; +use std::{future::Future, pin::Pin}; +use tokio::task::JoinSet; +use url::Url; + +/// An object that uses object store to resolve links. +#[derive(Debug, Default)] +#[cfg(feature = "object-store")] +pub struct Resolver { + recursive: bool, + use_items_endpoint: bool, +} + +impl Resolver { + /// Resolves the links of a node. + pub fn resolve<'a>( + &'a self, + mut node: Node, + ) -> Pin> + 'a>> { + Box::pin(async { + let links = std::mem::take(node.value.links_mut()); + let href = node.value.self_href().cloned(); + let mut join_set = JoinSet::new(); + for mut link in links { + if link.is_child() { + if let Some(href) = &href { + link.make_absolute(href)?; + } + let _ = join_set + .spawn(async move { (crate::io::get::(link.href).await, true) }); + } else if !self.use_items_endpoint && link.is_item() { + if let Some(href) = &href { + link.make_absolute(href)?; + } + let _ = join_set.spawn(async move { (crate::io::get(link.href).await, false) }); + } else if self.use_items_endpoint && link.rel == "items" { + let mut url: Url = link.href.try_into()?; + // TODO make this configurable + let _ = url + .query_pairs_mut() + .append_pair("limit", "1") + .append_pair("sortby", "-properties.datetime"); + let _ = join_set.spawn(async move { (crate::io::get(url).await, false) }); + } else { + node.value.links_mut().push(link); + } + } + while let Some(result) = join_set.join_next().await { + let (result, is_child) = result?; + let value = result?; + if is_child { + let child = Container::try_from(value)?.into(); + node.children.push_back(child); + } else if let Value::ItemCollection(item_collection) = value { + node.items.extend(item_collection.into_iter()); + } else { + node.items.push_back(value.try_into()?); + } + } + if self.recursive { + let children = std::mem::take(&mut node.children); + for child in children { + node.children.push_back(self.resolve(child).await?); + } + } + Ok(node) + }) + } +} From 2bddf4285eb80810de34c2c12cc7ac66115289aa Mon Sep 17 00:00:00 2001 From: Pete Gadomski Date: Sun, 3 Nov 2024 05:58:27 -0700 Subject: [PATCH 3/5] fix: docs --- crates/core/src/node.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/core/src/node.rs b/crates/core/src/node.rs index d3c478c1..9a3fdada 100644 --- a/crates/core/src/node.rs +++ b/crates/core/src/node.rs @@ -36,7 +36,7 @@ pub struct IntoValues { impl Node { /// Resolves all child and item links in this node. /// - /// This method uses the default [Resolver]. + /// This method uses [crate::Resolver] to resolve links. /// /// # Examples /// From 245ff91f2543bd4cea5c926d5e18ac7901e544f3 Mon Sep 17 00:00:00 2001 From: Pete Gadomski Date: Sun, 3 Nov 2024 06:03:16 -0700 Subject: [PATCH 4/5] chore: include tokio when object store --- crates/core/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 845f7ac3..b6b9db4d 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -31,7 +31,7 @@ geoparquet-compression = [ "parquet/lz4", "parquet/zstd", ] -object-store = ["dep:object_store"] +object-store = ["dep:object_store", "dep:tokio"] object-store-aws = ["object-store", "object_store/aws"] object-store-azure = ["object-store", "object_store/azure"] object-store-gcp = ["object-store", "object_store/gcp"] From 8b186b680afede54f2bc95dee2f7f09da5e7843a Mon Sep 17 00:00:00 2001 From: Pete Gadomski Date: Sun, 3 Nov 2024 06:16:41 -0700 Subject: [PATCH 5/5] fix: clippy --- crates/core/src/resolver.rs | 5 +---- crates/types/src/href.rs | 4 ++-- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/crates/core/src/resolver.rs b/crates/core/src/resolver.rs index 8974e8f5..b31e0efd 100644 --- a/crates/core/src/resolver.rs +++ b/crates/core/src/resolver.rs @@ -13,10 +13,7 @@ pub struct Resolver { impl Resolver { /// Resolves the links of a node. - pub fn resolve<'a>( - &'a self, - mut node: Node, - ) -> Pin> + 'a>> { + pub fn resolve(&self, mut node: Node) -> Pin> + '_>> { Box::pin(async { let links = std::mem::take(node.value.links_mut()); let href = node.value.self_href().cloned(); diff --git a/crates/types/src/href.rs b/crates/types/src/href.rs index 6fcb9123..469027b6 100644 --- a/crates/types/src/href.rs +++ b/crates/types/src/href.rs @@ -227,7 +227,7 @@ impl TryFrom for Url { #[cfg(feature = "reqwest")] impl From for Href { fn from(value: reqwest::Url) -> Self { - Href::Url(url::Url::from(value)) + Href::Url(value) } } @@ -251,7 +251,7 @@ fn make_absolute(href: &str, base: &str) -> String { } else { let (base, _) = base.split_at(base.rfind('/').unwrap_or(0)); if base.is_empty() { - normalize_path(&href) + normalize_path(href) } else { normalize_path(&format!("{}/{}", base, href)) }