Skip to content

Commit 8e441c3

Browse files
authored
Refactor system diagnostics to use a single task (#20852)
# Objective Fixes #20802 ## Solution System information diagnostics now uses a single task that wakes up every time the `First` schedule runs. The task checks if enough time has passed since the last refresh. If enough time has passed, it refreshes the system information and sends it to a channel. The `read_diagonstic_task` system then reads the system information from the channel to add diagnostic data. ## Testing - Did you test these changes? If so, how? I used the log diagnostics example and compare before the changes and after the changes to ensure it works similarly or better than before. - How can other people (reviewers) test your changes? Is there anything specific they need to know? ``` cargo run --example log_diagnostics ``` - If relevant, what platforms did you test these changes on, and are there any important ones you can't test? Linux
1 parent 9ea6b13 commit 8e441c3

File tree

3 files changed

+124
-87
lines changed

3 files changed

+124
-87
lines changed

crates/bevy_diagnostic/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ bevy_platform = { path = "../bevy_platform", version = "0.17.0-dev", default-fea
6060
] }
6161

6262
# other
63+
atomic-waker = { version = "1", default-features = false }
6364
const-fnv1a-hash = "1.1.0"
6465
serde = { version = "1.0", default-features = false, features = [
6566
"alloc",

crates/bevy_diagnostic/src/diagnostic.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,18 +39,18 @@ impl DiagnosticPath {
3939
pub fn new(path: impl Into<Cow<'static, str>>) -> DiagnosticPath {
4040
let path = path.into();
4141

42-
debug_assert!(!path.is_empty(), "diagnostic path can't be empty");
42+
debug_assert!(!path.is_empty(), "diagnostic path should not be empty");
4343
debug_assert!(
4444
!path.starts_with('/'),
45-
"diagnostic path can't be start with `/`"
45+
"diagnostic path should not start with `/`"
4646
);
4747
debug_assert!(
4848
!path.ends_with('/'),
49-
"diagnostic path can't be end with `/`"
49+
"diagnostic path should not end with `/`"
5050
);
5151
debug_assert!(
5252
!path.contains("//"),
53-
"diagnostic path can't contain empty components"
53+
"diagnostic path should not contain empty components"
5454
);
5555

5656
DiagnosticPath {

crates/bevy_diagnostic/src/system_information_diagnostics_plugin.rs

Lines changed: 119 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,12 @@ use bevy_ecs::resource::Resource;
99
/// Any system diagnostics gathered by this plugin may not be current when you access them.
1010
///
1111
/// Supported targets:
12-
/// * linux,
13-
/// * windows,
14-
/// * android,
12+
/// * linux
13+
/// * windows
14+
/// * android
1515
/// * macOS
1616
///
17-
/// NOT supported when using the `bevy/dynamic` feature even when using previously mentioned targets
17+
/// NOT supported when using the `bevy/dynamic` feature even when using previously mentioned targets.
1818
///
1919
/// # See also
2020
///
@@ -69,20 +69,27 @@ pub struct SystemInfo {
6969
not(feature = "dynamic_linking"),
7070
feature = "std",
7171
))]
72-
pub mod internal {
72+
mod internal {
73+
use core::{
74+
pin::Pin,
75+
task::{Context, Poll},
76+
};
77+
use std::sync::{
78+
mpsc::{self, Receiver, Sender},
79+
Arc,
80+
};
81+
7382
use alloc::{
7483
format,
7584
string::{String, ToString},
76-
sync::Arc,
77-
vec::Vec,
7885
};
86+
use atomic_waker::AtomicWaker;
7987
use bevy_app::{App, First, Startup, Update};
8088
use bevy_ecs::resource::Resource;
81-
use bevy_ecs::{prelude::ResMut, system::Local};
82-
use bevy_platform::time::Instant;
83-
use bevy_tasks::{available_parallelism, block_on, poll_once, AsyncComputeTaskPool, Task};
89+
use bevy_ecs::{prelude::ResMut, system::Commands};
90+
use bevy_platform::{cell::SyncCell, time::Instant};
91+
use bevy_tasks::{AsyncComputeTaskPool, Task};
8492
use log::info;
85-
use std::sync::Mutex;
8693
use sysinfo::{CpuRefreshKind, MemoryRefreshKind, RefreshKind, System};
8794

8895
use crate::{Diagnostic, Diagnostics, DiagnosticsStore};
@@ -91,14 +98,30 @@ pub mod internal {
9198

9299
const BYTES_TO_GIB: f64 = 1.0 / 1024.0 / 1024.0 / 1024.0;
93100

101+
/// Sets up the system information diagnostics plugin.
102+
///
103+
/// The plugin spawns a single background task in the async task pool that always reschedules.
104+
/// The [`wake_diagnostic_task`] system wakes this task once per frame during the [`First`]
105+
/// schedule. If enough time has passed since the last refresh, it sends [`SysinfoRefreshData`]
106+
/// through a channel. The [`read_diagnostic_task`] system receives this data during the
107+
/// [`Update`] schedule and adds it as diagnostic measurements.
94108
pub(super) fn setup_plugin(app: &mut App) {
95109
app.add_systems(Startup, setup_system)
96-
.add_systems(First, launch_diagnostic_tasks)
97-
.add_systems(Update, read_diagnostic_tasks)
98-
.init_resource::<SysinfoTasks>();
110+
.add_systems(First, wake_diagnostic_task)
111+
.add_systems(Update, read_diagnostic_task);
99112
}
100113

101-
fn setup_system(mut diagnostics: ResMut<DiagnosticsStore>) {
114+
fn setup_system(mut diagnostics: ResMut<DiagnosticsStore>, mut commands: Commands) {
115+
let (tx, rx) = mpsc::channel();
116+
let diagnostic_task = DiagnosticTask::new(tx);
117+
let waker = Arc::clone(&diagnostic_task.waker);
118+
let task = AsyncComputeTaskPool::get().spawn(diagnostic_task);
119+
commands.insert_resource(SysinfoTask {
120+
_task: task,
121+
receiver: SyncCell::new(rx),
122+
waker,
123+
});
124+
102125
diagnostics.add(
103126
Diagnostic::new(SystemInformationDiagnosticsPlugin::SYSTEM_CPU_USAGE).with_suffix("%"),
104127
);
@@ -121,78 +144,92 @@ pub mod internal {
121144
process_mem_usage: f64,
122145
}
123146

124-
#[derive(Resource, Default)]
125-
struct SysinfoTasks {
126-
tasks: Vec<Task<SysinfoRefreshData>>,
147+
impl SysinfoRefreshData {
148+
fn new(system: &mut System) -> Self {
149+
let pid = sysinfo::get_current_pid().expect("Failed to get current process ID");
150+
system.refresh_processes(sysinfo::ProcessesToUpdate::Some(&[pid]), true);
151+
152+
system.refresh_cpu_specifics(CpuRefreshKind::nothing().with_cpu_usage());
153+
system.refresh_memory();
154+
155+
let system_cpu_usage = system.global_cpu_usage().into();
156+
let total_mem = system.total_memory() as f64;
157+
let used_mem = system.used_memory() as f64;
158+
let system_mem_usage = used_mem / total_mem * 100.0;
159+
160+
let process_mem_usage = system
161+
.process(pid)
162+
.map(|p| p.memory() as f64 * BYTES_TO_GIB)
163+
.unwrap_or(0.0);
164+
165+
let process_cpu_usage = system
166+
.process(pid)
167+
.map(|p| p.cpu_usage() as f64 / system.cpus().len() as f64)
168+
.unwrap_or(0.0);
169+
170+
Self {
171+
system_cpu_usage,
172+
system_mem_usage,
173+
process_cpu_usage,
174+
process_mem_usage,
175+
}
176+
}
127177
}
128178

129-
fn launch_diagnostic_tasks(
130-
mut tasks: ResMut<SysinfoTasks>,
131-
// TODO: Consider a fair mutex
132-
mut sysinfo: Local<Option<Arc<Mutex<System>>>>,
133-
// TODO: FromWorld for Instant?
134-
mut last_refresh: Local<Option<Instant>>,
135-
) {
136-
let sysinfo = sysinfo.get_or_insert_with(|| {
137-
Arc::new(Mutex::new(System::new_with_specifics(
138-
RefreshKind::nothing()
139-
.with_cpu(CpuRefreshKind::nothing().with_cpu_usage())
140-
.with_memory(MemoryRefreshKind::everything()),
141-
)))
142-
});
179+
#[derive(Resource)]
180+
struct SysinfoTask {
181+
_task: Task<()>,
182+
receiver: SyncCell<Receiver<SysinfoRefreshData>>,
183+
waker: Arc<AtomicWaker>,
184+
}
143185

144-
let last_refresh = last_refresh.get_or_insert_with(Instant::now);
145-
146-
let thread_pool = AsyncComputeTaskPool::get();
147-
148-
// Only queue a new system refresh task when necessary
149-
// Queuing earlier than that will not give new data
150-
if last_refresh.elapsed() > sysinfo::MINIMUM_CPU_UPDATE_INTERVAL
151-
// These tasks don't yield and will take up all of the task pool's
152-
// threads if we don't limit their amount.
153-
&& tasks.tasks.len() * 2 < available_parallelism()
154-
{
155-
let sys = Arc::clone(sysinfo);
156-
let task = thread_pool.spawn(async move {
157-
let mut sys = sys.lock().unwrap();
158-
let pid = sysinfo::get_current_pid().expect("Failed to get current process ID");
159-
sys.refresh_processes(sysinfo::ProcessesToUpdate::Some(&[pid]), true);
160-
161-
sys.refresh_cpu_specifics(CpuRefreshKind::nothing().with_cpu_usage());
162-
sys.refresh_memory();
163-
let system_cpu_usage = sys.global_cpu_usage().into();
164-
let total_mem = sys.total_memory() as f64;
165-
let used_mem = sys.used_memory() as f64;
166-
let system_mem_usage = used_mem / total_mem * 100.0;
167-
168-
let process_mem_usage = sys
169-
.process(pid)
170-
.map(|p| p.memory() as f64 * BYTES_TO_GIB)
171-
.unwrap_or(0.0);
172-
173-
let process_cpu_usage = sys
174-
.process(pid)
175-
.map(|p| p.cpu_usage() as f64 / sys.cpus().len() as f64)
176-
.unwrap_or(0.0);
177-
178-
SysinfoRefreshData {
179-
system_cpu_usage,
180-
system_mem_usage,
181-
process_cpu_usage,
182-
process_mem_usage,
183-
}
184-
});
185-
tasks.tasks.push(task);
186-
*last_refresh = Instant::now();
186+
struct DiagnosticTask {
187+
system: System,
188+
last_refresh: Instant,
189+
sender: Sender<SysinfoRefreshData>,
190+
waker: Arc<AtomicWaker>,
191+
}
192+
193+
impl DiagnosticTask {
194+
fn new(sender: Sender<SysinfoRefreshData>) -> Self {
195+
Self {
196+
system: System::new_with_specifics(
197+
RefreshKind::nothing()
198+
.with_cpu(CpuRefreshKind::nothing().with_cpu_usage())
199+
.with_memory(MemoryRefreshKind::everything()),
200+
),
201+
// Avoids initial delay on first refresh
202+
last_refresh: Instant::now() - sysinfo::MINIMUM_CPU_UPDATE_INTERVAL,
203+
sender,
204+
waker: Arc::default(),
205+
}
187206
}
188207
}
189208

190-
fn read_diagnostic_tasks(mut diagnostics: Diagnostics, mut tasks: ResMut<SysinfoTasks>) {
191-
tasks.tasks.retain_mut(|task| {
192-
let Some(data) = block_on(poll_once(task)) else {
193-
return true;
194-
};
209+
impl Future for DiagnosticTask {
210+
type Output = ();
211+
212+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
213+
self.waker.register(cx.waker());
214+
215+
if self.last_refresh.elapsed() > sysinfo::MINIMUM_CPU_UPDATE_INTERVAL {
216+
self.last_refresh = Instant::now();
217+
218+
let sysinfo_refresh_data = SysinfoRefreshData::new(&mut self.system);
219+
self.sender.send(sysinfo_refresh_data).unwrap();
220+
}
221+
222+
// Always reschedules
223+
Poll::Pending
224+
}
225+
}
195226

227+
fn wake_diagnostic_task(task: ResMut<SysinfoTask>) {
228+
task.waker.wake();
229+
}
230+
231+
fn read_diagnostic_task(mut diagnostics: Diagnostics, mut task: ResMut<SysinfoTask>) {
232+
while let Ok(data) = task.receiver.get().try_recv() {
196233
diagnostics.add_measurement(
197234
&SystemInformationDiagnosticsPlugin::SYSTEM_CPU_USAGE,
198235
|| data.system_cpu_usage,
@@ -209,8 +246,7 @@ pub mod internal {
209246
&SystemInformationDiagnosticsPlugin::PROCESS_MEM_USAGE,
210247
|| data.process_mem_usage,
211248
);
212-
false
213-
});
249+
}
214250
}
215251

216252
impl Default for SystemInfo {
@@ -252,7 +288,7 @@ pub mod internal {
252288
not(feature = "dynamic_linking"),
253289
feature = "std",
254290
)))]
255-
pub mod internal {
291+
mod internal {
256292
use alloc::string::ToString;
257293
use bevy_app::{App, Startup};
258294

0 commit comments

Comments
 (0)