Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

obs: cleanup metrics and logging #53

Merged
merged 2 commits into from
Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 42 additions & 17 deletions crates/freighter-auth/src/cf_access.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,14 @@ impl UserId {
impl CfAccess {
/// Team base URL must start with `https://`
pub fn new(team_base_url: &str, audience: &str) -> Result<Self, anyhow::Error> {
if team_base_url.len() < 13 || !team_base_url.starts_with("https://") || audience.is_empty() {
if team_base_url.len() < 13 || !team_base_url.starts_with("https://") || audience.is_empty()
{
bail!("invalid cf-access config")
}
let jwks_url = format!("{}/cdn-cgi/access/certs", team_base_url.trim_end_matches('/'));
let jwks_url = format!(
"{}/cdn-cgi/access/certs",
team_base_url.trim_end_matches('/')
);

let mut validation = Validation::new(jsonwebtoken::Algorithm::RS256);
validation.set_audience(&[audience]);
Expand Down Expand Up @@ -76,18 +80,31 @@ impl CfAccess {

locked_keys.next_fetch = now + Duration::from_secs(1); // in case of failure, retry 1/s
let set: JwkSet = async {
reqwest::get(&self.jwks_url).await?
reqwest::get(&self.jwks_url)
.await?
.error_for_status()?
.json().await
}.await.inspect_err(|e| tracing::error!("{}: {e}", self.jwks_url))?;
locked_keys.keys = set.keys.into_iter()
.filter(|k| k.common.public_key_use.as_ref().is_some_and(|s| *s == PublicKeyUse::Signature))
.json()
.await
}
.await
.inspect_err(|e| tracing::error!("{}: {e}", self.jwks_url))?;
locked_keys.keys = set
.keys
.into_iter()
.filter(|k| {
k.common
.public_key_use
.as_ref()
.is_some_and(|s| *s == PublicKeyUse::Signature)
})
.filter_map(|k| {
let key = DecodingKey::from_jwk(&k)
.inspect_err(|e| tracing::error!("{k:?}: {e}")).ok()?;
.inspect_err(|e| tracing::error!("{k:?}: {e}"))
.ok()?;
let kid = k.common.key_id?;
Some((kid, key))
}).collect();
})
.collect();
if locked_keys.keys.is_empty() {
tracing::error!("no usable keys");
anyhow::bail!("no usable keys");
Expand All @@ -100,10 +117,11 @@ impl CfAccess {
pub async fn validated_user_id(&self, token: &str) -> AuthResult<UserId> {
let key_id = jsonwebtoken::decode_header(token)
.map_err(|e| {
tracing::debug!("bad token: {token}: {e}");
tracing::warn!("bad token: {token}: {e}");
AuthError::InvalidCredentials
})?
.kid.ok_or(AuthError::InvalidCredentials)?;
.kid
.ok_or(AuthError::InvalidCredentials)?;

let locked_keys = loop {
let tmp = self.key_set.read().await;
Expand All @@ -116,29 +134,36 @@ impl CfAccess {
};

let Some(key) = locked_keys.keys.get(key_id.as_str()) else {
tracing::debug!("token for an unknown key: {token}: {key_id}");
tracing::warn!("token for an unknown key: {token}: {key_id}");
return Err(AuthError::InvalidCredentials);
};

let claims = jsonwebtoken::decode::<Claims>(token, key, &self.validation)
.map_err(|e| {
tracing::debug!("unauthorized: {token}: {e}");
tracing::warn!("unauthorized: {token}: {e}");
AuthError::Unauthorized
})?.claims;
})?
.claims;

// Service Token gets an empty string in `sub`!
let user_id = claims.sub.filter(|s| !s.is_empty()).or(claims.common_name)
let user_id = claims
.sub
.filter(|s| !s.is_empty())
.or(claims.common_name)
.ok_or_else(|| anyhow::anyhow!("empty claims.sub"))?;
Ok(UserId(user_id))
}
}


#[cfg(test)]
#[tokio::test]
#[ignore]
async fn cf_access_token_test() {
let token = "…"; // Needs non-expired token ;(
let a = CfAccess::new("https://cf-rust.cloudflareaccess.com", "1de8297ce3d45d1962a73a04fcef47b434d95f0ad2134d4d5bd9876086695262").unwrap();
let a = CfAccess::new(
"https://cf-rust.cloudflareaccess.com",
"1de8297ce3d45d1962a73a04fcef47b434d95f0ad2134d4d5bd9876086695262",
)
.unwrap();
a.validated_user_id(token).await.unwrap();
}
32 changes: 21 additions & 11 deletions crates/freighter-pg-index/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,9 +312,10 @@ impl IndexProvider for PgIndexProvider {
.context("Failed to prepare statements for publish transaction")?;

histogram!(
"publish_component_duration_seconds",
"freighter_publish_component_duration_seconds",
"component" => "startup"
).record(startup_timer.elapsed());
)
.record(startup_timer.elapsed());

let crate_timer = Instant::now();

Expand Down Expand Up @@ -349,7 +350,8 @@ impl IndexProvider for PgIndexProvider {
histogram!(
"publish_component_duration_seconds",
"component" => "crate"
).record(crate_timer.elapsed());
)
.record(crate_timer.elapsed());

let get_keycat_timer = Instant::now();

Expand All @@ -372,7 +374,8 @@ impl IndexProvider for PgIndexProvider {
histogram!(
"publish_component_duration_seconds",
"component" => "get_keycat"
).record(get_keycat_timer.elapsed());
)
.record(get_keycat_timer.elapsed());

// add missing keywords and categories

Expand Down Expand Up @@ -411,7 +414,8 @@ impl IndexProvider for PgIndexProvider {
histogram!(
"publish_component_duration_seconds",
"component" => "add_to_keycat"
).record(add_to_keycat_timer.elapsed());
)
.record(add_to_keycat_timer.elapsed());

// prune unneeded keywords and categories

Expand All @@ -438,7 +442,8 @@ impl IndexProvider for PgIndexProvider {
histogram!(
"publish_component_duration_seconds",
"component" => "prune_keycat"
).record(prune_keycat_timer.elapsed());
)
.record(prune_keycat_timer.elapsed());

let insert_version_timer = Instant::now();

Expand All @@ -459,7 +464,8 @@ impl IndexProvider for PgIndexProvider {
histogram!(
"publish_component_duration_seconds",
"component" => "insert_version"
).record(insert_version_timer.elapsed());
)
.record(insert_version_timer.elapsed());

let insert_dependencies_timer = Instant::now();

Expand Down Expand Up @@ -489,7 +495,8 @@ impl IndexProvider for PgIndexProvider {
histogram!(
"publish_component_duration_seconds",
"component" => "insert_dependencies"
).record(insert_dependencies_timer.elapsed());
)
.record(insert_dependencies_timer.elapsed());

let insert_features_timer = Instant::now();

Expand All @@ -506,7 +513,8 @@ impl IndexProvider for PgIndexProvider {
histogram!(
"publish_component_duration_seconds",
"component" => "insert_features"
).record(insert_features_timer.elapsed());
)
.record(insert_features_timer.elapsed());

let end_step_timer = Instant::now();

Expand All @@ -517,7 +525,8 @@ impl IndexProvider for PgIndexProvider {
histogram!(
"publish_component_duration_seconds",
"component" => "end_step"
).record(end_step_timer.elapsed());
)
.record(end_step_timer.elapsed());

let commit_timer = Instant::now();

Expand All @@ -529,7 +538,8 @@ impl IndexProvider for PgIndexProvider {
histogram!(
"publish_component_duration_seconds",
"component" => "commit"
).record(commit_timer.elapsed());
)
.record(commit_timer.elapsed());

Ok(CompletedPublication { warnings: None })
}
Expand Down
86 changes: 72 additions & 14 deletions crates/freighter-server/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ use axum::{Form, Json, Router};
use freighter_api_types::auth::request::AuthForm;
use freighter_api_types::index::request::{Publish, SearchQuery};
use freighter_api_types::index::response::{CompletedPublication, SearchResults};
use freighter_api_types::index::IndexProvider;
use freighter_api_types::storage::StorageProvider;
use freighter_auth::AuthProvider;
use freighter_api_types::index::{IndexError, IndexProvider};
use freighter_api_types::storage::{StorageError, StorageProvider};
use freighter_auth::{AuthError, AuthProvider};
use metrics::counter;
use semver::Version;
use serde::Deserialize;
use sha2::{Digest, Sha256};
Expand Down Expand Up @@ -79,9 +80,27 @@ where

let json: Publish = serde_json::from_slice(&json_bytes).map_err(|_| StatusCode::BAD_REQUEST)?;

let auth = state.auth.token_from_headers(&headers)?.ok_or(StatusCode::UNAUTHORIZED)?;
let auth = state
.auth
.token_from_headers(&headers)?
.ok_or(StatusCode::UNAUTHORIZED)?;

let auth_result = state.auth.publish(auth, &json.name).await;

if let Err(e) = &auth_result {
let error_label = match e {
AuthError::Unauthorized => "unauthorized",
AuthError::Forbidden => "forbidden",
AuthError::InvalidCredentials => "invalid_credentials",
AuthError::Unimplemented => "unimplemented",
AuthError::CrateNotFound => "crate_not_found",
AuthError::ServiceError(_) => "service_error",
};

counter!("freighter_publish_auth_errors_total", "error" => error_label).increment(1);
}

state.auth.publish(auth, &json.name).await?;
auth_result?;

let version = json.vers.to_string();
let storage = state.storage.clone();
Expand All @@ -91,10 +110,22 @@ where
let sha256 = Sha256::digest(&crate_bytes);
let hash = format!("{sha256:x}");
let end_step = std::pin::pin!(async {
storage
let res = storage
.put_crate(&json.name, &version, crate_bytes, sha256.into())
.await
.context("Failed to store the crate in a storage medium")?;
.await;

if let Err(e) = &res {
let error_label = match e {
StorageError::NotFound => "not_found",
StorageError::ServiceError(_) => "service_error",
};

counter!("freighter_publish_tarballs_errors_total", "error" => error_label)
.increment(1);
}

res.context("Failed to store the crate in a storage medium")?;

stored_crate = true;
Ok(())
});
Expand All @@ -108,6 +139,15 @@ where
Ok(Json(res))
}
Err(e) => {
let error_label = match &e {
IndexError::Conflict(_) => "conflict",
IndexError::CrateNameNotAllowed => "crate_name_not_allowed",
IndexError::NotFound => "crate_not_found",
IndexError::ServiceError(_) => "service_error",
};

counter!("freighter_publish_index_errors_total", "error" => error_label).increment(1);

if stored_crate {
let _ = storage.delete_crate(&json.name, &version).await;
}
Expand All @@ -125,7 +165,10 @@ where
I: IndexProvider,
A: AuthProvider,
{
let auth = state.auth.token_from_headers(&headers)?.ok_or(StatusCode::UNAUTHORIZED)?;
let auth = state
.auth
.token_from_headers(&headers)?
.ok_or(StatusCode::UNAUTHORIZED)?;

state.auth.auth_yank(auth, &name).await?;

Expand All @@ -143,7 +186,10 @@ where
I: IndexProvider,
A: AuthProvider,
{
let auth = state.auth.token_from_headers(&headers)?.ok_or(StatusCode::UNAUTHORIZED)?;
let auth = state
.auth
.token_from_headers(&headers)?
.ok_or(StatusCode::UNAUTHORIZED)?;

state.auth.auth_yank(auth, &name).await?;

Expand All @@ -160,7 +206,10 @@ async fn list_owners<I, S, A>(
where
A: AuthProvider,
{
let auth = state.auth.token_from_headers(&headers)?.ok_or(StatusCode::UNAUTHORIZED)?;
let auth = state
.auth
.token_from_headers(&headers)?
.ok_or(StatusCode::UNAUTHORIZED)?;

state.auth.list_owners(auth, &name).await?;

Expand All @@ -176,7 +225,10 @@ async fn add_owners<I, S, A>(
where
A: AuthProvider,
{
let auth = state.auth.token_from_headers(&headers)?.ok_or(StatusCode::UNAUTHORIZED)?;
let auth = state
.auth
.token_from_headers(&headers)?
.ok_or(StatusCode::UNAUTHORIZED)?;

state
.auth
Expand All @@ -199,7 +251,10 @@ async fn remove_owners<I, S, A>(
where
A: AuthProvider,
{
let auth = state.auth.token_from_headers(&headers)?.ok_or(StatusCode::UNAUTHORIZED)?;
let auth = state
.auth
.token_from_headers(&headers)?
.ok_or(StatusCode::UNAUTHORIZED)?;

state
.auth
Expand Down Expand Up @@ -239,7 +294,10 @@ where
A: AuthProvider + Sync,
{
if state.config.auth_required {
let token = state.auth.token_from_headers(&headers)?.ok_or(StatusCode::UNAUTHORIZED)?;
let token = state
.auth
.token_from_headers(&headers)?
.ok_or(StatusCode::UNAUTHORIZED)?;
state.auth.auth_view_full_index(token).await?;
}

Expand Down
4 changes: 2 additions & 2 deletions crates/freighter-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ where
.with_state(state)
.fallback(handle_global_fallback)
.layer(CatchPanicLayer::custom(|_| {
counter!("panics_total").increment(1);
counter!("freighter_panics_total").increment(1);

StatusCode::INTERNAL_SERVER_ERROR.into_response()
}))
Expand Down Expand Up @@ -120,7 +120,7 @@ async fn metrics_layer<B>(request: Request<B>, next: Next<B>) -> Response {

let code = response.status().as_u16().to_string();

histogram!("request_duration_seconds", "code" => code, "endpoint" => path)
histogram!("freighter_request_duration_seconds", "code" => code, "endpoint" => path)
.record(elapsed);

response
Expand Down
1 change: 0 additions & 1 deletion crates/freighter/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ async fn main() -> anyhow::Result<()> {
} = config;

PrometheusBuilder::new()
.add_global_label("service", "freighter")
.with_http_listener(service.metrics_address)
.set_buckets(&[
100e-6, 500e-6, 1e-3, 5e-3, 1e-2, 5e-2, 1e-1, 2e-1, 3e-1, 4e-1, 5e-1, 6e-1, 7e-1, 8e-1,
Expand Down
Loading