Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create an object store resolver #510

Merged
merged 6 commits into from
Nov 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ geoparquet-compression = [
"parquet/lz4",
"parquet/zstd",
]
object-store = ["dep:object_store"]
object-store = ["dep:object_store", "dep:tokio"]
object-store-aws = ["object-store", "object_store/aws"]
object-store-azure = ["object-store", "object_store/azure"]
object-store-gcp = ["object-store", "object_store/gcp"]
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ pub enum Error {

/// [tokio::task::JoinError]
#[error(transparent)]
#[cfg(feature = "validate")]
#[cfg(any(feature = "validate", feature = "object-store"))]
TokioJoin(#[from] tokio::task::JoinError),

/// [std::num::TryFromIntError]
Expand Down
6 changes: 5 additions & 1 deletion crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,13 +169,17 @@ mod item_collection;
mod json;
mod ndjson;
mod node;
#[cfg(feature = "object-store")]
mod resolver;
mod statistics;
#[cfg(feature = "validate")]
mod validate;
mod value;

use std::fmt::Display;

#[cfg(feature = "object-store")]
pub use resolver::Resolver;
pub use stac_types::{mime, Fields, Href, Link, Links, Migrate, SelfHref, Version, STAC_VERSION};
#[cfg(feature = "validate-blocking")]
pub use validate::ValidateBlocking;
Expand All @@ -197,7 +201,7 @@ pub use {
item_collection::ItemCollection,
json::{FromJson, ToJson},
ndjson::{FromNdjson, ToNdjson},
node::Node,
node::{Container, Node},
statistics::Statistics,
value::Value,
};
Expand Down
48 changes: 18 additions & 30 deletions crates/core/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,41 +32,26 @@ pub struct IntoValues {
items: VecDeque<Item>,
}

/// A resolver that uses object store.
impl Node {
/// Resolves all child and item links in this node.
///
/// This method uses [crate::Resolver] to resolve links.
///
/// # Examples
///
/// ```
/// use stac::{Catalog, Node};
///
/// let mut node: Node = stac::read::<Catalog>("examples/catalog.json").unwrap().into();
/// node.resolve().unwrap();
/// # tokio_test::block_on(async {
/// let node = node.resolve().await.unwrap();
/// });
/// ```
pub fn resolve(&mut self) -> Result<()> {
let links = std::mem::take(self.value.links_mut());
let href = self.value.self_href().cloned();
for mut link in links {
if link.is_child() {
if let Some(href) = &href {
link.make_absolute(href)?;
}
// TODO enable object store
tracing::debug!("resolving child: {}", link.href);
let child: Container = crate::read::<Value>(link.href)?.try_into()?;
self.children.push_back(child.into());
} else if link.is_item() {
if let Some(href) = &href {
link.make_absolute(href)?;
}
tracing::debug!("resolving item: {}", link.href);
let item = crate::read::<Item>(link.href)?;
self.items.push_back(item);
} else {
self.value.links_mut().push(link);
}
}
Ok(())
#[cfg(feature = "object-store")]
pub async fn resolve(self) -> Result<Node> {
let resolver = crate::Resolver::default();
resolver.resolve(self).await
}

/// Creates a consuming iterator over this node and its children and items.
Expand Down Expand Up @@ -205,20 +190,23 @@ impl SelfHref for Container {
#[cfg(test)]
mod tests {
use super::Node;
use crate::{Catalog, Collection, Links};
use crate::{Catalog, Collection};

#[test]
fn into_node() {
let _ = Node::from(Catalog::new("an-id", "a description"));
let _ = Node::from(Collection::new("an-id", "a description"));
}

#[test]
fn resolve() {
let mut node: Node = crate::read::<Catalog>("examples/catalog.json")
#[tokio::test]
#[cfg(feature = "object-store")]
async fn resolve() {
use crate::Links;

let node: Node = crate::read::<Catalog>("examples/catalog.json")
.unwrap()
.into();
node.resolve().unwrap();
let node = node.resolve().await.unwrap();
assert_eq!(node.children.len(), 3);
assert_eq!(node.items.len(), 1);
assert_eq!(node.value.links().len(), 2);
Expand Down
66 changes: 66 additions & 0 deletions crates/core/src/resolver.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
use crate::{Container, Links, Node, Result, SelfHref, Value};
use std::{future::Future, pin::Pin};
use tokio::task::JoinSet;
use url::Url;

/// An object that uses object store to resolve links.
#[derive(Debug, Default)]
#[cfg(feature = "object-store")]
pub struct Resolver {
recursive: bool,
use_items_endpoint: bool,
}

impl Resolver {
/// Resolves the links of a node.
pub fn resolve(&self, mut node: Node) -> Pin<Box<impl Future<Output = Result<Node>> + '_>> {
Box::pin(async {
let links = std::mem::take(node.value.links_mut());
let href = node.value.self_href().cloned();
let mut join_set = JoinSet::new();
for mut link in links {
if link.is_child() {
if let Some(href) = &href {
link.make_absolute(href)?;
}
let _ = join_set
.spawn(async move { (crate::io::get::<Value>(link.href).await, true) });
} else if !self.use_items_endpoint && link.is_item() {
if let Some(href) = &href {
link.make_absolute(href)?;
}
let _ = join_set.spawn(async move { (crate::io::get(link.href).await, false) });
} else if self.use_items_endpoint && link.rel == "items" {
let mut url: Url = link.href.try_into()?;
// TODO make this configurable
let _ = url
.query_pairs_mut()
.append_pair("limit", "1")
.append_pair("sortby", "-properties.datetime");
let _ = join_set.spawn(async move { (crate::io::get(url).await, false) });
} else {
node.value.links_mut().push(link);
}
}
while let Some(result) = join_set.join_next().await {
let (result, is_child) = result?;
let value = result?;
if is_child {
let child = Container::try_from(value)?.into();
node.children.push_back(child);
} else if let Value::ItemCollection(item_collection) = value {
node.items.extend(item_collection.into_iter());
} else {
node.items.push_back(value.try_into()?);
}
}
if self.recursive {
let children = std::mem::take(&mut node.children);
for child in children {
node.children.push_back(self.resolve(child).await?);
}
}
Ok(node)
})
}
}
14 changes: 12 additions & 2 deletions crates/types/src/href.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,20 @@ impl From<PathBuf> for Href {
}
}

impl TryFrom<Href> for Url {
type Error = Error;
fn try_from(value: Href) -> Result<Self> {
match value {
Href::Url(url) => Ok(url),
Href::String(s) => s.parse().map_err(Error::from),
}
}
}

#[cfg(feature = "reqwest")]
impl From<reqwest::Url> for Href {
fn from(value: reqwest::Url) -> Self {
Href::Url(url::Url::from(value))
Href::Url(value)
}
}

Expand All @@ -241,7 +251,7 @@ fn make_absolute(href: &str, base: &str) -> String {
} else {
let (base, _) = base.split_at(base.rfind('/').unwrap_or(0));
if base.is_empty() {
normalize_path(&href)
normalize_path(href)
} else {
normalize_path(&format!("{}/{}", base, href))
}
Expand Down
Loading