diff --git a/src/rt_tokio/mod.rs b/src/rt_tokio/mod.rs index 6bc06d2..80c5604 100644 --- a/src/rt_tokio/mod.rs +++ b/src/rt_tokio/mod.rs @@ -1,4 +1,5 @@ use std::future::Future; +use std::marker::PhantomData; use std::sync::Arc; use std::{fmt, io}; @@ -8,20 +9,20 @@ pub(crate) mod time; mod local_worker; -pub(crate) use local_worker::LocalHandle; use local_worker::LocalWorker; +/// We test whether thread is supported for current target. +static THREAD_SUPPORTED: Lazy = + Lazy::new(|| std::thread::Builder::new().spawn(|| {}).is_ok()); + pub(crate) fn get_default_runtime_size() -> usize { - // We use num_cpus as std::thread::available_parallelism() does not take - // system resource constraint (e.g.: cgroups) into consideration. - #[cfg(not(target_os = "wasi"))] - { + if *THREAD_SUPPORTED { + // We use num_cpus as std::thread::available_parallelism() does not take + // system resource constraint (e.g.: cgroups) into consideration. num_cpus::get() - } - // WASI does not support multi-threading at this moment. - #[cfg(target_os = "wasi")] - { - 1 + } else { + // For platforms without thread support, we report available workers as 0. + 0 } } @@ -42,18 +43,30 @@ where } #[derive(Clone)] -pub(crate) struct Runtime { - workers: Arc>, +enum RuntimeInner { + /// Target has multi-threading support. + Threaded { workers: Arc> }, + /// Target does not have multi-threading support. + Main, } -impl fmt::Debug for Runtime { +impl fmt::Debug for RuntimeInner { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Runtime") - .field("workers", &"Vec") - .finish() + match self { + Self::Threaded { .. } => f + .debug_struct("RuntimeInner::Threaded") + .field("workers", &"Vec") + .finish(), + Self::Main => f.debug_struct("RuntimeInner::Main").finish(), + } } } +#[derive(Clone, Debug)] +pub(crate) struct Runtime { + inner: RuntimeInner, +} + impl Default for Runtime { fn default() -> Self { static DEFAULT_RT: Lazy = Lazy::new(|| { @@ -66,22 +79,30 @@ impl Default for Runtime { impl Runtime { pub fn new(size: usize) -> io::Result { - assert!(size > 0, "must have more than 1 worker."); + if *THREAD_SUPPORTED { + assert!(size > 0, "must have more than 1 worker."); - let mut workers = Vec::with_capacity(size); + let mut workers = Vec::with_capacity(size); - for _ in 0..size { - let worker = LocalWorker::new()?; - workers.push(worker); - } + for _ in 0..size { + let worker = LocalWorker::new()?; + workers.push(worker); + } - Ok(Self { - workers: workers.into(), - }) + Ok(Self { + inner: RuntimeInner::Threaded { + workers: workers.into(), + }, + }) + } else { + Ok(Self { + inner: RuntimeInner::Main, + }) + } } - fn find_least_busy_local_worker(&self) -> &LocalWorker { - let mut workers = self.workers.iter(); + fn find_least_busy_local_worker(workers: &[LocalWorker]) -> &LocalWorker { + let mut workers = workers.iter(); let mut worker = workers.next().expect("must have more than 1 worker."); let mut task_count = worker.task_count(); @@ -109,8 +130,77 @@ impl Runtime { F: Send + 'static, Fut: Future + 'static, { - let worker = self.find_least_busy_local_worker(); - worker.spawn_pinned(create_task); + match self.inner { + RuntimeInner::Threaded { ref workers } => { + let worker = Self::find_least_busy_local_worker(workers); + + worker.spawn_pinned(create_task); + } + + RuntimeInner::Main => { + tokio::task::spawn_local(create_task()); + } + } + } +} + +#[derive(Debug, Clone)] +enum LocalHandleInner { + Threaded(local_worker::LocalHandle), + Main, +} + +#[derive(Debug, Clone)] +pub(crate) struct LocalHandle { + inner: LocalHandleInner, + // This type is not send or sync. + _marker: PhantomData<*const ()>, +} + +impl LocalHandle { + pub fn try_current() -> Option { + if *THREAD_SUPPORTED { + Some(Self { + inner: LocalHandleInner::Threaded(local_worker::LocalHandle::try_current()?), + _marker: PhantomData, + }) + } else { + Some(Self { + inner: LocalHandleInner::Main, + _marker: PhantomData, + }) + } + } + + pub fn current() -> Self { + if *THREAD_SUPPORTED { + Self { + inner: LocalHandleInner::Threaded(local_worker::LocalHandle::current()), + _marker: PhantomData, + } + } else { + Self { + inner: LocalHandleInner::Main, + _marker: PhantomData, + } + } + } + + pub fn spawn_local(&self, f: F) + where + F: Future + 'static, + { + match self.inner { + LocalHandleInner::Threaded(ref m) => { + m.spawn_local(f); + } + LocalHandleInner::Main => { + // For platforms without multi threading support, we assume a behaviour similar to + // wasm-bindgen-futures, where the main function is running under a tokio local set + // and tokio::task::spawn_local is available for the entire program. + tokio::task::spawn_local(f); + } + } } }