From 5f17d56eac87e054c446b1afd8cb27da8f61e237 Mon Sep 17 00:00:00 2001 From: Pete Gadomski Date: Tue, 13 Jan 2026 16:08:05 -0700 Subject: [PATCH 1/5] feat: search directly from pgstac --- crates/cli/Cargo.toml | 4 +- crates/cli/src/lib.rs | 64 ++++++++++++++++++++++------- crates/pgstac/Cargo.toml | 2 + crates/pgstac/src/lib.rs | 85 ++++++++++++++++++++++++++++++++++++++- crates/pgstac/src/page.rs | 25 +++++++++++- 5 files changed, 162 insertions(+), 18 deletions(-) diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml index 729913d3..e1097422 100644 --- a/crates/cli/Cargo.toml +++ b/crates/cli/Cargo.toml @@ -13,7 +13,7 @@ rust-version.workspace = true [features] default = [] -pgstac = ["stac-server/pgstac"] +pgstac = ["dep:pgstac", "dep:tokio-postgres", "stac-server/pgstac"] duckdb-bundled = ["stac-duckdb/bundled"] [dependencies] @@ -24,6 +24,7 @@ clap = { workspace = true, features = ["derive"] } clap_complete.workspace = true futures-core.workspace = true futures-util.workspace = true +pgstac = { version = "0.4.2", path = "../pgstac", optional = true } serde_json.workspace = true stac = { version = "0.16.0", path = "../core" } stac-duckdb = { version = "0.3.2", path = "../duckdb" } @@ -39,6 +40,7 @@ tokio = { workspace = true, features = [ "rt-multi-thread", "fs", ] } +tokio-postgres = { workspace = true, optional = true } tracing.workspace = true tracing-indicatif.workspace = true tracing-subscriber = { workspace = true, features = ["env-filter"] } diff --git a/crates/cli/src/lib.rs b/crates/cli/src/lib.rs index da7b1745..dc112a88 100644 --- a/crates/cli/src/lib.rs +++ b/crates/cli/src/lib.rs @@ -165,7 +165,7 @@ pub enum Command { /// Searches a STAC API or stac-geoparquet file. Search { - /// The href of the STAC API or stac-geoparquet file to search. + /// The href of the STAC API, stac-geoparquet file, or pgstac to search. href: String, /// The output file. @@ -173,13 +173,19 @@ pub enum Command { /// To write to standard output, pass `-` or don't provide an argument at all. outfile: Option, - /// Use DuckDB to query the href. + /// The search implementation to use. /// - /// By default, DuckDB will be used if the href ends in `parquet` or - /// `geoparquet`. Set this value to `true` to force DuckDB to be used, - /// or to `false` to disable this behavior. - #[arg(long = "use-duckdb")] - use_duckdb: Option, + /// If not provided, the implementation will be inferred from the href: + /// - `postgresql://` URLs will use the `postgresql` implementation + /// - `.parquet` or `.geoparquet` files will use the `duckdb` implementation + /// - All other hrefs will use the `api` implementation + /// + /// Possible values: + /// - api: Search a STAC API endpoint + /// - duckdb: Search a stac-geoparquet file using DuckDB + /// - postgresql: Search a pgstac database + #[arg(long = "search-with", verbatim_doc_comment)] + search_with: Option, /// The maximum number of items to return from the search. #[arg(short = 'n', long = "max-items")] @@ -302,6 +308,17 @@ enum Value { Json(serde_json::Value), } +/// The search implementation to use. +#[derive(Debug, Clone, Copy, clap::ValueEnum)] +pub enum SearchImplementation { + /// Search a STAC API endpoint + Api, + /// Search a stac-geoparquet file using DuckDB + Duckdb, + /// Search a pgstac database + Postgresql, +} + #[derive(Debug, Clone)] struct KeyValue(String, String); @@ -351,7 +368,7 @@ impl Rustac { Command::Search { ref href, ref outfile, - ref use_duckdb, + search_with, ref max_items, ref intersects, ref ids, @@ -363,9 +380,17 @@ impl Rustac { ref filter, ref limit, } => { - let use_duckdb = use_duckdb.unwrap_or_else(|| { - matches!(Format::infer_from_href(href), Some(Format::Geoparquet(_))) + // Infer the search implementation from the href if not explicitly provided + let search_impl = search_with.unwrap_or_else(|| { + if href.starts_with("postgresql://") { + SearchImplementation::Postgresql + } else if matches!(Format::infer_from_href(href), Some(Format::Geoparquet(_))) { + SearchImplementation::Duckdb + } else { + SearchImplementation::Api + } }); + let get_items = GetItems { bbox: bbox.clone(), datetime: datetime.clone(), @@ -383,10 +408,21 @@ impl Rustac { }; let search: Search = get_search.try_into()?; let search = search.normalize_datetimes()?; - let item_collection = if use_duckdb { - stac_duckdb::search(href, search, *max_items)? - } else { - stac_io::api::search(href, search, *max_items).await? + let item_collection = match search_impl { + SearchImplementation::Postgresql => { + #[cfg(feature = "pgstac")] + { + pgstac::search(href, search, *max_items).await? + } + #[cfg(not(feature = "pgstac"))] + { + return Err(anyhow!("rustac is not compiled with pgstac support")); + } + } + SearchImplementation::Duckdb => stac_duckdb::search(href, search, *max_items)?, + SearchImplementation::Api => { + stac_io::api::search(href, search, *max_items).await? + } }; self.put( outfile.as_deref(), diff --git a/crates/pgstac/Cargo.toml b/crates/pgstac/Cargo.toml index 6b6b4516..7828a002 100644 --- a/crates/pgstac/Cargo.toml +++ b/crates/pgstac/Cargo.toml @@ -16,7 +16,9 @@ serde.workspace = true serde_json.workspace = true stac = { version = "0.16.0", path = "../core" } thiserror.workspace = true +tokio = { workspace = true, features = ["rt"] } tokio-postgres = { workspace = true, features = ["with-serde_json-1"] } +tracing.workspace = true [dev-dependencies] geojson.workspace = true diff --git a/crates/pgstac/src/lib.rs b/crates/pgstac/src/lib.rs index 34c80774..9c7fd6e6 100644 --- a/crates/pgstac/src/lib.rs +++ b/crates/pgstac/src/lib.rs @@ -80,7 +80,7 @@ mod page; pub use page::Page; use serde::{Serialize, de::DeserializeOwned}; use stac::api::Search; -use tokio_postgres::{GenericClient, Row, types::ToSql}; +use tokio_postgres::{GenericClient, NoTls, Row, types::ToSql}; /// Crate-specific error enum. #[derive(Debug, thiserror::Error)] @@ -97,6 +97,10 @@ pub enum Error { /// [tokio_postgres::Error] #[error(transparent)] TokioPostgres(#[from] tokio_postgres::Error), + + /// [std::num::TryFromIntError] + #[error(transparent)] + TryFromInt(#[from] std::num::TryFromIntError), } /// Crate-specific result type. @@ -105,6 +109,85 @@ pub type Result = std::result::Result; /// A [serde_json::Value]. pub type JsonValue = serde_json::Value; +/// Searches a pgstac database. +/// +/// This function establishes a connection to the pgstac database, performs the search +/// with pagination support, and collects all results up to `max_items` if specified. +/// +/// # Examples +/// +/// ```no_run +/// # tokio_test::block_on(async { +/// let connection_string = "postgresql://username:password@localhost:5432/postgis"; +/// let search = stac::api::Search::default(); +/// let item_collection = pgstac::search(connection_string, search, None).await.unwrap(); +/// # }) +/// ``` +pub async fn search( + connection_string: &str, + mut search: Search, + max_items: Option, +) -> Result { + let (client, connection) = tokio_postgres::connect(connection_string, NoTls).await?; + let _ = tokio::spawn(async move { + if let Err(e) = connection.await { + tracing::error!("pgstac connection error: {}", e); + } + }); + + let mut all_items = if let Some(max_items) = max_items { + if max_items == 0 { + return Ok(stac::api::ItemCollection::new(Vec::new())?); + } + Vec::with_capacity(max_items) + } else { + Vec::new() + }; + + if search.items.limit.is_none() + && let Some(max_items) = max_items + { + search.items.limit = Some(max_items.try_into()?); + } + + loop { + tracing::info!("Fetching page"); + let page = client.search(search.clone()).await?; + let next_token = page.next_token(); + let has_next_token = next_token.is_some(); + if let Some(token) = next_token { + let _ = search + .additional_fields + .insert("token".into(), token.into()); + } + let num_features = page.features.len(); + for item in page.features { + all_items.push(item); + if let Some(max_items) = max_items { + if all_items.len() >= max_items { + break; + } + } + } + let should_continue = if let Some(max_items) = max_items { + all_items.len() < max_items && has_next_token + } else { + has_next_token + }; + if !should_continue { + break; + } + if let Some(limit) = search.items.limit { + if num_features < limit as usize { + break; + } + } + tracing::debug!("Found {} item(s), continuing...", all_items.len()); + } + + Ok(stac::api::ItemCollection::new(all_items)?) +} + /// Methods for working with **pgstac**. #[allow(async_fn_in_trait)] pub trait Pgstac: GenericClient { diff --git a/crates/pgstac/src/page.rs b/crates/pgstac/src/page.rs index f0d8989e..c4bf8947 100644 --- a/crates/pgstac/src/page.rs +++ b/crates/pgstac/src/page.rs @@ -44,11 +44,32 @@ pub struct Page { impl Page { /// Returns this page's next token, if it has one. pub fn next_token(&self) -> Option { - self.next.as_ref().map(|next| format!("next:{next}")) + if let Some(next) = &self.next { + return Some(format!("next:{next}")); + } + + self.links + .iter() + .find(|link| link.rel == "next") + .and_then(|link| extract_token_from_href(&link.href)) } /// Returns this page's prev token, if it has one. pub fn prev_token(&self) -> Option { - self.prev.as_ref().map(|prev| format!("prev:{prev}")) + if let Some(prev) = &self.prev { + return Some(format!("prev:{prev}")); + } + + self.links + .iter() + .find(|link| link.rel == "prev") + .and_then(|link| extract_token_from_href(&link.href)) } } + +fn extract_token_from_href(href: &str) -> Option { + href.split("token=") + .nth(1) + .and_then(|token_part| token_part.split('&').next()) + .map(|token| token.to_string()) +} From db63a509a0bc24397e084b5f88f214b799dd94ea Mon Sep 17 00:00:00 2001 From: Pete Gadomski Date: Tue, 13 Jan 2026 16:09:44 -0700 Subject: [PATCH 2/5] fix: unused dependency --- crates/cli/Cargo.toml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml index e1097422..bd4623ce 100644 --- a/crates/cli/Cargo.toml +++ b/crates/cli/Cargo.toml @@ -13,7 +13,7 @@ rust-version.workspace = true [features] default = [] -pgstac = ["dep:pgstac", "dep:tokio-postgres", "stac-server/pgstac"] +pgstac = ["dep:pgstac", "stac-server/pgstac"] duckdb-bundled = ["stac-duckdb/bundled"] [dependencies] @@ -40,7 +40,6 @@ tokio = { workspace = true, features = [ "rt-multi-thread", "fs", ] } -tokio-postgres = { workspace = true, optional = true } tracing.workspace = true tracing-indicatif.workspace = true tracing-subscriber = { workspace = true, features = ["env-filter"] } From 54b05ce4d4416e98cfe876ee502feae88c50f877 Mon Sep 17 00:00:00 2001 From: Pete Gadomski Date: Tue, 13 Jan 2026 17:21:07 -0700 Subject: [PATCH 3/5] fix: remove erroneous limit check --- crates/pgstac/src/lib.rs | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/crates/pgstac/src/lib.rs b/crates/pgstac/src/lib.rs index 9c7fd6e6..0c9ae0ac 100644 --- a/crates/pgstac/src/lib.rs +++ b/crates/pgstac/src/lib.rs @@ -79,7 +79,7 @@ mod page; pub use page::Page; use serde::{Serialize, de::DeserializeOwned}; -use stac::api::Search; +use stac::api::{ItemCollection, Search}; use tokio_postgres::{GenericClient, NoTls, Row, types::ToSql}; /// Crate-specific error enum. @@ -127,7 +127,7 @@ pub async fn search( connection_string: &str, mut search: Search, max_items: Option, -) -> Result { +) -> Result { let (client, connection) = tokio_postgres::connect(connection_string, NoTls).await?; let _ = tokio::spawn(async move { if let Err(e) = connection.await { @@ -137,7 +137,7 @@ pub async fn search( let mut all_items = if let Some(max_items) = max_items { if max_items == 0 { - return Ok(stac::api::ItemCollection::new(Vec::new())?); + return Ok(ItemCollection::new(Vec::new())?); } Vec::with_capacity(max_items) } else { @@ -160,7 +160,6 @@ pub async fn search( .additional_fields .insert("token".into(), token.into()); } - let num_features = page.features.len(); for item in page.features { all_items.push(item); if let Some(max_items) = max_items { @@ -177,15 +176,10 @@ pub async fn search( if !should_continue { break; } - if let Some(limit) = search.items.limit { - if num_features < limit as usize { - break; - } - } tracing::debug!("Found {} item(s), continuing...", all_items.len()); } - Ok(stac::api::ItemCollection::new(all_items)?) + Ok(ItemCollection::new(all_items)?) } /// Methods for working with **pgstac**. From cd00ebe801dd5ff4b875ae49bf14166c0d953af9 Mon Sep 17 00:00:00 2001 From: Pete Gadomski Date: Tue, 13 Jan 2026 17:25:17 -0700 Subject: [PATCH 4/5] fix: unstable if let --- crates/pgstac/src/lib.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/pgstac/src/lib.rs b/crates/pgstac/src/lib.rs index 0c9ae0ac..8585b6d3 100644 --- a/crates/pgstac/src/lib.rs +++ b/crates/pgstac/src/lib.rs @@ -144,10 +144,10 @@ pub async fn search( Vec::new() }; - if search.items.limit.is_none() - && let Some(max_items) = max_items - { - search.items.limit = Some(max_items.try_into()?); + if search.items.limit.is_none() { + if let Some(max_items) = max_items { + search.items.limit = Some(max_items.try_into()?); + } } loop { From 3458246aa8b32cbd21224799f67ad0fafaf4a61e Mon Sep 17 00:00:00 2001 From: Pete Gadomski Date: Tue, 13 Jan 2026 17:26:28 -0700 Subject: [PATCH 5/5] tests: add search test to ci --- .github/workflows/ci.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d1721d2d..e40c6d0b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -104,6 +104,8 @@ jobs: run: cargo test -p pgstac --all-features - name: Validate run: uv run --group stac-api-validator scripts/validate-stac-server --pgstac + - name: Search + run: cargo run -F pgstac search postgresql://username:password@localhost:5432/postgis check-nightly: name: Check (nightly) runs-on: ubuntu-latest