diff --git a/crates/agent/src/api/authorize_user_collection.rs b/crates/agent/src/api/authorize_user_collection.rs index 2061cefae31..d2ecb44a6b5 100644 --- a/crates/agent/src/api/authorize_user_collection.rs +++ b/crates/agent/src/api/authorize_user_collection.rs @@ -21,10 +21,19 @@ pub async fn authorize_user_collection( }): axum::Extension, super::Request(Request { collection, + capability, started_unix, }): super::Request, ) -> Result, crate::api::ApiError> { - do_authorize_user_collection(&app.snapshot, user_id, email, collection, started_unix).await + do_authorize_user_collection( + &app.snapshot, + user_id, + email, + collection, + capability, + started_unix, + ) + .await } pub async fn do_authorize_user_collection( @@ -32,6 +41,7 @@ pub async fn do_authorize_user_collection( user_id: uuid::Uuid, email: Option, collection: models::Collection, + capability: models::Capability, started_unix: u64, ) -> Result, crate::api::ApiError> { let (has_started, started) = if started_unix == 0 { @@ -45,7 +55,7 @@ pub async fn do_authorize_user_collection( loop { match Snapshot::evaluate(snapshot, started, |snapshot: &Snapshot| { - evaluate_authorization(snapshot, user_id, email.as_ref(), &collection) + evaluate_authorization(snapshot, user_id, email.as_ref(), &collection, capability) }) { Ok((exp, (encoding_key, mut claims, broker_address, journal_name_prefix))) => { claims.inner.iat = started.timestamp() as u64; @@ -81,6 +91,7 @@ fn evaluate_authorization( user_id: uuid::Uuid, user_email: Option<&String>, collection_name: &models::Collection, + capability: models::Capability, ) -> Result< ( Option>, @@ -93,10 +104,10 @@ fn evaluate_authorization( &snapshot.user_grants, user_id, collection_name, - models::Capability::Read, + capability, ) { return Err(anyhow::anyhow!( - "{} is not authorized to {collection_name}", + "{} is not authorized to {collection_name} for {capability:?}", user_email.map(String::as_str).unwrap_or("user") ) .with_status(StatusCode::FORBIDDEN)); @@ -125,7 +136,7 @@ fn evaluate_authorization( let claims = super::DataClaims { inner: proto_gazette::Claims { - cap: proto_gazette::capability::LIST | proto_gazette::capability::READ, + cap: super::map_capability_to_gazette(capability), exp: 0, // Filled later. iat: 0, // Filled later. iss: data_plane.data_plane_fqdn.clone(), @@ -166,6 +177,7 @@ mod tests { uuid::Uuid::from_bytes([32; 16]), Some("bob@bob".to_string()), models::Collection::new("bobCo/anvils/peaches"), + models::Capability::Write, ) .await; @@ -175,7 +187,7 @@ mod tests { "broker.2", "bobCo/anvils/peaches/1122334455667788/", { - "cap": 10, + "cap": 26, "exp": 0, "iat": 0, "iss": "fqdn2", @@ -207,6 +219,27 @@ mod tests { uuid::Uuid::from_bytes([32; 16]), Some("bob@bob".to_string()), models::Collection::new("acmeCo/other/thing"), + models::Capability::Read, + ) + .await; + + insta::assert_json_snapshot!(outcome, @r###" + { + "Err": { + "status": 403, + "error": "bob@bob is not authorized to acmeCo/other/thing for Read" + } + } + "###); + } + + #[tokio::test] + async fn test_capability_to_high() { + let outcome = run( + uuid::Uuid::from_bytes([32; 16]), + Some("bob@bob".to_string()), + models::Collection::new("bobCo/anvils/peaches"), + models::Capability::Admin, ) .await; @@ -214,7 +247,7 @@ mod tests { { "Err": { "status": 403, - "error": "bob@bob is not authorized to acmeCo/other/thing" + "error": "bob@bob is not authorized to bobCo/anvils/peaches for Admin" } } "###); @@ -226,6 +259,7 @@ mod tests { uuid::Uuid::from_bytes([32; 16]), Some("bob@bob".to_string()), models::Collection::new("bobCo/widgets/not/found"), + models::Capability::Read, ) .await; @@ -245,6 +279,7 @@ mod tests { uuid::Uuid::from_bytes([32; 16]), Some("bob@bob".to_string()), models::Collection::new("bobCo/widgets/squashes"), + models::Capability::Read, ) .await; @@ -262,6 +297,7 @@ mod tests { user_id: uuid::Uuid, email: Option, collection: models::Collection, + capability: models::Capability, ) -> Result<(String, String, proto_gazette::Claims), crate::api::ApiError> { let taken = chrono::Utc::now(); let snapshot = Snapshot::build_fixture(Some(taken)); @@ -277,6 +313,7 @@ mod tests { user_id, email, collection, + capability, taken.timestamp() as u64 - 1, ) .await? diff --git a/crates/agent/src/api/authorize_user_prefix.rs b/crates/agent/src/api/authorize_user_prefix.rs new file mode 100644 index 00000000000..7e6b6a4307d --- /dev/null +++ b/crates/agent/src/api/authorize_user_prefix.rs @@ -0,0 +1,520 @@ +use super::{App, Snapshot}; +use crate::api::error::ApiErrorExt; +use anyhow::Context; +use axum::http::StatusCode; +use std::sync::Arc; + +type Request = models::authorizations::UserPrefixAuthorizationRequest; +type Response = models::authorizations::UserPrefixAuthorization; + +#[axum::debug_handler] +#[tracing::instrument( + skip(app), + err(level = tracing::Level::WARN), +)] +pub async fn authorize_user_prefix( + axum::extract::State(app): axum::extract::State>, + axum::Extension(super::ControlClaims { + sub: user_id, + email, + .. + }): axum::Extension, + super::Request(Request { + prefix, + data_plane, + capability, + started_unix, + }): super::Request, +) -> Result, crate::api::ApiError> { + do_authorize_user_prefix( + &app.snapshot, + user_id, + email, + prefix, + data_plane, + capability, + started_unix, + ) + .await +} + +pub async fn do_authorize_user_prefix( + snapshot: &std::sync::RwLock, + user_id: uuid::Uuid, + email: Option, + prefix: models::Prefix, + data_plane: models::Name, + capability: models::Capability, + started_unix: u64, +) -> Result, crate::api::ApiError> { + let started = chrono::DateTime::from_timestamp(started_unix as i64, 0).unwrap_or_default(); + + match Snapshot::evaluate(snapshot, started, |snapshot: &Snapshot| { + evaluate_authorization( + snapshot, + user_id, + email.as_ref(), + &prefix, + &data_plane, + capability, + ) + }) { + Ok(( + exp, + (encoding_key, mut broker_claims, broker_address, mut reactor_claims, reactor_address), + )) => { + broker_claims.inner.exp = exp.timestamp() as u64; + broker_claims.inner.iat = started.timestamp() as u64; + reactor_claims.inner.exp = exp.timestamp() as u64; + reactor_claims.inner.iat = started.timestamp() as u64; + + let header = jsonwebtoken::Header::default(); + let broker_token = jsonwebtoken::encode(&header, &broker_claims, &encoding_key) + .context("failed to encode authorized JWT")?; + let reactor_token = jsonwebtoken::encode(&header, &reactor_claims, &encoding_key) + .context("failed to encode authorized JWT")?; + + Ok(axum::Json(Response { + broker_token, + broker_address, + reactor_token, + reactor_address, + retry_millis: 0, + })) + } + Err(Ok(backoff)) => Ok(axum::Json(Response { + retry_millis: backoff.as_millis() as u64, + ..Default::default() + })), + Err(Err(err)) => Err(err), + } +} + +fn evaluate_authorization( + snapshot: &Snapshot, + user_id: uuid::Uuid, + user_email: Option<&String>, + prefix: &models::Prefix, + data_plane_name: &models::Name, + capability: models::Capability, +) -> Result< + ( + Option>, + ( + jsonwebtoken::EncodingKey, + super::DataClaims, // Broker claims. + String, // Broker address. + super::DataClaims, // Reactor claims. + String, // Reactor address. + ), + ), + crate::api::ApiError, +> { + if !tables::UserGrant::is_authorized( + &snapshot.role_grants, + &snapshot.user_grants, + user_id, + prefix, + capability, + ) { + return Err(anyhow::anyhow!( + "{} is not authorized to {prefix} for {capability:?}", + user_email.map(String::as_str).unwrap_or("user") + ) + .with_status(StatusCode::FORBIDDEN)); + } + + if !tables::UserGrant::is_authorized( + &snapshot.role_grants, + &snapshot.user_grants, + user_id, + &data_plane_name, + models::Capability::Read, + ) { + return Err(anyhow::anyhow!( + "{} is not authorized to {data_plane_name}", + user_email.map(String::as_str).unwrap_or("user") + ) + .with_status(StatusCode::FORBIDDEN)); + } + + let Some(data_plane) = snapshot.data_plane_by_catalog_name(&data_plane_name) else { + return Err(anyhow::anyhow!("data-plane {data_plane_name} not found") + .with_status(StatusCode::NOT_FOUND)); + }; + let Some(encoding_key) = data_plane.hmac_keys.first() else { + return Err( + anyhow::anyhow!("data-plane {data_plane_name} has no configured HMAC keys") + .with_status(StatusCode::INTERNAL_SERVER_ERROR), + ); + }; + let encoding_key = jsonwebtoken::EncodingKey::from_base64_secret(&encoding_key) + .context("invalid data-plane hmac key")?; + + let broker_claims = super::DataClaims { + inner: proto_gazette::Claims { + cap: super::map_capability_to_gazette(capability), + exp: 0, // Filled later. + iat: 0, // Filled later. + iss: data_plane.data_plane_fqdn.clone(), + sub: user_id.to_string(), + sel: proto_gazette::broker::LabelSelector { + include: Some(labels::build_set([ + ("name:prefix", prefix.as_str()), + ("name:prefix", &format!("recovery/capture/{prefix}")), + ("name:prefix", &format!("recovery/derivation/{prefix}")), + ("name:prefix", &format!("recovery/materialize/{prefix}")), + ])), + exclude: None, + }, + }, + prefixes: Vec::new(), // TODO(johnny): remove. + }; + + let reactor_claims = super::DataClaims { + inner: proto_gazette::Claims { + cap: super::map_capability_to_gazette(capability) + | proto_flow::capability::NETWORK_PROXY, + exp: 0, // Filled later. + iat: 0, // Filled later. + iss: data_plane.data_plane_fqdn.clone(), + sub: user_id.to_string(), + sel: proto_gazette::broker::LabelSelector { + include: Some(labels::build_set([ + ("id:prefix", format!("capture/{prefix}").as_str()), + ("id:prefix", &format!("derivation/{prefix}")), + ("id:prefix", &format!("materialize/{prefix}")), + ])), + exclude: None, + }, + }, + prefixes: Vec::new(), // TODO(johnny): remove. + }; + + Ok(( + None, // This API does not enforce cordons. + ( + encoding_key, + broker_claims, + super::maybe_rewrite_address(true, &data_plane.broker_address), + reactor_claims, + super::maybe_rewrite_address(true, &data_plane.reactor_address), + ), + )) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_success_one() { + let outcome = run( + uuid::Uuid::from_bytes([32; 16]), + Some("bob@bob".to_string()), + models::Prefix::new("bobCo/tires/"), + models::Name::new("ops/dp/public/plane-two"), + models::Capability::Admin, + ) + .await; + + insta::assert_json_snapshot!(outcome, @r###" + { + "Ok": [ + "broker.2", + { + "cap": 30, + "exp": 0, + "iat": 0, + "iss": "fqdn2", + "sel": { + "include": { + "labels": [ + { + "name": "name", + "value": "bobCo/tires/", + "prefix": true + }, + { + "name": "name", + "value": "recovery/capture/bobCo/tires/", + "prefix": true + }, + { + "name": "name", + "value": "recovery/derivation/bobCo/tires/", + "prefix": true + }, + { + "name": "name", + "value": "recovery/materialize/bobCo/tires/", + "prefix": true + } + ] + } + }, + "sub": "20202020-2020-2020-2020-202020202020" + }, + "reactor.2", + { + "cap": 262174, + "exp": 0, + "iat": 0, + "iss": "fqdn2", + "sel": { + "include": { + "labels": [ + { + "name": "id", + "value": "capture/bobCo/tires/", + "prefix": true + }, + { + "name": "id", + "value": "derivation/bobCo/tires/", + "prefix": true + }, + { + "name": "id", + "value": "materialize/bobCo/tires/", + "prefix": true + } + ] + } + }, + "sub": "20202020-2020-2020-2020-202020202020" + } + ] + } + "###); + } + + #[tokio::test] + async fn test_success_two() { + let outcome = run( + uuid::Uuid::from_bytes([32; 16]), + Some("bob@bob".to_string()), + models::Prefix::new("acmeCo/shared/stuff/"), + models::Name::new("ops/dp/public/plane-two"), + models::Capability::Read, + ) + .await; + + insta::assert_json_snapshot!(outcome, @r###" + { + "Ok": [ + "broker.2", + { + "cap": 10, + "exp": 0, + "iat": 0, + "iss": "fqdn2", + "sel": { + "include": { + "labels": [ + { + "name": "name", + "value": "acmeCo/shared/stuff/", + "prefix": true + }, + { + "name": "name", + "value": "recovery/capture/acmeCo/shared/stuff/", + "prefix": true + }, + { + "name": "name", + "value": "recovery/derivation/acmeCo/shared/stuff/", + "prefix": true + }, + { + "name": "name", + "value": "recovery/materialize/acmeCo/shared/stuff/", + "prefix": true + } + ] + } + }, + "sub": "20202020-2020-2020-2020-202020202020" + }, + "reactor.2", + { + "cap": 262154, + "exp": 0, + "iat": 0, + "iss": "fqdn2", + "sel": { + "include": { + "labels": [ + { + "name": "id", + "value": "capture/acmeCo/shared/stuff/", + "prefix": true + }, + { + "name": "id", + "value": "derivation/acmeCo/shared/stuff/", + "prefix": true + }, + { + "name": "id", + "value": "materialize/acmeCo/shared/stuff/", + "prefix": true + } + ] + } + }, + "sub": "20202020-2020-2020-2020-202020202020" + } + ] + } + "###); + } + + #[tokio::test] + async fn test_not_authorized_to_prefix() { + let outcome = run( + uuid::Uuid::from_bytes([32; 16]), + Some("bob@bob".to_string()), + models::Prefix::new("acmeCo/whoosh/"), + models::Name::new("ops/dp/public/plane-two"), + models::Capability::Write, + ) + .await; + + insta::assert_json_snapshot!(outcome, @r###" + { + "Err": { + "status": 403, + "error": "bob@bob is not authorized to acmeCo/whoosh/ for Write" + } + } + "###); + } + + #[tokio::test] + async fn test_not_authorized_to_data_plane() { + let outcome = run( + uuid::Uuid::from_bytes([32; 16]), + Some("bob@bob".to_string()), + models::Prefix::new("bobCo/tires/"), + models::Name::new("ops/dp/private/something"), + models::Capability::Admin, + ) + .await; + + insta::assert_json_snapshot!(outcome, @r###" + { + "Err": { + "status": 403, + "error": "bob@bob is not authorized to ops/dp/private/something" + } + } + "###); + } + + #[tokio::test] + async fn test_data_plane_not_found() { + let outcome = run( + uuid::Uuid::from_bytes([32; 16]), + Some("bob@bob".to_string()), + models::Prefix::new("bobCo/tires/"), + models::Name::new("ops/dp/public/plane-missing"), + models::Capability::Admin, + ) + .await; + + insta::assert_json_snapshot!(outcome, @r###" + { + "Err": { + "status": 404, + "error": "data-plane ops/dp/public/plane-missing not found" + } + } + "###); + } + + #[tokio::test] + async fn test_capability_to_high() { + let outcome = run( + uuid::Uuid::from_bytes([32; 16]), + Some("bob@bob".to_string()), + models::Prefix::new("acmeCo/shared/stuff/"), + models::Name::new("ops/dp/public/plane-two"), + models::Capability::Write, + ) + .await; + + insta::assert_json_snapshot!(outcome, @r###" + { + "Err": { + "status": 403, + "error": "bob@bob is not authorized to acmeCo/shared/stuff/ for Write" + } + } + "###); + } + + async fn run( + user_id: uuid::Uuid, + email: Option, + prefix: models::Prefix, + data_plane: models::Name, + capability: models::Capability, + ) -> Result<(String, proto_gazette::Claims, String, proto_gazette::Claims), crate::api::ApiError> + { + let taken = chrono::Utc::now(); + let snapshot = Snapshot::build_fixture(Some(taken)); + let snapshot = std::sync::RwLock::new(snapshot); + + let Response { + broker_address, + broker_token, + reactor_address, + reactor_token, + retry_millis, + } = do_authorize_user_prefix( + &snapshot, + user_id, + email, + prefix, + data_plane, + capability, + taken.timestamp() as u64 - 1, + ) + .await? + .0; + + if retry_millis != 0 { + return Err(anyhow::anyhow!("retry").into()); + } + + let mut broker_claims = jsonwebtoken::decode::( + &broker_token, + &jsonwebtoken::DecodingKey::from_secret("key3".as_bytes()), + &jsonwebtoken::Validation::default(), + ) + .expect("failed to decode response token") + .claims + .inner; + + let mut reactor_claims = jsonwebtoken::decode::( + &reactor_token, + &jsonwebtoken::DecodingKey::from_secret("key3".as_bytes()), + &jsonwebtoken::Validation::default(), + ) + .expect("failed to decode response token") + .claims + .inner; + + (broker_claims.iat, broker_claims.exp) = (0, 0); + (reactor_claims.iat, reactor_claims.exp) = (0, 0); + + Ok(( + broker_address, + broker_claims, + reactor_address, + reactor_claims, + )) + } +} diff --git a/crates/agent/src/api/authorize_user_task.rs b/crates/agent/src/api/authorize_user_task.rs index 18babb4b20a..13b1564fe0e 100644 --- a/crates/agent/src/api/authorize_user_task.rs +++ b/crates/agent/src/api/authorize_user_task.rs @@ -19,9 +19,21 @@ pub async fn authorize_user_task( email, .. }): axum::Extension, - super::Request(Request { task, started_unix }): super::Request, + super::Request(Request { + task, + capability, + started_unix, + }): super::Request, ) -> Result, crate::api::ApiError> { - do_authorize_user_task(&app.snapshot, user_id, email, task, started_unix).await + do_authorize_user_task( + &app.snapshot, + user_id, + email, + task, + capability, + started_unix, + ) + .await } pub async fn do_authorize_user_task( @@ -29,6 +41,7 @@ pub async fn do_authorize_user_task( user_id: uuid::Uuid, email: Option, task: models::Name, + capability: models::Capability, started_unix: u64, ) -> Result, crate::api::ApiError> { let (has_started, started) = if started_unix == 0 { @@ -42,7 +55,7 @@ pub async fn do_authorize_user_task( loop { match Snapshot::evaluate(snapshot, started, |snapshot: &Snapshot| { - evaluate_authorization(snapshot, user_id, email.as_ref(), &task) + evaluate_authorization(snapshot, user_id, email.as_ref(), &task, capability) }) { Ok(( exp, @@ -98,6 +111,7 @@ fn evaluate_authorization( user_id: uuid::Uuid, user_email: Option<&String>, task_name: &models::Name, + capability: models::Capability, ) -> Result< ( Option>, @@ -119,10 +133,10 @@ fn evaluate_authorization( &snapshot.user_grants, user_id, task_name, - models::Capability::Read, + capability, ) { return Err(anyhow::anyhow!( - "{} is not authorized to {task_name}", + "{} is not authorized to {task_name} for {capability:?}", user_email.map(String::as_str).unwrap_or("user") ) .with_status(StatusCode::FORBIDDEN)); @@ -185,8 +199,7 @@ fn evaluate_authorization( let reactor_claims = super::DataClaims { inner: proto_gazette::Claims { - cap: proto_gazette::capability::LIST - | proto_gazette::capability::READ + cap: super::map_capability_to_gazette(capability) | proto_flow::capability::NETWORK_PROXY, exp: 0, // Filled later. iat: 0, // Filled later. @@ -228,6 +241,7 @@ mod tests { uuid::Uuid::from_bytes([32; 16]), Some("bob@bob".to_string()), models::Name::new("bobCo/anvils/materialize-orange"), + models::Capability::Read, ) .await; @@ -289,6 +303,7 @@ mod tests { uuid::Uuid::from_bytes([32; 16]), Some("bob@bob".to_string()), models::Name::new("acmeCo/other/thing"), + models::Capability::Read, ) .await; @@ -296,18 +311,38 @@ mod tests { { "Err": { "status": 403, - "error": "bob@bob is not authorized to acmeCo/other/thing" + "error": "bob@bob is not authorized to acmeCo/other/thing for Read" } } "###); } + #[tokio::test] + async fn test_capability_to_high() { + let outcome = run( + uuid::Uuid::from_bytes([32; 16]), + Some("bob@bob".to_string()), + models::Name::new("bobCo/anvils/materialize-orange"), + models::Capability::Admin, + ) + .await; + + insta::assert_json_snapshot!(outcome, @r###" + { + "Err": { + "status": 403, + "error": "bob@bob is not authorized to bobCo/anvils/materialize-orange for Admin" + } + } + "###); + } #[tokio::test] async fn test_not_found() { let outcome = run( uuid::Uuid::from_bytes([32; 16]), Some("bob@bob".to_string()), models::Name::new("bobCo/widgets/not/found"), + models::Capability::Read, ) .await; @@ -327,6 +362,7 @@ mod tests { uuid::Uuid::from_bytes([32; 16]), Some("bob@bob".to_string()), models::Name::new("bobCo/widgets/materialize-mango"), + models::Capability::Read, ) .await; @@ -344,6 +380,7 @@ mod tests { user_id: uuid::Uuid, email: Option, task: models::Name, + capability: models::Capability, ) -> Result< ( String, @@ -374,6 +411,7 @@ mod tests { user_id, email, task, + capability, taken.timestamp() as u64 - 1, ) .await? diff --git a/crates/agent/src/api/mod.rs b/crates/agent/src/api/mod.rs index 55dc28404d7..6cb1f4d16ee 100644 --- a/crates/agent/src/api/mod.rs +++ b/crates/agent/src/api/mod.rs @@ -4,6 +4,7 @@ use std::sync::{Arc, Mutex}; mod authorize_dekaf; mod authorize_task; mod authorize_user_collection; +mod authorize_user_prefix; mod authorize_user_task; mod create_data_plane; mod error; @@ -154,14 +155,20 @@ pub fn build_router( .route("/authorize/task", post(authorize_task::authorize_task)) .route("/authorize/dekaf", post(authorize_dekaf::authorize_dekaf)) .route( - "/authorize/user/task", - post(authorize_user_task::authorize_user_task) + "/authorize/user/collection", + post(authorize_user_collection::authorize_user_collection) .route_layer(axum::middleware::from_fn_with_state(app.clone(), authorize)) .options(preflight_handler), ) .route( - "/authorize/user/collection", - post(authorize_user_collection::authorize_user_collection) + "/authorize/user/prefix", + post(authorize_user_prefix::authorize_user_prefix) + .route_layer(axum::middleware::from_fn_with_state(app.clone(), authorize)) + .options(preflight_handler), + ) + .route( + "/authorize/user/task", + post(authorize_user_task::authorize_user_task) .route_layer(axum::middleware::from_fn_with_state(app.clone(), authorize)) .options(preflight_handler), ) @@ -356,3 +363,22 @@ fn maybe_rewrite_address(external: bool, address: &str) -> String { address.to_string() } } + +const fn map_capability_to_gazette(capability: models::Capability) -> u32 { + match capability { + models::Capability::Read => { + proto_gazette::capability::LIST | proto_gazette::capability::READ + } + models::Capability::Write => { + proto_gazette::capability::LIST + | proto_gazette::capability::READ + | proto_gazette::capability::APPEND + } + models::Capability::Admin => { + proto_gazette::capability::LIST + | proto_gazette::capability::READ + | proto_gazette::capability::APPEND + | proto_gazette::capability::APPLY + } + } +} diff --git a/crates/agent/src/api/snapshot.rs b/crates/agent/src/api/snapshot.rs index 274457afb44..3f2b27a402c 100644 --- a/crates/agent/src/api/snapshot.rs +++ b/crates/agent/src/api/snapshot.rs @@ -15,6 +15,8 @@ pub struct Snapshot { pub data_planes: tables::DataPlanes, // Indices of `data_planes`, indexed on `data_plane_fqdn`. pub data_planes_idx_fqdn: Vec, + // Indices of `data_planes`, indexed on `data_plane_name`. + pub data_planes_idx_name: Vec, // Data-plane migrations that are underway. pub migrations: Vec, // Platform role grants. @@ -78,6 +80,7 @@ impl Snapshot { collections_idx_name: Vec::new(), data_planes: tables::DataPlanes::default(), data_planes_idx_fqdn: Vec::new(), + data_planes_idx_name: Vec::new(), migrations: Vec::new(), role_grants: tables::RoleGrants::default(), user_grants: tables::UserGrants::default(), @@ -118,6 +121,7 @@ impl Snapshot { let mut collections_idx_name = Vec::from_iter(0..collections.len()); let mut data_planes_idx_fqdn = Vec::from_iter(0..data_planes.len()); + let mut data_planes_idx_name = Vec::from_iter(0..data_planes.len()); let mut tasks_idx_name = Vec::from_iter(0..tasks.len()); collections_idx_name.sort_by(|i1, i2| { @@ -130,6 +134,11 @@ impl Snapshot { .data_plane_fqdn .cmp(&data_planes[*i2].data_plane_fqdn) }); + data_planes_idx_name.sort_by(|i1, i2| { + data_planes[*i1] + .data_plane_name + .cmp(&data_planes[*i2].data_plane_name) + }); tasks_idx_name.sort_by(|i1, i2| tasks[*i1].task_name.cmp(&tasks[*i2].task_name)); Snapshot { @@ -138,6 +147,7 @@ impl Snapshot { collections_idx_name, data_planes, data_planes_idx_fqdn, + data_planes_idx_name, migrations, role_grants, user_grants, @@ -292,6 +302,18 @@ impl Snapshot { .map(|index| &self.collections[index]) } + // Retrieve the data-plane having the exact catalog `name`. + pub fn data_plane_by_catalog_name<'s>(&'s self, name: &str) -> Option<&'s tables::DataPlane> { + self.data_planes_idx_name + .binary_search_by(|i| self.data_planes[*i].data_plane_name.as_str().cmp(name)) + .ok() + .map(|index| { + let data_plane = &self.data_planes[self.data_planes_idx_name[index]]; + assert_eq!(data_plane.data_plane_name.as_str(), name); + data_plane + }) + } + pub fn verify_data_plane_token<'s>( &'s self, iss_fqdn: &str, @@ -608,13 +630,30 @@ impl Snapshot { object_role: models::Prefix::new("bobCo/"), capability: models::Capability::Write, }, + tables::RoleGrant { + subject_role: models::Prefix::new("bobCo/tires/"), + object_role: models::Prefix::new("acmeCo/shared/"), + capability: models::Capability::Read, + }, + tables::RoleGrant { + subject_role: models::Prefix::new("bobCo/"), + object_role: models::Prefix::new("ops/dp/public/"), + capability: models::Capability::Read, + }, ]; - let user_grants = vec![tables::UserGrant { - user_id: uuid::Uuid::from_bytes([32; 16]), - object_role: models::Prefix::new("bobCo/"), - capability: models::Capability::Write, - }]; + let user_grants = vec![ + tables::UserGrant { + user_id: uuid::Uuid::from_bytes([32; 16]), + object_role: models::Prefix::new("bobCo/"), + capability: models::Capability::Write, + }, + tables::UserGrant { + user_id: uuid::Uuid::from_bytes([32; 16]), + object_role: models::Prefix::new("bobCo/tires/"), + capability: models::Capability::Admin, + }, + ]; let tasks = [ ("acmeCo/source-pineapple", models::CatalogType::Capture, 1), @@ -777,6 +816,31 @@ mod tests { .is_none()); } + #[test] + fn test_data_plane_lookups() { + let snapshot = Snapshot::build_fixture(None); + + assert_eq!( + snapshot + .data_plane_by_catalog_name("ops/dp/public/plane-one") + .unwrap() + .data_plane_name + .as_str(), + "ops/dp/public/plane-one" + ); + assert_eq!( + snapshot + .data_plane_by_catalog_name("ops/dp/public/plane-two") + .unwrap() + .data_plane_name + .as_str(), + "ops/dp/public/plane-two" + ); + assert!(snapshot + .data_plane_by_catalog_name("ops/dp/public/plane-one/1") + .is_none()); // Non-existent name should not match. + } + #[test] fn test_verify_data_plane_token() { let snapshot = Snapshot::build_fixture(None); diff --git a/crates/flow-client/src/client.rs b/crates/flow-client/src/client.rs index 85f179436c7..b049f44bb9f 100644 --- a/crates/flow-client/src/client.rs +++ b/crates/flow-client/src/client.rs @@ -288,6 +288,7 @@ pub async fn fetch_user_task_authorization( &models::authorizations::UserTaskAuthorizationRequest { started_unix, task: models::Name::new(task), + capability: models::Capability::Read, }, ) .await?; @@ -359,6 +360,7 @@ pub async fn fetch_user_collection_authorization( &models::authorizations::UserCollectionAuthorizationRequest { started_unix, collection: models::Collection::new(collection), + capability: models::Capability::Read, }, ) .await?; @@ -391,6 +393,35 @@ pub async fn fetch_user_collection_authorization( Ok((journal_name_prefix, journal_client)) } +#[tracing::instrument(skip(client), err)] +pub async fn fetch_user_prefix_authorization( + client: &Client, + mut request: models::authorizations::UserPrefixAuthorizationRequest, +) -> anyhow::Result { + if request.started_unix == 0 { + request.started_unix = std::time::SystemTime::now() + .duration_since(std::time::SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs(); + } + + loop { + let response: models::authorizations::UserPrefixAuthorization = client + .agent_unary("/authorize/user/prefix", &request) + .await?; + + if response.retry_millis != 0 { + tracing::warn!( + secs = response.retry_millis as f64 / 1000.0, + "authorization service tentatively rejected our request, but we'll retry before failing" + ); + () = tokio::time::sleep(std::time::Duration::from_millis(response.retry_millis)).await; + continue; + } + return Ok(response); + } +} + pub async fn refresh_authorizations( client: &Client, access_token: Option, diff --git a/crates/flow-client/src/lib.rs b/crates/flow-client/src/lib.rs index d75c6db93e1..7c177b8d6c9 100644 --- a/crates/flow-client/src/lib.rs +++ b/crates/flow-client/src/lib.rs @@ -2,8 +2,8 @@ use anyhow::Context; pub mod client; pub use client::{ - fetch_task_authorization, fetch_user_collection_authorization, fetch_user_task_authorization, - Client, + fetch_task_authorization, fetch_user_collection_authorization, fetch_user_prefix_authorization, + fetch_user_task_authorization, Client, }; pub mod pagination; diff --git a/crates/flowctl/src/raw/mod.rs b/crates/flowctl/src/raw/mod.rs index 7affd79039e..937e623c32d 100644 --- a/crates/flowctl/src/raw/mod.rs +++ b/crates/flowctl/src/raw/mod.rs @@ -68,6 +68,9 @@ pub enum Command { BearerLogs(BearerLogs), /// Print information about the shards for a given task ListShards(TaskSelector), + /// Print environment variables for working with a given data-plane + /// and prefix using Gazette's `gazctl`. + GazctlEnv(GazctlEnv), } #[derive(Debug, clap::Args)] @@ -229,6 +232,7 @@ impl Advanced { Command::Stats(stats) => stats.run(ctx).await, Command::BearerLogs(bearer_logs) => bearer_logs.run(ctx).await, Command::ListShards(selector) => shards::do_list_shards(ctx, selector).await, + Command::GazctlEnv(gazctl_env) => gazctl_env.run(ctx).await, } } } @@ -403,6 +407,50 @@ async fn do_combine( Ok(()) } +#[derive(clap::Args, Debug)] +pub struct GazctlEnv { + /// Name of the data-plane to work with. + #[clap(long)] + pub data_plane: String, + /// Journal and shard prefix to request authorization for. + #[clap(long)] + pub prefix: String, + #[clap(long)] + pub admin: bool, +} + +impl GazctlEnv { + pub async fn run(&self, ctx: &mut crate::CliContext) -> anyhow::Result<()> { + let models::authorizations::UserPrefixAuthorization { + broker_address, + broker_token, + reactor_address, + reactor_token, + retry_millis: _, + } = flow_client::fetch_user_prefix_authorization( + &ctx.client, + models::authorizations::UserPrefixAuthorizationRequest { + capability: if self.admin { + models::Capability::Admin + } else { + models::Capability::Read + }, + data_plane: models::Name::new(&self.data_plane), + prefix: models::Prefix::new(&self.prefix), + started_unix: 0, + }, + ) + .await?; + + println!("export BROKER_ADDRESS={broker_address}"); + println!("export BROKER_AUTH_TOKEN={broker_token}"); + println!("export CONSUMER_ADDRESS={reactor_address}"); + println!("export CONSUMER_AUTH_TOKEN={reactor_token}"); + + Ok(()) + } +} + fn parse_key_val(s: &str) -> anyhow::Result<(T, U)> where T: std::str::FromStr, diff --git a/crates/models/src/authorizations.rs b/crates/models/src/authorizations.rs index 19638658ea4..e42ecb24953 100644 --- a/crates/models/src/authorizations.rs +++ b/crates/models/src/authorizations.rs @@ -70,6 +70,9 @@ pub struct UserCollectionAuthorizationRequest { /// # Collection name to be authorized. #[validate] pub collection: crate::Collection, + /// # Requested capability level of the authorization. + #[serde(default = "capability_read")] + pub capability: crate::Capability, /// # Unix timestamp, in seconds, at which the operation started. /// If this is non-zero, it lower-bounds the time of an authorization /// snapshot required to definitively reject an authorization. @@ -92,8 +95,8 @@ pub struct UserCollectionAuthorization { #[serde(default, skip_serializing_if = "String::is_empty")] pub broker_address: String, /// # JWT token which has been authorized for use with brokers. - /// The token is capable of LIST and READ for journals - /// of the requested collection. + /// The token is authorized for journal operations of the + /// requested collection and capability. #[serde(default, skip_serializing_if = "String::is_empty")] pub broker_token: String, /// # Prefix of collection Journal names. @@ -105,6 +108,53 @@ pub struct UserCollectionAuthorization { pub retry_millis: u64, } +#[derive( + Debug, serde::Serialize, serde::Deserialize, schemars::JsonSchema, validator::Validate, +)] +#[serde(rename_all = "camelCase")] +pub struct UserPrefixAuthorizationRequest { + /// # Prefix to be authorized. + #[validate] + pub prefix: crate::Prefix, + /// # Name of the data-plane to be authorized. + #[validate] + pub data_plane: crate::Name, + /// # Requested capability level of the authorization. + #[serde(default = "capability_read")] + pub capability: crate::Capability, + /// # Unix timestamp, in seconds, at which the operation started. + /// This timestamp lower-bounds the time of an authorization + /// snapshot required to definitively reject an authorization. + /// + /// Snapshots taken prior to this time point that reject the request + /// will return a Response asking for the operation to be retried. + pub started_unix: u64, +} + +#[derive(Debug, Default, serde::Serialize, serde::Deserialize, schemars::JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct UserPrefixAuthorization { + /// # Address of Gazette brokers for the issued token. + #[serde(default, skip_serializing_if = "String::is_empty")] + pub broker_address: String, + /// # JWT token which has been authorized for use with brokers. + /// The token is authorized for journal operations over the + /// requested prefix and capability. + #[serde(default, skip_serializing_if = "String::is_empty")] + pub broker_token: String, + /// # Address of Reactors for the issued token. + #[serde(default, skip_serializing_if = "String::is_empty")] + pub reactor_address: String, + /// # JWT token which has been authorized for use with reactors. + /// The token is authorized for shard operations over the + /// requested prefix and capability. + #[serde(default, skip_serializing_if = "String::is_empty")] + pub reactor_token: String, + /// # Number of milliseconds to wait before retrying the request. + /// Non-zero if and only if token is not set. + pub retry_millis: u64, +} + #[derive( Debug, serde::Serialize, serde::Deserialize, schemars::JsonSchema, validator::Validate, )] @@ -113,6 +163,9 @@ pub struct UserTaskAuthorizationRequest { /// # Task name to be authorized. #[validate] pub task: crate::Name, + /// # Requested capability level of the authorization. + #[serde(default = "capability_read")] + pub capability: crate::Capability, /// # Unix timestamp, in seconds, at which the operation started. /// If this is non-zero, it lower-bounds the time of an authorization /// snapshot required to definitively reject an authorization. @@ -146,7 +199,8 @@ pub struct UserTaskAuthorization { #[serde(default, skip_serializing_if = "String::is_empty")] pub reactor_address: String, /// # JWT token which has been authorized for use with reactors. - /// The token is capable of LIST, READ, and NETWORK_PROXY of task shards. + /// The token is authorized for shard operations of the + /// requested task and capability. #[serde(default, skip_serializing_if = "String::is_empty")] pub reactor_token: String, /// # Number of milliseconds to wait before retrying the request. @@ -156,6 +210,7 @@ pub struct UserTaskAuthorization { #[serde(default, skip_serializing_if = "String::is_empty")] pub shard_id_prefix: String, } + #[derive(Debug, Default, serde::Serialize, serde::Deserialize, schemars::JsonSchema)] #[serde(rename_all = "camelCase")] pub struct DekafAuthResponse { @@ -176,3 +231,7 @@ pub struct DekafAuthResponse { /// Non-zero if and only if token is not set. pub retry_millis: u64, } + +const fn capability_read() -> crate::Capability { + crate::Capability::Read +}