Skip to content

Commit

Permalink
feat: use cql2 for pgstac queries
Browse files Browse the repository at this point in the history
  • Loading branch information
gadomski committed Nov 4, 2024
1 parent ac57183 commit cfe03bb
Show file tree
Hide file tree
Showing 24 changed files with 155 additions and 35 deletions.
8 changes: 6 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ bb8-postgres = "0.8.1"
bytes = "1.7"
chrono = "0.4.38"
clap = "4.5"
cql2 = "0.3.0"
duckdb = "=1.0.0"
fluent-uri = "0.3.1"
futures = "0.3.31"
Expand All @@ -60,7 +61,7 @@ mime = "0.3.17"
mockito = "1.5"
object_store = "0.11.0"
openssl = { version = "0.10.68", features = ["vendored"] }
openssl-src = "=300.3.1" # joinked from https://github.com/iopsystems/rpc-perf/commit/705b290d2105af6f33150da04b217422c6d68701#diff-2e9d962a08321605940b5a657135052fbcef87b5e360662bb527c96d9a615542R41 to cross-compile Python
openssl-src = "=300.3.1" # joinked from https://github.com/iopsystems/rpc-perf/commit/705b290d2105af6f33150da04b217422c6d68701#diff-2e9d962a08321605940b5a657135052fbcef87b5e360662bb527c96d9a615542R41 to cross-compile Python
parquet = { version = "52.2", default-features = false }
pgstac = { version = "0.2.1", path = "crates/pgstac" }
pyo3 = "0.22.3"
Expand Down Expand Up @@ -89,6 +90,9 @@ tokio-test = "0.4.4"
tower = "0.5.1"
tower-http = "0.6.1"
tracing = "0.1.40"
tracing-subscriber = "0.3.18"
tracing-subscriber = { version = "0.3.18", features = [
"env-filter",
"tracing-log",
] }
url = "2.3"
webpki-roots = "0.26.6"
1 change: 1 addition & 0 deletions crates/api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ geo = ["dep:geo", "stac/geo"]
[dependencies]
async-stream = { workspace = true, optional = true }
chrono.workspace = true
cql2.workspace = true
futures = { workspace = true, optional = true }
http = { workspace = true, optional = true }
reqwest = { workspace = true, features = ["json"], optional = true }
Expand Down
15 changes: 7 additions & 8 deletions crates/api/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,12 +220,7 @@ impl Client {
if let Some(headers) = headers.into() {
request = request.headers(headers);
}
let response = request.send().await?;
if !response.status().is_success() {
let status_code = response.status();
let text = response.text().await.ok().unwrap_or_default();
return Err(Error::Request { status_code, text });
}
let response = request.send().await?.error_for_status()?;
response.json().await.map_err(Error::from)
}

Expand Down Expand Up @@ -361,8 +356,12 @@ fn stream_pages(

fn not_found_to_none<T>(result: Result<T>) -> Result<Option<T>> {
let mut result = result.map(Some);
if let Err(Error::Request { status_code, .. }) = result {
if status_code == StatusCode::NOT_FOUND {
if let Err(Error::Reqwest(ref err)) = result {
if err
.status()
.map(|s| s == StatusCode::NOT_FOUND)
.unwrap_or_default()
{
result = Ok(None);
}
}
Expand Down
15 changes: 4 additions & 11 deletions crates/api/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ pub enum Error {
#[error(transparent)]
ChronoParse(#[from] chrono::ParseError),

/// [cql2::Error]
#[error(transparent)]
Cql2(#[from] cql2::Error),

/// [geojson::Error]
#[error(transparent)]
GeoJson(#[from] Box<geojson::Error>),
Expand Down Expand Up @@ -75,17 +79,6 @@ pub enum Error {
#[cfg(feature = "client")]
Reqwest(#[from] reqwest::Error),

/// A search error.
#[error("HTTP status error ({status_code}): {text}")]
#[cfg(feature = "client")]
Request {
/// The status code
status_code: reqwest::StatusCode,

/// The text of the server response.
text: String,
},

/// A search has both bbox and intersects.
#[error("search has bbox and intersects")]
SearchHasBboxAndIntersects(Box<Search>),
Expand Down
21 changes: 18 additions & 3 deletions crates/api/src/filter.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{convert::Infallible, str::FromStr};

use crate::Result;
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
use std::{convert::Infallible, str::FromStr};

/// The language of the filter expression.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
Expand All @@ -16,6 +16,21 @@ pub enum Filter {
Cql2Json(Map<String, Value>),
}

impl Filter {
/// Converts this filter to cql2-json.
pub fn into_cql2_json(self) -> Result<Filter> {
match self {
Filter::Cql2Json(_) => Ok(self),
Filter::Cql2Text(text) => {
let expr = cql2::parse_text(&text)?;
Ok(Filter::Cql2Json(serde_json::from_value(
serde_json::to_value(expr)?,
)?))
}
}
}
}

impl Default for Filter {
fn default() -> Self {
Filter::Cql2Json(Default::default())
Expand All @@ -24,7 +39,7 @@ impl Default for Filter {

impl FromStr for Filter {
type Err = Infallible;
fn from_str(s: &str) -> Result<Self, Self::Err> {
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
Ok(Filter::Cql2Text(s.to_string()))
}
}
Expand Down
8 changes: 8 additions & 0 deletions crates/api/src/items.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,14 @@ impl Items {
collections: Some(vec![collection_id.to_string()]),
}
}

/// Converts the filter to cql2-json, if it is set.
///
/// When no filter is present the value is returned unchanged.
///
/// # Errors
///
/// Propagates any error raised while converting the filter.
pub fn into_cql2_json(mut self) -> Result<Items> {
    self.filter = self
        .filter
        .take()
        .map(|filter| filter.into_cql2_json())
        .transpose()?;
    Ok(self)
}
}

impl TryFrom<Items> for GetItems {
Expand Down
6 changes: 6 additions & 0 deletions crates/api/src/search.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,12 @@ impl Search {
Ok(true)
}
}

/// Converts this search's filter to cql2-json, if set.
///
/// The work is delegated to the conversion on the inner `items`; any error
/// from that conversion is returned as-is.
pub fn into_cql2_json(mut self) -> Result<Search> {
    let converted = self.items.into_cql2_json()?;
    self.items = converted;
    Ok(self)
}
}

impl TryFrom<Search> for GetSearch {
Expand Down
6 changes: 6 additions & 0 deletions crates/cli/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ pub struct Args {
#[derive(Debug, clap::Subcommand, Clone)]
#[allow(clippy::large_enum_variant)]
pub enum Subcommand {
/// Interact with a pgstac database
#[cfg(feature = "pgstac")]
Pgstac(crate::subcommand::pgstac::Args),

/// Search for STAC items
Search(search::Args),

Expand Down Expand Up @@ -99,6 +103,8 @@ impl Args {
/// Runs whatever these arguments say that we should run.
pub async fn run(self) -> Result<()> {
match &self.subcommand {
#[cfg(feature = "pgstac")]
Subcommand::Pgstac(args) => self.pgstac(args).await,
Subcommand::Search(args) => self.search(args).await,
Subcommand::Serve(args) => self.serve(args).await,
Subcommand::Translate(args) => self.translate(args).await,
Expand Down
2 changes: 2 additions & 0 deletions crates/cli/src/subcommand/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#[cfg(feature = "pgstac")]
pub mod pgstac;
pub mod search;
pub mod serve;
pub mod translate;
Expand Down
57 changes: 57 additions & 0 deletions crates/cli/src/subcommand/pgstac.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use crate::Result;
use stac_server::PgstacBackend;

// Arguments for the `pgstac` command group. The only field is the nested
// subcommand to run; it is not optional, so one must always be supplied.
#[derive(Debug, Clone, clap::Args)]
pub struct Args {
/// The pgstac subcommand
#[command(subcommand)]
subcommand: Subcommand,
}

// The operations available under the `pgstac` command group. `Load` is the
// only one defined here; its options live in `LoadArgs`.
#[derive(clap::Subcommand, Debug, Clone)]
pub enum Subcommand {
/// Loads data into the pgstac database
Load(LoadArgs),
}

// Options for `pgstac load`.
//
// `dsn` and `hrefs` carry no `#[arg]` attribute, while the two booleans are
// declared with `#[arg(short, long)]`.
// NOTE(review): with clap's derive defaults this presumably makes `dsn` and
// `hrefs` positional (in declaration order) and the booleans `-l` / `-c`
// style switches — confirm against the clap derive reference before
// reordering fields.
#[derive(clap::Args, Debug, Clone)]
pub struct LoadArgs {
/// The connection string.
dsn: String,

/// Hrefs to load into the database.
///
/// If not provided or `-`, data will be read from standard input.
hrefs: Vec<String>,

/// Load in all "item" links on collections.
#[arg(short, long)]
load_collection_items: bool,

/// Auto-generate collections for any collection-less items.
#[arg(short, long)]
create_collections: bool,
}

impl crate::Args {
pub async fn pgstac(&self, args: &Args) -> Result<()> {
match &args.subcommand {
Subcommand::Load(load_args) => {
let mut backend = PgstacBackend::new_from_stringlike(&load_args.dsn).await?;
let load = self
.load(
&mut backend,
load_args.hrefs.iter().map(|h| h.as_str()),
load_args.load_collection_items,
load_args.create_collections,
)
.await?;
eprintln!(
"Loaded {} collection(s) and {} item(s)",
load.collections, load.items
);
Ok(())
}
}
}
}
1 change: 1 addition & 0 deletions crates/pgstac/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ stac-api.workspace = true
thiserror.workspace = true
tokio-postgres = { workspace = true, features = ["with-serde_json-1"] }
tokio-postgres-rustls = { workspace = true, optional = true }
tracing.workspace = true
webpki-roots = { workspace = true, optional = true }

[dev-dependencies]
Expand Down
4 changes: 2 additions & 2 deletions crates/pgstac/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ See the [documentation](https://docs.rs/pgstac) for more.
To test:

```shell
docker-compose -f pgstac/docker-compose.yml up -d
docker compose up -d
cargo test -p pgstac
docker-compose -f pgstac/docker-compose.yml down
docker compose down
```

Each test is run in its own transaction, which is rolled back after the test.
Expand Down
2 changes: 2 additions & 0 deletions crates/pgstac/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,9 @@ impl<'a, C: GenericClient> Client<'a, C> {

/// Searches for items.
///
/// The search's filter is first converted to cql2-json, then the whole
/// search is serialized to a JSON value, logged at debug level, and passed
/// as the single argument of the `search` call.
pub async fn search(&self, search: Search) -> Result<Page> {
    let payload = serde_json::to_value(search.into_cql2_json()?)?;
    tracing::debug!("searching: {:?}", payload);
    self.value("search", &[&payload]).await
}

Expand Down
4 changes: 4 additions & 0 deletions crates/pgstac/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ pub enum Error {
#[error(transparent)]
SerdeJson(#[from] serde_json::Error),

/// [stac_api::Error]
#[error(transparent)]
StacApi(#[from] stac_api::Error),

/// [tokio_postgres::Error]
#[error(transparent)]
TokioPostgres(#[from] tokio_postgres::Error),
Expand Down
1 change: 1 addition & 0 deletions crates/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ stac-types.workspace = true
thiserror.workspace = true
tokio-postgres = { workspace = true, optional = true }
tower-http = { workspace = true, features = ["cors"], optional = true }
tracing.workspace = true
url.workspace = true

[dev-dependencies]
Expand Down
7 changes: 7 additions & 0 deletions crates/server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ To use the [pgstac](https://github.com/stac-utils/pgstac) backend:
stac serve --pgstac postgresql://username:password@localhost:5432/postgis
```

If you'd like to serve your own **pgstac** backend with some sample items:

```shell
docker compose up -d pgstac
scripts/load-pgstac-fixtures # This might take a while, e.g. 30 seconds or so
```

### Library

To use this library in another application:
Expand Down
3 changes: 2 additions & 1 deletion crates/server/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,9 @@ pub trait Backend: Clone + Sync + Send + 'static {
/// ```
fn add_item(&mut self, item: Item) -> impl Future<Output = Result<()>> + Send;

/// Adds several items.
/// Adds multiple items.
fn add_items(&mut self, items: Vec<Item>) -> impl Future<Output = Result<()>> + Send {
tracing::debug!("adding {} items using naïve loading", items.len());
async move {
for item in items {
self.add_item(item).await?;
Expand Down
14 changes: 8 additions & 6 deletions crates/server/src/backend/pgstac.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ where
params: impl ToString,
tls: Tls,
) -> Result<PgstacBackend<Tls>> {
let params = params.to_string();
let connection_manager = PostgresConnectionManager::new_from_stringlike(params, tls)?;
let pool = Pool::builder().build(connection_manager).await?;
Ok(PgstacBackend { pool })
Expand Down Expand Up @@ -112,6 +113,13 @@ where
client.add_item(item).await.map_err(Error::from)
}

async fn add_items(&mut self, items: Vec<Item>) -> Result<()> {
tracing::debug!("adding {} items using pgstac loading", items.len());
let client = self.pool.get().await?;
let client = Client::new(&*client);
client.add_items(&items).await.map_err(Error::from)
}

async fn items(&self, collection_id: &str, items: Items) -> Result<Option<ItemCollection>> {
// TODO should we check for collection existence?
let search = items.search_collection(collection_id);
Expand All @@ -127,12 +135,6 @@ where
.map_err(Error::from)
}

async fn add_items(&mut self, items: Vec<Item>) -> Result<()> {
let client = self.pool.get().await?;
let client = Client::new(&*client);
client.add_items(&items).await.map_err(Error::from)
}

async fn search(&self, search: Search) -> Result<ItemCollection> {
let client = self.pool.get().await?;
let client = Client::new(&*client);
Expand Down
1 change: 1 addition & 0 deletions crates/server/src/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ pub async fn get_search<B: Backend>(
State(api): State<Api<B>>,
search: Query<GetSearch>,
) -> Result<GeoJson<ItemCollection>> {
tracing::debug!("GET /search: {:?}", search.0);
let search = Search::try_from(search.0)
.and_then(Search::valid)
.map_err(|error| Error::BadRequest(error.to_string()))?;
Expand Down
2 changes: 1 addition & 1 deletion crates/pgstac/docker-compose.yml → docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
services:
database:
pgstac:
container_name: stac-rs
image: ghcr.io/stac-utils/pgstac:v0.8.6
environment:
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[project]
name = "stac-rs"
version = "0.0.0"
description = "This package should never be released, its just for uv."
description = "This package should never be released, it's just for uv."
requires-python = ">=3.12"
dependencies = []

Expand Down
Binary file added scripts/fixtures/1000-sentinel-2-items.parquet
Binary file not shown.
Loading

0 comments on commit cfe03bb

Please sign in to comment.