Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): Report unhealthy instead of terminating on panic #4250

Merged
merged 8 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
**Bug Fixes**:

- Allow profile chunks without release. ([#4155](https://github.com/getsentry/relay/pull/4155))
- Terminate the process when one of the services crashes. ([#4249](https://github.com/getsentry/relay/pull/4249))
- Report unhealthy when one of the services crashes. ([#4249](https://github.com/getsentry/relay/pull/4249), [#4250](https://github.com/getsentry/relay/pull/4250))

**Features**:

Expand Down
13 changes: 9 additions & 4 deletions relay-server/src/endpoints/health_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,18 @@ struct Status {
is_healthy: bool,
}

const UNHEALTHY: (hyper::StatusCode, axum::Json<Status>) = (
StatusCode::SERVICE_UNAVAILABLE,
axum::Json(Status { is_healthy: false }),
);

pub async fn handle(state: ServiceState, Path(kind): Path<IsHealthy>) -> impl IntoResponse {
if state.has_crashed() {
return UNHEALTHY;
}
match state.health_check().send(kind).await {
Ok(HealthStatus::Healthy) => (StatusCode::OK, axum::Json(Status { is_healthy: true })),
_ => (
StatusCode::SERVICE_UNAVAILABLE,
axum::Json(Status { is_healthy: false }),
),
_ => UNHEALTHY,
}
}

Expand Down
14 changes: 9 additions & 5 deletions relay-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,11 +305,15 @@ pub fn run(config: Config) -> anyhow::Result<()> {
runner.start(HttpServer::new(config, state.clone())?);

tokio::select! {
_ = runner.join() => {},
// NOTE: when every service implements a shutdown listener,
// awaiting on `finished` becomes unnecessary: We can simply join() and guarantee
// that every service finished its main task.
// See also https://github.com/getsentry/relay/issues/4050.
_ = runner.join(|e| {
if e.is_panic() {
state.report_crash();
}
// NOTE: when every service implements a shutdown listener,
// awaiting on `finished` becomes unnecessary: We can simply join() and guarantee
// that every service finished its main task.
// See also https://github.com/getsentry/relay/issues/4050.
}) => {},
_ = Controller::shutdown_handle().finished() => {}
}

Expand Down
19 changes: 19 additions & 0 deletions relay-server/src/service.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::convert::Infallible;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::time::Duration;

Expand Down Expand Up @@ -135,6 +136,7 @@ struct StateInner {
config: Arc<Config>,
memory_checker: MemoryChecker,
registry: Registry,
service_crashed: AtomicBool,
}

/// Server state.
Expand Down Expand Up @@ -333,6 +335,7 @@ impl ServiceState {
config: config.clone(),
memory_checker: MemoryChecker::new(memory_stat, config.clone()),
registry,
service_crashed: false.into(),
};

Ok((
Expand Down Expand Up @@ -409,6 +412,22 @@ impl ServiceState {
pub fn outcome_aggregator(&self) -> &Addr<TrackOutcome> {
&self.inner.registry.outcome_aggregator
}

/// Checks whether one of the services has crashed.
pub fn has_crashed(&self) -> bool {
self.inner
.service_crashed
.load(std::sync::atomic::Ordering::Relaxed)
}

/// Reports that one of the services has crashed.
///
/// Marks the entire service state as unhealthy.
pub fn report_crash(&self) {
self.inner
.service_crashed
.store(true, std::sync::atomic::Ordering::Relaxed);
}
}

fn create_redis_pool(redis_config: RedisConfigRef) -> Result<RedisPool, RedisError> {
Expand Down
9 changes: 3 additions & 6 deletions relay-system/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use futures::future::Shared;
use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;
use tokio::task::{JoinError, JoinHandle};
use tokio::time::MissedTickBehavior;

use crate::statsd::{SystemCounters, SystemGauges};
Expand Down Expand Up @@ -1060,13 +1060,10 @@ impl ServiceRunner {
/// Awaits until all services have finished.
///
/// Panics if one of the spawned services has panicked.
pub async fn join(&mut self) {
pub async fn join<F: Fn(JoinError)>(&mut self, error_handler: F) {
while let Some(res) = self.0.next().await {
if let Err(e) = res {
if e.is_panic() {
// Re-trigger panic to terminate the process:
std::panic::resume_unwind(e.into_panic());
}
error_handler(e);
}
}
}
Expand Down
Loading