Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ jobs:
run: cargo test -p pgstac --all-features
- name: Validate
run: uv run --group stac-api-validator scripts/validate-stac-server --pgstac
- name: Search
run: cargo run -F pgstac search postgresql://username:password@localhost:5432/postgis
check-nightly:
name: Check (nightly)
runs-on: ubuntu-latest
Expand Down
3 changes: 2 additions & 1 deletion crates/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ rust-version.workspace = true

[features]
default = []
pgstac = ["stac-server/pgstac"]
pgstac = ["dep:pgstac", "stac-server/pgstac"]
duckdb-bundled = ["stac-duckdb/bundled"]

[dependencies]
Expand All @@ -24,6 +24,7 @@ clap = { workspace = true, features = ["derive"] }
clap_complete.workspace = true
futures-core.workspace = true
futures-util.workspace = true
pgstac = { version = "0.4.2", path = "../pgstac", optional = true }
serde_json.workspace = true
stac = { version = "0.16.0", path = "../core" }
stac-duckdb = { version = "0.3.2", path = "../duckdb" }
Expand Down
64 changes: 50 additions & 14 deletions crates/cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,21 +165,27 @@ pub enum Command {

/// Searches a STAC API or stac-geoparquet file.
Search {
/// The href of the STAC API or stac-geoparquet file to search.
/// The href of the STAC API, stac-geoparquet file, or pgstac to search.
href: String,

/// The output file.
///
/// To write to standard output, pass `-` or don't provide an argument at all.
outfile: Option<String>,

/// Use DuckDB to query the href.
/// The search implementation to use.
///
/// By default, DuckDB will be used if the href ends in `parquet` or
/// `geoparquet`. Set this value to `true` to force DuckDB to be used,
/// or to `false` to disable this behavior.
#[arg(long = "use-duckdb")]
use_duckdb: Option<bool>,
/// If not provided, the implementation will be inferred from the href:
/// - `postgresql://` URLs will use the `postgresql` implementation
/// - `.parquet` or `.geoparquet` files will use the `duckdb` implementation
/// - All other hrefs will use the `api` implementation
///
/// Possible values:
/// - api: Search a STAC API endpoint
/// - duckdb: Search a stac-geoparquet file using DuckDB
/// - postgresql: Search a pgstac database
#[arg(long = "search-with", verbatim_doc_comment)]
search_with: Option<SearchImplementation>,

/// The maximum number of items to return from the search.
#[arg(short = 'n', long = "max-items")]
Expand Down Expand Up @@ -302,6 +308,17 @@ enum Value {
Json(serde_json::Value),
}

/// The search implementation to use.
#[derive(Debug, Clone, Copy, clap::ValueEnum)]
pub enum SearchImplementation {
/// Search a STAC API endpoint
Api,
/// Search a stac-geoparquet file using DuckDB
Duckdb,
/// Search a pgstac database
Postgresql,
}

#[derive(Debug, Clone)]
struct KeyValue(String, String);

Expand Down Expand Up @@ -351,7 +368,7 @@ impl Rustac {
Command::Search {
ref href,
ref outfile,
ref use_duckdb,
search_with,
ref max_items,
ref intersects,
ref ids,
Expand All @@ -363,9 +380,17 @@ impl Rustac {
ref filter,
ref limit,
} => {
let use_duckdb = use_duckdb.unwrap_or_else(|| {
matches!(Format::infer_from_href(href), Some(Format::Geoparquet(_)))
// Infer the search implementation from the href if not explicitly provided
let search_impl = search_with.unwrap_or_else(|| {
if href.starts_with("postgresql://") {
SearchImplementation::Postgresql
} else if matches!(Format::infer_from_href(href), Some(Format::Geoparquet(_))) {
SearchImplementation::Duckdb
} else {
SearchImplementation::Api
}
});

let get_items = GetItems {
bbox: bbox.clone(),
datetime: datetime.clone(),
Expand All @@ -383,10 +408,21 @@ impl Rustac {
};
let search: Search = get_search.try_into()?;
let search = search.normalize_datetimes()?;
let item_collection = if use_duckdb {
stac_duckdb::search(href, search, *max_items)?
} else {
stac_io::api::search(href, search, *max_items).await?
let item_collection = match search_impl {
SearchImplementation::Postgresql => {
#[cfg(feature = "pgstac")]
{
pgstac::search(href, search, *max_items).await?
}
#[cfg(not(feature = "pgstac"))]
{
return Err(anyhow!("rustac is not compiled with pgstac support"));
}
}
SearchImplementation::Duckdb => stac_duckdb::search(href, search, *max_items)?,
SearchImplementation::Api => {
stac_io::api::search(href, search, *max_items).await?
}
};
self.put(
outfile.as_deref(),
Expand Down
2 changes: 2 additions & 0 deletions crates/pgstac/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ serde.workspace = true
serde_json.workspace = true
stac = { version = "0.16.0", path = "../core" }
thiserror.workspace = true
tokio = { workspace = true, features = ["rt"] }
tokio-postgres = { workspace = true, features = ["with-serde_json-1"] }
tracing.workspace = true

[dev-dependencies]
geojson.workspace = true
Expand Down
81 changes: 79 additions & 2 deletions crates/pgstac/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ mod page;

pub use page::Page;
use serde::{Serialize, de::DeserializeOwned};
use stac::api::Search;
use tokio_postgres::{GenericClient, Row, types::ToSql};
use stac::api::{ItemCollection, Search};
use tokio_postgres::{GenericClient, NoTls, Row, types::ToSql};

/// Crate-specific error enum.
#[derive(Debug, thiserror::Error)]
Expand All @@ -97,6 +97,10 @@ pub enum Error {
/// [tokio_postgres::Error]
#[error(transparent)]
TokioPostgres(#[from] tokio_postgres::Error),

/// [std::num::TryFromIntError]
#[error(transparent)]
TryFromInt(#[from] std::num::TryFromIntError),
}

/// Crate-specific result type.
Expand All @@ -105,6 +109,79 @@ pub type Result<T> = std::result::Result<T, Error>;
/// A [serde_json::Value].
pub type JsonValue = serde_json::Value;

/// Searches a pgstac database.
///
/// This function establishes a connection to the pgstac database, performs the search
/// with pagination support, and collects all results up to `max_items` if specified.
///
/// # Examples
///
/// ```no_run
/// # tokio_test::block_on(async {
/// let connection_string = "postgresql://username:password@localhost:5432/postgis";
/// let search = stac::api::Search::default();
/// let item_collection = pgstac::search(connection_string, search, None).await.unwrap();
/// # })
/// ```
pub async fn search(
connection_string: &str,
mut search: Search,
max_items: Option<usize>,
) -> Result<ItemCollection> {
let (client, connection) = tokio_postgres::connect(connection_string, NoTls).await?;
let _ = tokio::spawn(async move {
if let Err(e) = connection.await {
tracing::error!("pgstac connection error: {}", e);
}
});

let mut all_items = if let Some(max_items) = max_items {
if max_items == 0 {
return Ok(ItemCollection::new(Vec::new())?);
}
Vec::with_capacity(max_items)
} else {
Vec::new()
};

if search.items.limit.is_none() {
if let Some(max_items) = max_items {
search.items.limit = Some(max_items.try_into()?);
}
}

loop {
tracing::info!("Fetching page");
let page = client.search(search.clone()).await?;
let next_token = page.next_token();
let has_next_token = next_token.is_some();
if let Some(token) = next_token {
let _ = search
.additional_fields
.insert("token".into(), token.into());
}
for item in page.features {
all_items.push(item);
if let Some(max_items) = max_items {
if all_items.len() >= max_items {
break;
}
}
}
let should_continue = if let Some(max_items) = max_items {
all_items.len() < max_items && has_next_token
} else {
has_next_token
};
if !should_continue {
break;
}
tracing::debug!("Found {} item(s), continuing...", all_items.len());
}

Ok(ItemCollection::new(all_items)?)
}

/// Methods for working with **pgstac**.
#[allow(async_fn_in_trait)]
pub trait Pgstac: GenericClient {
Expand Down
25 changes: 23 additions & 2 deletions crates/pgstac/src/page.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,32 @@ pub struct Page {
impl Page {
/// Returns this page's next token, if it has one.
pub fn next_token(&self) -> Option<String> {
self.next.as_ref().map(|next| format!("next:{next}"))
if let Some(next) = &self.next {
return Some(format!("next:{next}"));
}

self.links
.iter()
.find(|link| link.rel == "next")
.and_then(|link| extract_token_from_href(&link.href))
}

/// Returns this page's prev token, if it has one.
pub fn prev_token(&self) -> Option<String> {
self.prev.as_ref().map(|prev| format!("prev:{prev}"))
if let Some(prev) = &self.prev {
return Some(format!("prev:{prev}"));
}

self.links
.iter()
.find(|link| link.rel == "prev")
.and_then(|link| extract_token_from_href(&link.href))
}
}

fn extract_token_from_href(href: &str) -> Option<String> {
href.split("token=")
.nth(1)
.and_then(|token_part| token_part.split('&').next())
.map(|token| token.to_string())
}