From f11725b4516bf97fb225272694bb33cefccba6a8 Mon Sep 17 00:00:00 2001 From: Michael de Gans Date: Tue, 28 Feb 2023 17:30:00 -0800 Subject: [PATCH 1/2] Async loader fixes * fix `fn load_from_disk` -> `async fn load_from_disk` Old code was incompatible with async because this code here... ```rust for (path, handle) in handles.drain(..) { let bytes = handle .join() ``` ... blocks the calling thread until all threads are joined (because join is a blocking call), which in the case of a single-threaded runtime would have the effect of preventing any other tasks/`async fn` from executing. In a multi-threaded runtime, it would just block all tasks on that particular thread, which could be a lot. This might manifest as lag in other tasks while loading is in progress. New code for non-wasm uses tokio tasks (green threads) and lets the runtime decide how to schedule file IO by `await`ing instead of `join`ing. This allows the runtime to schedule other tasks for execution while awaiting the tasks. Note that although the tokio runtime will likely decide to load from disk from a single worker thread in series, the difference here is that the runtime knows it's going on and will do all that in a dedicated thread, which won't block the calling thread. * new non-wasm download code The old code used async and await, and the code was async, but the downloads would still happen sequentially because when you await in a loop, it stops iteration *within that function*. In the following code, each `send()` would therefore be made in sequence. Other async functions could be interleaved and run, but based on the name `handles` I think the intent here was to await all paths concurrently. 
```rust for path in paths { let url = reqwest::Url::parse(path.to_str().unwrap()) .map_err(|_| Error::FailedParsingUrl(path.to_str().unwrap().to_string()))?; handles.push((path, client.get(url).send().await)); ``` The new code uses tokio tasks, which are all awaited concurrently -- the only limit being 8 connections per host, to avoid hammering servers. * simplify `load_single` `load_single` now just calls `load_async_single` from a new executor on the current thread. Async and sync code is shared this way and performance will likely benefit provided it's not called in a tight loop. Tasks within the future may still be awaited concurrently provided they don't block. * ignore `.vscode` * enable parallel network and disk loading Old behavior was to await network assets, then await disk assets. New behavior is to await both disk and network IO concurrently. https://rust-lang.github.io/async-book/06_multiple_futures/02_join.html * simplify `load` code * document `load_async_single` * add `RawAssets::with_capacity` - to allow pre-allocating space when the len is known ahead of time --- .gitignore | 2 + Cargo.toml | 3 + src/io/loader.rs | 223 ++++++++++++++++++++++++++++++++++--------- src/io/raw_assets.rs | 36 +++++-- 4 files changed, 207 insertions(+), 57 deletions(-) diff --git a/.gitignore b/.gitignore index 627977b..d0a7d57 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,5 @@ Cargo.lock # These are backup files generated by rustfmt **/*.rs.bk + +.vscode/ \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index 8a134f2..177d1fd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,5 +48,8 @@ image = { version = "0.24", optional = true, default-features = false} pcd-rs = { version = "0.10", optional = true, features = ["derive"] } data-url = {version = "0.2", optional = true } +[target.'cfg(not(target_arch = "wasm32"))'.dependencies] +tokio = { version = "1.0", features = ["fs", "io-util", "rt-multi-thread", "macros"] } + [target.'cfg(target_arch = 
"wasm32")'.dependencies] web-sys = { version = "0.3", features = ['Document', 'Window'] } \ No newline at end of file diff --git a/src/io/loader.rs b/src/io/loader.rs index a08c021..2665c4f 100644 --- a/src/io/loader.rs +++ b/src/io/loader.rs @@ -6,6 +6,30 @@ use crate::{io::RawAssets, Error, Result}; use std::collections::HashSet; use std::path::{Path, PathBuf}; +/// User Agent string for three-d-asset +#[cfg(feature = "reqwest")] +pub const USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "-", env!("CARGO_PKG_VERSION")); + +/// +/// Run a future to completion, returning any [`Output`]. +/// +/// NOTE: This creates a tokio runtime to run the future in, so this should +/// likely be called on some top-level future and not in a loop. +/// +/// [`Output`]: std::future::Future::Output +/// +#[cfg(not(target_arch = "wasm32"))] +fn block_on(f: F) -> F::Output +where + F: std::future::Future, +{ + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap() + .block_on(f) +} + /// /// Loads all of the resources in the given paths and returns the [RawAssets] resources. 
/// @@ -17,32 +41,7 @@ use std::path::{Path, PathBuf}; /// #[cfg(not(target_arch = "wasm32"))] pub fn load(paths: &[impl AsRef]) -> Result { - let mut raw_assets = load_single(paths)?; - let mut dependencies = super::get_dependencies(&raw_assets); - while !dependencies.is_empty() { - let deps = load_single(&dependencies)?; - dependencies = super::get_dependencies(&deps); - raw_assets.extend(deps); - } - Ok(raw_assets) -} - -#[cfg(not(target_arch = "wasm32"))] -fn load_single(paths: &[impl AsRef]) -> Result { - let mut data_urls = HashSet::new(); - let mut local_paths = HashSet::new(); - for path in paths.iter() { - let path = path.as_ref().to_path_buf(); - if is_data_url(&path) { - data_urls.insert(path); - } else { - local_paths.insert(path); - } - } - let mut raw_assets = RawAssets::new(); - load_from_disk(local_paths, &mut raw_assets)?; - parse_data_urls(data_urls, &mut raw_assets)?; - Ok(raw_assets) + block_on(load_async(paths)) } /// @@ -64,6 +63,10 @@ pub async fn load_async(paths: &[impl AsRef]) -> Result { Ok(raw_assets) } +/// +/// Load paths, but not any of their dependencies (eg. loading an obj will not +/// load it's textures in turn) +/// #[cfg(target_arch = "wasm32")] async fn load_async_single(paths: &[impl AsRef]) -> Result { let base_path = base_path(); @@ -79,12 +82,15 @@ async fn load_async_single(paths: &[impl AsRef]) -> Result { urls.insert(base_path.join(path)); } } - let mut raw_assets = RawAssets::new(); - load_urls(urls, &mut raw_assets).await?; + let mut raw_assets = load_urls(urls).await?; parse_data_urls(data_urls, &mut raw_assets)?; Ok(raw_assets) } +/// +/// Load paths, but not any of their dependencies (eg. 
loading an obj will not +/// load it's textures in turn) +/// #[cfg(not(target_arch = "wasm32"))] async fn load_async_single(paths: &[impl AsRef]) -> Result { let mut urls = HashSet::new(); @@ -102,35 +108,66 @@ async fn load_async_single(paths: &[impl AsRef]) -> Result { } let mut raw_assets = RawAssets::new(); - load_urls(urls, &mut raw_assets).await?; - load_from_disk(local_paths, &mut raw_assets)?; + // load from network and disk in parallel, returning on the first error + match tokio::try_join!(load_urls(urls), load_from_disk(local_paths)) { + Ok((urls_assets, disk_assets)) => { + raw_assets.extend(urls_assets); + raw_assets.extend(disk_assets); + } + Err(e) => return Err(e), + } + // This function is cpu bound and does not need to be async fn, however it's + // non-trivial if the n of data_urls is large, it may make sense to process + // them in parallel in the future. parse_data_urls(data_urls, &mut raw_assets)?; Ok(raw_assets) } +/// Load assets from disk. #[cfg(not(target_arch = "wasm32"))] -fn load_from_disk(paths: HashSet, raw_assets: &mut RawAssets) -> Result<()> { - let mut handles = Vec::new(); +async fn load_from_disk(paths: Ps) -> Result +where + Ps: IntoIterator, +{ + let mut raw_assets = RawAssets::new(); + let mut tasks = tokio::task::JoinSet::new(); + for path in paths { - handles.push(( - path.clone(), - std::thread::spawn(move || std::fs::read(path)), - )); + // Note: This will spawn all of the tasks at once (which are cheap, only + // 64kb per task), but Tokio will very likely schedule them to run in + // sequence in a dedicated thread. This is a good thing since loading + // many files from *disk* at the same time will likely hurt performance + // due to memory locality issues, especially with spinning disks. + // Letting the runtime decide what to do is probably best here as in + // the future it might use underlying native async io features of the OS + // rather than an IO thread/pool. 
+ tasks.spawn(async move { + let bytes = tokio::fs::read(&path) + .await + .map_err(|e| Error::FailedLoading(path.to_string_lossy().into(), e))?; + + Ok((path, bytes)) + }); } - for (path, handle) in handles.drain(..) { - let bytes = handle - .join() - .unwrap() - .map_err(|e| Error::FailedLoading(path.to_str().unwrap().to_string(), e))?; - raw_assets.insert(path, bytes); + // Iterate over the `res`ults of the tasks as they complete + while let Some(Ok(res)) = tasks.join_next().await { + // We don't care about Some(Err(e)) as this only happens if the join + // fails which can only happen if a task doesn't complete but that can't + // happpen because the task code in the above for loop can't panic. + match res { + Ok((path, bytes)) => raw_assets.insert(path, bytes), + Err(e) => return Err(e), + }; } - Ok(()) + + Ok(raw_assets) } -#[allow(unused_variables)] -async fn load_urls(paths: HashSet, raw_assets: &mut RawAssets) -> Result<()> { - #[cfg(feature = "reqwest")] +#[cfg(all(target_arch = "wasm32", feature = "reqwest"))] +async fn load_urls(paths: HashSet) -> Result { + let mut raw_assets = RawAssets::new(); + if paths.len() > 0 { let mut handles = Vec::new(); let client = reqwest::Client::new(); @@ -149,13 +186,105 @@ async fn load_urls(paths: HashSet, raw_assets: &mut RawAssets) -> Resul raw_assets.insert(path, bytes); } } + + Ok(raw_assets) +} +#[cfg(not(feature = "reqwest"))] +async fn load_urls(paths: HashSet) -> Result { #[cfg(not(feature = "reqwest"))] if !paths.is_empty() { return Err(Error::FeatureMissing("reqwest".to_string())); } - Ok(()) + Ok(RawAssets::new()) +} + +#[cfg(all(not(target_arch = "wasm32"), feature = "reqwest"))] +async fn load_urls(urls: Us) -> Result +where + Us: IntoIterator, +{ + use std::{collections::HashMap, sync::Arc}; + use tokio::sync::Semaphore; + + // connection limit per host (in the future make this configurable?) 
+ const CONN_PER_HOST: usize = 8; + + let mut tasks = tokio::task::JoinSet::new(); + // It might be more flexible to provide the client as an argument to this function + let client = reqwest::Client::builder() + .connect_timeout(std::time::Duration::from_secs(5)) + .user_agent(USER_AGENT) + .build() + .unwrap(); + let it = urls.into_iter(); + // allocate enough space for the entire iterator + let mut raw_assets = RawAssets::with_capacity(it.size_hint().1.unwrap_or(0)); + // A mapping of hosts to semaphores to limit connections. + let mut host_connections = HashMap::new(); + + for path in it { + // Note: this is not a deep copy or anything. It's just cloning an Arc. + // The underlying `client` is reused. We must clone it to move it + // (possibly) across threads into the spawned task. + let client = client.clone(); + + let url = reqwest::Url::parse(match path.to_str() { + Some(valid_unicode) => valid_unicode, + None => return Err(Error::FailedParsingUrl("Bad unicode in url.".into())), + }) + .map_err(|e| Error::FailedParsingUrl(e.to_string()))?; + + // This could technically fail since some valid urls (like `file::`) do + // not have a valid hostname. It might be best to detect this scheme and + // put their local paths in `local_paths` in `load_async_single` + let host = match url.host() { + Some(host) => host, + None => return Err(Error::FailedParsingUrl("Invalid host.".into())), + }; + + // Clone our semaphore for this host. We can't acquire here or we await + // here and block iteration, which isn't what we want. We must move this + // inside the closure below and acquire a permit inside the spawned task. + let semaphore = host_connections + .entry(host.to_owned()) + .or_insert(Arc::new(Semaphore::new(CONN_PER_HOST))) + .to_owned(); + + // NOTE: We must not await inside this for loop (outside this task), or + // we block iteration and stop spawning tasks. We want to spawn all + // tasks, and only await within *spawned* tasks. 
This way all urls are + // submitted as tasks immediately, although downloads will only happen + // if permits are available for a given host. + tasks.spawn(async move { + let _permit = semaphore.acquire().await.unwrap(); + let response = client + .get(url) + .send() + .await + .map_err(|e| Error::FailedLoadingUrl(path.to_string_lossy().into(), e))?; + + let bytes = response + .bytes() + .await + .map_err(|e| Error::FailedLoadingUrl(path.to_string_lossy().into(), e))? + .to_vec(); + + Ok((path, bytes)) // _permit is released + }); + } + + // Iterate over the `res`ults of the tasks as they complete + while let Some(Ok(res)) = tasks.join_next().await { + match res { + Ok((path, bytes)) => raw_assets.insert(path, bytes), + Err(e) => return Err(e), + }; + } + + Ok(raw_assets) } +/// Decode and add any data urls in `paths` to `raw_assets` fn parse_data_urls(paths: HashSet, raw_assets: &mut RawAssets) -> Result<()> { for path in paths { let bytes = parse_data_url(path.to_str().unwrap())?; diff --git a/src/io/raw_assets.rs b/src/io/raw_assets.rs index c301217..b095c0d 100644 --- a/src/io/raw_assets.rs +++ b/src/io/raw_assets.rs @@ -20,6 +20,13 @@ impl RawAssets { Self::default() } + /// + /// Constructs a new empty set of raw assets with at least capacity. + /// + pub fn with_capacity(capacity: usize) -> Self { + RawAssets(HashMap::with_capacity(capacity)) + } + /// /// Remove and returns the raw byte array for the resource at the given path. /// @@ -99,16 +106,6 @@ impl RawAssets { self } - /// - /// Inserts all of the given raw assets into this set of raw assets. - /// - pub fn extend(&mut self, mut raw_assets: Self) -> &mut Self { - for (k, v) in raw_assets.0.drain() { - self.insert(k, v); - } - self - } - /// /// Deserialize the asset with the given path into a type that implements the [Deserialize] trait. 
/// @@ -158,3 +155,22 @@ impl std::fmt::Debug for RawAssets { d.finish() } } + +impl Extend<(PathBuf, Vec)> for RawAssets { + /// + /// Inserts all of the given raw assets into this set of raw assets. + /// + fn extend)>>(&mut self, iter: I) { + self.0.extend(iter) + } +} + +impl IntoIterator for RawAssets { + type Item = (PathBuf, Vec); + + type IntoIter = > as IntoIterator>::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.0.into_iter() + } +} From f08b1c9fce3d16576828e10fa71fc793f1c7ccab Mon Sep 17 00:00:00 2001 From: Asger Nyman Christiansen Date: Wed, 1 Mar 2023 15:07:51 +0100 Subject: [PATCH 2/2] Minor clean-up --- src/io/loader.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/io/loader.rs b/src/io/loader.rs index 2665c4f..808774c 100644 --- a/src/io/loader.rs +++ b/src/io/loader.rs @@ -191,7 +191,6 @@ async fn load_urls(paths: HashSet) -> Result { } #[cfg(not(feature = "reqwest"))] async fn load_urls(paths: HashSet) -> Result { - #[cfg(not(feature = "reqwest"))] if !paths.is_empty() { return Err(Error::FeatureMissing("reqwest".to_string())); }