2 changes: 2 additions & 0 deletions .gitignore
@@ -9,3 +9,5 @@ Cargo.lock

# These are backup files generated by rustfmt
**/*.rs.bk

.vscode/
3 changes: 3 additions & 0 deletions Cargo.toml
@@ -48,5 +48,8 @@ image = { version = "0.24", optional = true, default-features = false}
pcd-rs = { version = "0.10", optional = true, features = ["derive"] }
data-url = {version = "0.2", optional = true }

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
tokio = { version = "1.0", features = ["fs", "io-util", "rt-multi-thread", "macros"] }

[target.'cfg(target_arch = "wasm32")'.dependencies]
web-sys = { version = "0.3", features = ['Document', 'Window'] }
224 changes: 176 additions & 48 deletions src/io/loader.rs
@@ -6,6 +6,30 @@ use crate::{io::RawAssets, Error, Result};
use std::collections::HashSet;
use std::path::{Path, PathBuf};

/// User Agent string for three-d-asset
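/// (e.g. with package version 1.2.3 this expands at compile time to
/// "three-d-asset-1.2.3"; the version shown here is illustrative)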
#[cfg(feature = "reqwest")]
pub const USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "-", env!("CARGO_PKG_VERSION"));

///
/// Run a future to completion, returning its [`Output`].
///
/// NOTE: This creates a new tokio runtime for each call, so it should
/// likely be called on a top-level future and not in a loop.
///
/// [`Output`]: std::future::Future::Output
///
#[cfg(not(target_arch = "wasm32"))]
fn block_on<F>(f: F) -> F::Output
where
F: std::future::Future,
{
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
.block_on(f)
}
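
A minimal usage sketch of the pattern this helper enables, driving one top-level async load from synchronous code (the asset path is hypothetical):

fn example_load() -> Result<RawAssets> {
    // Builds a runtime, runs the whole async load to completion, then
    // tears the runtime down again.
    block_on(load_async(&["assets/model.obj"]))
}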

///
/// Loads all of the resources at the given paths and returns them as [RawAssets].
///
@@ -17,32 +41,7 @@ use std::path::{Path, PathBuf};
///
#[cfg(not(target_arch = "wasm32"))]
pub fn load(paths: &[impl AsRef<Path>]) -> Result<RawAssets> {
let mut raw_assets = load_single(paths)?;
let mut dependencies = super::get_dependencies(&raw_assets);
while !dependencies.is_empty() {
let deps = load_single(&dependencies)?;
dependencies = super::get_dependencies(&deps);
raw_assets.extend(deps);
}
Ok(raw_assets)
}

#[cfg(not(target_arch = "wasm32"))]
fn load_single(paths: &[impl AsRef<Path>]) -> Result<RawAssets> {
let mut data_urls = HashSet::new();
let mut local_paths = HashSet::new();
for path in paths.iter() {
let path = path.as_ref().to_path_buf();
if is_data_url(&path) {
data_urls.insert(path);
} else {
local_paths.insert(path);
}
}
let mut raw_assets = RawAssets::new();
load_from_disk(local_paths, &mut raw_assets)?;
parse_data_urls(data_urls, &mut raw_assets)?;
Ok(raw_assets)
block_on(load_async(paths))
}

///
@@ -64,6 +63,10 @@ pub async fn load_async(paths: &[impl AsRef<Path>]) -> Result<RawAssets> {
Ok(raw_assets)
}

///
/// Load the given paths, but not any of their dependencies (e.g. loading
/// an .obj will not load its textures in turn).
///
#[cfg(target_arch = "wasm32")]
async fn load_async_single(paths: &[impl AsRef<Path>]) -> Result<RawAssets> {
let base_path = base_path();
@@ -79,12 +82,15 @@ async fn load_async_single(paths: &[impl AsRef<Path>]) -> Result<RawAssets> {
urls.insert(base_path.join(path));
}
}
let mut raw_assets = RawAssets::new();
load_urls(urls, &mut raw_assets).await?;
let mut raw_assets = load_urls(urls).await?;
parse_data_urls(data_urls, &mut raw_assets)?;
Ok(raw_assets)
}

///
/// Load the given paths, but not any of their dependencies (e.g. loading
/// an .obj will not load its textures in turn).
///
#[cfg(not(target_arch = "wasm32"))]
async fn load_async_single(paths: &[impl AsRef<Path>]) -> Result<RawAssets> {
let mut urls = HashSet::new();
@@ -102,35 +108,66 @@ async fn load_async_single(paths: &[impl AsRef<Path>]) -> Result<RawAssets> {
}

let mut raw_assets = RawAssets::new();
load_urls(urls, &mut raw_assets).await?;
load_from_disk(local_paths, &mut raw_assets)?;
// load from network and disk in parallel, returning on the first error
match tokio::try_join!(load_urls(urls), load_from_disk(local_paths)) {
Ok((urls_assets, disk_assets)) => {
raw_assets.extend(urls_assets);
raw_assets.extend(disk_assets);
}
Err(e) => return Err(e),
}
// This function is CPU bound and does not need to be async, but it is
// non-trivial work; if the number of data URLs is large, it may make
// sense to process them in parallel in the future.
parse_data_urls(data_urls, &mut raw_assets)?;
Ok(raw_assets)
}
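
The comment above relies on the error-propagation semantics of `tokio::try_join!`; a minimal, self-contained sketch of those semantics (names and values are illustrative):

async fn try_join_sketch() -> std::result::Result<(), String> {
    // Both futures are polled concurrently; the macro returns the first
    // Err encountered, otherwise a tuple of all the Ok values.
    let (a, b) = tokio::try_join!(
        async { Ok::<_, String>(1) },
        async { Ok::<_, String>(2) },
    )?;
    assert_eq!((a, b), (1, 2));
    Ok(())
}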

/// Load assets from disk.
#[cfg(not(target_arch = "wasm32"))]
fn load_from_disk(paths: HashSet<PathBuf>, raw_assets: &mut RawAssets) -> Result<()> {
let mut handles = Vec::new();
async fn load_from_disk<Ps>(paths: Ps) -> Result<RawAssets>
where
Ps: IntoIterator<Item = PathBuf>,
{
let mut raw_assets = RawAssets::new();
let mut tasks = tokio::task::JoinSet::new();

for path in paths {
handles.push((
path.clone(),
std::thread::spawn(move || std::fs::read(path)),
));
// Note: This spawns all of the tasks at once (tasks are cheap, only
// 64kb per task), but Tokio will very likely schedule them to run in
// sequence on a dedicated thread. This is a good thing, since loading
// many files from *disk* at the same time would likely hurt performance
// due to memory locality issues, especially on spinning disks.
// Letting the runtime decide is probably best here, as in the future
// it might use the OS's native async I/O features rather than an I/O
// thread pool.
tasks.spawn(async move {
let bytes = tokio::fs::read(&path)
.await
.map_err(|e| Error::FailedLoading(path.to_string_lossy().into(), e))?;

Ok((path, bytes))
});
}

for (path, handle) in handles.drain(..) {
let bytes = handle
.join()
.unwrap()
.map_err(|e| Error::FailedLoading(path.to_str().unwrap().to_string(), e))?;
raw_assets.insert(path, bytes);
// Iterate over the task results (`res`) as they complete
while let Some(Ok(res)) = tasks.join_next().await {
// We ignore Some(Err(e)) since that only happens when the join fails,
// which can only happen when a task doesn't complete, and that can't
// happen because the task code in the loop above can't panic.
match res {
Ok((path, bytes)) => raw_assets.insert(path, bytes),
Err(e) => return Err(e),
};
}
Ok(())

Ok(raw_assets)
}
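
The spawn-then-drain pattern above is the core of `tokio::task::JoinSet`; a minimal, self-contained sketch of just that pattern (assuming a tokio version in which `JoinSet` is stable):

async fn join_set_sketch() -> Vec<u64> {
    let mut tasks = tokio::task::JoinSet::new();
    for i in 0..4u64 {
        // Each spawned task starts running on the runtime immediately.
        tasks.spawn(async move { i * 2 });
    }
    let mut out = Vec::new();
    // join_next() yields results in completion order, not spawn order.
    while let Some(Ok(v)) = tasks.join_next().await {
        out.push(v);
    }
    out
}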

#[allow(unused_variables)]
async fn load_urls(paths: HashSet<PathBuf>, raw_assets: &mut RawAssets) -> Result<()> {
#[cfg(feature = "reqwest")]
#[cfg(all(target_arch = "wasm32", feature = "reqwest"))]
async fn load_urls(paths: HashSet<PathBuf>) -> Result<RawAssets> {
let mut raw_assets = RawAssets::new();

if paths.len() > 0 {
let mut handles = Vec::new();
let client = reqwest::Client::new();
@@ -149,13 +186,104 @@ async fn load_urls(paths: HashSet<PathBuf>, raw_assets: &mut RawAssets) -> Result<()> {
raw_assets.insert(path, bytes);
}
}
#[cfg(not(feature = "reqwest"))]

Ok(raw_assets)
}
#[cfg(not(feature = "reqwest"))]
async fn load_urls(paths: HashSet<PathBuf>) -> Result<RawAssets> {
if !paths.is_empty() {
return Err(Error::FeatureMissing("reqwest".to_string()));
}
Ok(())
Ok(RawAssets::new())
}

#[cfg(all(not(target_arch = "wasm32"), feature = "reqwest"))]
async fn load_urls<Us>(urls: Us) -> Result<RawAssets>
where
Us: IntoIterator<Item = PathBuf>,
{
use std::{collections::HashMap, sync::Arc};
use tokio::sync::Semaphore;

// connection limit per host (in the future make this configurable?)
const CONN_PER_HOST: usize = 8;

let mut tasks = tokio::task::JoinSet::new();
// It might be more flexible to provide the client as an argument to this function
let client = reqwest::Client::builder()
.connect_timeout(std::time::Duration::from_secs(5))
.user_agent(USER_AGENT)
.build()
.unwrap();
let it = urls.into_iter();
// allocate enough space for the entire iterator
let mut raw_assets = RawAssets::with_capacity(it.size_hint().1.unwrap_or(0));
// A mapping of hosts to semaphores to limit connections.
let mut host_connections = HashMap::new();

for path in it {
// Note: this is not a deep copy; a reqwest::Client holds an Arc
// internally, so the underlying client is reused. We clone it in order
// to move it (possibly across threads) into the spawned task.
let client = client.clone();

let url = reqwest::Url::parse(match path.to_str() {
Some(valid_unicode) => valid_unicode,
None => return Err(Error::FailedParsingUrl("Bad unicode in url.".into())),
})
.map_err(|e| Error::FailedParsingUrl(e.to_string()))?;

// This could technically fail since some valid URLs (like `file:` URLs)
// do not have a host. It might be best to detect this scheme and put
// their local paths in `local_paths` in `load_async_single`.
let host = match url.host() {
Some(host) => host,
None => return Err(Error::FailedParsingUrl("Invalid host.".into())),
};

// Clone the semaphore for this host. We can't acquire a permit here,
// since awaiting would block iteration; instead we move the clone into
// the task below and acquire the permit inside the spawned task.
let semaphore = host_connections
.entry(host.to_owned())
.or_insert(Arc::new(Semaphore::new(CONN_PER_HOST)))
.to_owned();

// NOTE: We must not await inside this for loop (outside this task), or
// we block iteration and stop spawning tasks. We want to spawn all
// tasks, and only await within *spawned* tasks. This way all urls are
// submitted as tasks immediately, although downloads will only happen
// if permits are available for a given host.
tasks.spawn(async move {
let _permit = semaphore.acquire().await.unwrap();
let response = client
.get(url)
.send()
.await
.map_err(|e| Error::FailedLoadingUrl(path.to_string_lossy().into(), e))?;

let bytes = response
.bytes()
.await
.map_err(|e| Error::FailedLoadingUrl(path.to_string_lossy().into(), e))?
.to_vec();

Ok((path, bytes)) // _permit is released
});
}

// Iterate over the task results (`res`) as they complete
while let Some(Ok(res)) = tasks.join_next().await {
match res {
Ok((path, bytes)) => raw_assets.insert(path, bytes),
Err(e) => return Err(e),
};
}

Ok(raw_assets)
}
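
The per-host throttling above is an instance of the standard semaphore pattern; a minimal, self-contained sketch (the permit count and task bodies are illustrative):

async fn semaphore_sketch() {
    use std::sync::Arc;
    use tokio::sync::Semaphore;

    // With 2 permits, at most two tasks run the guarded section at once.
    let sem = Arc::new(Semaphore::new(2));
    let mut tasks = tokio::task::JoinSet::new();
    for i in 0..8u32 {
        let sem = sem.clone();
        tasks.spawn(async move {
            // Waits until a permit is free; the permit is released when
            // _permit is dropped at the end of the task.
            let _permit = sem.acquire().await.unwrap();
            i // one request for `i` would go here
        });
    }
    while let Some(_res) = tasks.join_next().await {}
}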

/// Decode any data URLs in `paths` and add them to `raw_assets`.
fn parse_data_urls(paths: HashSet<PathBuf>, raw_assets: &mut RawAssets) -> Result<()> {
for path in paths {
let bytes = parse_data_url(path.to_str().unwrap())?;
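
The `parse_data_url` helper builds on the `data-url` crate declared in Cargo.toml; a minimal sketch of decoding with that crate directly (the helper's exact signature may differ):

fn decode_data_url_sketch() -> Option<Vec<u8>> {
    // "SGVsbG8=" is base64 for "Hello".
    let parsed = data_url::DataUrl::process("data:text/plain;base64,SGVsbG8=").ok()?;
    let (body, _fragment) = parsed.decode_to_vec().ok()?;
    Some(body)
}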
36 changes: 26 additions & 10 deletions src/io/raw_assets.rs
@@ -20,6 +20,13 @@ impl RawAssets {
Self::default()
}

///
/// Constructs a new empty set of raw assets with at least the given capacity.
///
pub fn with_capacity(capacity: usize) -> Self {
RawAssets(HashMap::with_capacity(capacity))
}

///
/// Remove and returns the raw byte array for the resource at the given path.
///
@@ -99,16 +106,6 @@ impl RawAssets {
self
}

///
/// Inserts all of the given raw assets into this set of raw assets.
///
pub fn extend(&mut self, mut raw_assets: Self) -> &mut Self {
for (k, v) in raw_assets.0.drain() {
self.insert(k, v);
}
self
}

///
/// Deserialize the asset with the given path into a type that implements the [Deserialize] trait.
///
@@ -158,3 +155,22 @@ impl std::fmt::Debug for RawAssets {
d.finish()
}
}

impl Extend<(PathBuf, Vec<u8>)> for RawAssets {
///
/// Inserts all of the given raw assets into this set of raw assets.
///
fn extend<I: IntoIterator<Item = (PathBuf, Vec<u8>)>>(&mut self, iter: I) {
self.0.extend(iter)
}
}

impl IntoIterator for RawAssets {
type Item = (PathBuf, Vec<u8>);

type IntoIter = <HashMap<PathBuf, Vec<u8>> as IntoIterator>::IntoIter;

fn into_iter(self) -> Self::IntoIter {
self.0.into_iter()
}
}
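
A minimal usage sketch of the two impls above (the function is illustrative): because RawAssets implements both Extend and IntoIterator, one set can absorb another directly, which is what the loader's `raw_assets.extend(...)` calls rely on.

fn merge(mut a: RawAssets, b: RawAssets) -> RawAssets {
    // `b` is consumed; its (PathBuf, Vec<u8>) entries move into `a`.
    a.extend(b);
    a
}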