ref(server): Run processor threads with lower priority (#4222)
The original plan was to also set the scheduling policy to
`SCHED_BATCH` (see `man sched(7)`), but while looking at other projects
I found Apache Arrow DataFusion, which also lowers the priority of its
CPU-bound worker pool:
https://github.com/msathis/arrow-datafusion/blob/bb646802001ffbb1cc35679eea0ded86dd769a8d/ballista/rust/executor/src/cpu_bound_executor.rs#L58-L59

as well as a referenced blog article:
https://thenewstack.io/using-rustlangs-async-tokio-runtime-for-cpu-bound-tasks/

This convinced me that lowering the thread priority is in fact a good
idea, and that the idea of messing with scheduler policies should be
dropped; in testing, scheduler policies also proved very finicky to get
right.

The change should have a positive effect on tail latencies for all tasks
scheduled on the Tokio runtime, and it prioritizes those tasks over the
worker pool when the system is under pressure.
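For reference, the dropped approach would have switched worker threads to the `SCHED_BATCH` policy instead of (or in addition to) lowering their niceness. A minimal, Linux-only sketch of what that could have looked like with the `libc` crate; this is hypothetical and not part of the commit, and `set_batch_policy` is an illustrative name:

// Hypothetical sketch (Linux only): the SCHED_BATCH approach this commit
// decided against. SCHED_BATCH hints to the kernel that the calling thread
// runs CPU-bound batch work and should be scheduled less interactively.
fn set_batch_policy() -> std::io::Result<()> {
    // SCHED_BATCH does not use a static priority; it must be set to 0.
    let param = libc::sched_param { sched_priority: 0 };
    // A pid of `0` applies the policy to the calling thread.
    if unsafe { libc::sched_setscheduler(0, libc::SCHED_BATCH, &param) } != 0 {
        return Err(std::io::Error::last_os_error());
    }
    Ok(())
}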
Dav1dde authored Nov 6, 2024
1 parent f0d01aa commit 71ebf8a
Showing 6 changed files with 90 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -24,6 +24,7 @@
 - Remove `buffer` entries and scrub array contents from MongoDB queries. ([#4186](https://github.com/getsentry/relay/pull/4186))
 - Use `DateTime<Utc>` instead of `Instant` for tracking the received time of the `Envelope`. ([#4184](https://github.com/getsentry/relay/pull/4184))
 - Add a field to suggest consumers to ingest spans in EAP. ([#4206](https://github.com/getsentry/relay/pull/4206))
+- Run internal worker threads with a lower priority. ([#4222](https://github.com/getsentry/relay/pull/4222))
 
 ## 24.10.0

5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
@@ -109,8 +109,9 @@ insta = { version = "1.31.0", features = ["json", "redactions", "ron"] }
 ipnetwork = "0.20.0"
 itertools = "0.13.0"
 json-forensics = "0.1.1"
-lru = "0.12.4"
+libc = "0.2.161"
 liblzma = "0.3.4"
+lru = "0.12.4"
 maxminddb = "0.24.0"
 memchr = "2.7.4"
 md5 = "0.7.0"
1 change: 1 addition & 0 deletions relay-server/Cargo.toml
@@ -49,6 +49,7 @@ hashbrown = { workspace = true }
 hyper-util = { workspace = true }
 itertools = { workspace = true }
 json-forensics = { workspace = true }
+libc = { workspace = true }
 liblzma = { workspace = true }
 mime = { workspace = true }
 minidump = { workspace = true, optional = true }
5 changes: 3 additions & 2 deletions relay-server/src/service.rs
@@ -19,7 +19,7 @@ use crate::services::stats::RelayStats;
 use crate::services::store::StoreService;
 use crate::services::test_store::{TestStore, TestStoreService};
 use crate::services::upstream::{UpstreamRelay, UpstreamRelayService};
-use crate::utils::{MemoryChecker, MemoryStat};
+use crate::utils::{MemoryChecker, MemoryStat, ThreadKind};
 use anyhow::{Context, Result};
 use axum::extract::FromRequestParts;
 use axum::http::request::Parts;
@@ -105,6 +105,7 @@ fn create_processor_pool(config: &Config) -> Result<ThreadPool> {
 
     let pool = crate::utils::ThreadPoolBuilder::new("processor")
         .num_threads(thread_count)
+        .thread_kind(ThreadKind::Worker)
         .runtime(tokio::runtime::Handle::current())
         .build()?;
 
@@ -114,7 +115,7 @@
 #[cfg(feature = "processing")]
 fn create_store_pool(config: &Config) -> Result<ThreadPool> {
     // Spawn a store worker for every 12 threads in the processor pool.
-    // This ratio was found emperically and may need adjustments in the future.
+    // This ratio was found empirically and may need adjustments in the future.
     //
     // Ideally in the future the store will be single threaded again, after we move
     // all the heavy processing (de- and re-serialization) into the processor.
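For a sense of scale: under Linux's default scheduler, each nice step changes a thread's CPU weight by roughly a factor of 1.25, so the workers' nice value of 10 corresponds to about 1024 / 1.25^10 ≈ 110 weight units versus 1024 for a nice-0 thread, i.e. roughly a tenth of the CPU share under contention. A minimal sketch of how async code might hand work to a pool built this way; `process_cpu_heavy` and the summing workload are hypothetical illustrations, not Relay's actual processing:

// Hypothetical illustration: dispatch CPU-bound work onto the
// lower-priority pool and await the result from async code.
async fn process_cpu_heavy(pool: &rayon::ThreadPool) -> u64 {
    let (tx, rx) = tokio::sync::oneshot::channel();
    pool.spawn(move || {
        // Runs on a "processor" thread at nice(10); heavy CPU work here
        // no longer starves the Tokio runtime's nice(0) threads.
        let result: u64 = (0..1_000_000u64).sum();
        let _ = tx.send(result);
    });
    rx.await.expect("worker dropped the result channel")
}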
83 changes: 80 additions & 3 deletions relay-server/src/utils/thread_pool.rs
@@ -5,11 +5,24 @@ use tokio::runtime::Handle;
 pub use rayon::{ThreadPool, ThreadPoolBuildError};
 use tokio::sync::Semaphore;
 
+/// A thread kind.
+///
+/// The thread kind has an effect on how threads are prioritized and scheduled.
+#[derive(Default, Debug, Clone, Copy)]
+pub enum ThreadKind {
+    /// The default kind, just a thread like any other without any special configuration.
+    #[default]
+    Default,
+    /// A worker thread is a CPU intensive task with a lower priority than the [`Self::Default`] kind.
+    Worker,
+}
+
 /// Used to create a new [`ThreadPool`] thread pool.
 pub struct ThreadPoolBuilder {
     name: &'static str,
     runtime: Option<Handle>,
     num_threads: usize,
+    kind: ThreadKind,
 }
 
 impl ThreadPoolBuilder {
@@ -19,18 +32,25 @@ impl ThreadPoolBuilder {
             name,
             runtime: None,
             num_threads: 0,
+            kind: ThreadKind::Default,
         }
     }
 
-    /// Sets the number of threads to be used in the rayon threadpool.
+    /// Sets the number of threads to be used in the rayon thread-pool.
     ///
     /// See also [`rayon::ThreadPoolBuilder::num_threads`].
     pub fn num_threads(mut self, num_threads: usize) -> Self {
         self.num_threads = num_threads;
         self
     }
 
-    /// Sets the tokio runtime which will be made available in the workers.
+    /// Configures the [`ThreadKind`] for all threads spawned in the pool.
+    pub fn thread_kind(mut self, kind: ThreadKind) -> Self {
+        self.kind = kind;
+        self
+    }
+
+    /// Sets the Tokio runtime which will be made available in the workers.
     pub fn runtime(mut self, runtime: Handle) -> Self {
         self.runtime = Some(runtime);
         self
@@ -56,6 +76,7 @@
             }
             let runtime = self.runtime.clone();
             b.spawn(move || {
+                set_current_thread_priority(self.kind);
                 let _guard = runtime.as_ref().map(|runtime| runtime.enter());
                 thread.run()
             })?;
@@ -65,7 +86,7 @@
     }
 }
 
-/// A [`WorkerGroup`] adds an async backpressure mechanism to a [`ThreadPool`].
+/// A [`WorkerGroup`] adds an async back-pressure mechanism to a [`ThreadPool`].
 pub struct WorkerGroup {
     pool: ThreadPool,
     semaphore: Arc<Semaphore>,
@@ -116,6 +137,34 @@
     }
 }
 
+#[cfg(unix)]
+fn set_current_thread_priority(kind: ThreadKind) {
+    // Lower priorities cause more favorable scheduling.
+    // Higher priorities cause less favorable scheduling.
+    //
+    // For details see `man setpriority(2)`.
+    let prio = match kind {
+        // The default priority needs no change, and defaults to `0`.
+        ThreadKind::Default => return,
+        // Set a priority of `10` for worker threads,
+        // it's just important that this is a higher priority than default.
+        ThreadKind::Worker => 10,
+    };
+    if unsafe { libc::setpriority(libc::PRIO_PROCESS, 0, prio) } != 0 {
+        // Clear the `errno` and log it.
+        let error = std::io::Error::last_os_error();
+        relay_log::warn!(
+            error = &error as &dyn std::error::Error,
+            "failed to set thread priority for a {kind:?} thread: {error:?}"
+        );
+    };
+}
+
+#[cfg(not(unix))]
+fn set_current_thread_priority(_kind: ThreadKind) {
+    // Ignored for non-Unix platforms.
+}
+
 #[cfg(test)]
 mod tests {
     use std::sync::Barrier;
@@ -176,6 +225,34 @@
         barrier.wait();
     }
 
+    #[test]
+    #[cfg(unix)]
+    fn test_thread_pool_priority() {
+        fn get_current_priority() -> i32 {
+            unsafe { libc::getpriority(libc::PRIO_PROCESS, 0) }
+        }
+
+        let default_prio = get_current_priority();
+
+        {
+            let pool = ThreadPoolBuilder::new("s").num_threads(1).build().unwrap();
+            let prio = pool.install(get_current_priority);
+            // Default pool priority must match current priority.
+            assert_eq!(prio, default_prio);
+        }
+
+        {
+            let pool = ThreadPoolBuilder::new("s")
+                .num_threads(1)
+                .thread_kind(ThreadKind::Worker)
+                .build()
+                .unwrap();
+            let prio = pool.install(get_current_priority);
+            // Worker must be higher than the default priority (higher number = lower priority).
+            assert!(prio > default_prio);
+        }
+    }
+
     #[test]
     fn test_worker_group_backpressure() {
         let pool = ThreadPoolBuilder::new("s").num_threads(1).build().unwrap();
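One subtlety the new test glosses over: `getpriority()` can legitimately return `-1` as a nice value, so the only reliable way to detect an error is to clear `errno` before the call and inspect it afterwards. A Linux/glibc-only sketch of a stricter helper; this is hypothetical and not part of the commit, and it assumes `libc::__errno_location` is available:

// Hypothetical sketch (Linux): an errno-aware variant of the test's
// get_current_priority helper.
fn try_get_priority() -> std::io::Result<i32> {
    // Clear errno first: getpriority() may return -1 both on error and
    // as a valid niceness, so errno is what disambiguates the two.
    unsafe { *libc::__errno_location() = 0 };
    let prio = unsafe { libc::getpriority(libc::PRIO_PROCESS, 0) };
    let err = std::io::Error::last_os_error();
    if prio == -1 && err.raw_os_error() != Some(0) {
        return Err(err);
    }
    Ok(prio)
}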
