Skip to content

Commit

Permalink
Support fast path reads for mmapped/borrowed data
Browse files Browse the repository at this point in the history
Often times an AssetLoader receives a AsyncRead implementation that is
backed by a memory map or a byte slice. In cases where we're dealing
with a format that has a zero-copy parser this introduces an unnecessary
copy. We avoid this by special casing the "vfs" and "dvdbnd" asset
sources to produce a special `FastPathAssetReader`.
  • Loading branch information
garyttierney committed Mar 23, 2024
1 parent a2d8de6 commit b897920
Show file tree
Hide file tree
Showing 9 changed files with 255 additions and 89 deletions.
8 changes: 8 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/asset-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ license.workspace = true
edition.workspace = true

[dependencies]
as-any = "0.3.1"
bevy = "0.13"
blocking = "1"
byteorder = "1.5.0"
crossbeam-channel = "0.5"
fstools_dvdbnd.workspace = true
Expand Down
43 changes: 21 additions & 22 deletions crates/asset-server/src/asset_source.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,35 @@
use std::{io, io::Read, path::PathBuf, pin::Pin, sync::Arc, task::Poll};
use std::{
error::Error,
future::Future,
io,
io::Read
,
path::PathBuf
,
sync::Arc
,
};

use bevy::{
app::{App, Plugin},
asset::io::{AssetSource, AssetSourceId},
prelude::{AssetApp, Deref, DerefMut},
asset::{
Asset,
AssetLoader,
io::{AssetSource, AssetSourceId}, meta::Settings,
},
prelude::AssetApp,
reflect::erased_serde::__private::serde::{Deserialize, Serialize},
};
use fstools_dvdbnd::{ArchiveKeyProvider, DvdBnd};
use futures_lite::AsyncRead;

use futures_lite::{AsyncRead, AsyncReadExt};
use crate::asset_source::{
dvdbnd::DvdBndAssetSource,
vfs::{watcher::VfsWatcher, Vfs, VfsAssetSource},
vfs::{Vfs, VfsAssetSource, watcher::VfsWatcher},
};

pub mod dvdbnd;
pub mod vfs;
pub(crate) mod fast_path;

pub struct FsAssetSourcePlugin {
dvd_bnd: Arc<DvdBnd>,
Expand Down Expand Up @@ -57,19 +72,3 @@ impl Plugin for FsAssetSourcePlugin {
);
}
}

#[derive(Deref, DerefMut)]
struct SimpleReader<R: Read>(R);

impl<R: Read> Unpin for SimpleReader<R> {}

impl<R: Read> AsyncRead for SimpleReader<R> {
fn poll_read(
self: Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
let reader = self.get_mut();
Poll::Ready(reader.read(buf))
}
}
40 changes: 17 additions & 23 deletions crates/asset-server/src/asset_source/dvdbnd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ use bevy::asset::{
io::{AssetReader, AssetReaderError, PathStream, Reader},
BoxedFuture,
};
use bevy::prelude::Deref;
use blocking::Unblock;
use fstools_dvdbnd::{DvdBnd, DvdBndEntryError};
use fstools_formats::dcx::DcxHeader;

use crate::asset_source::SimpleReader;
use crate::asset_source::fast_path::FastPathReader;

#[derive(Clone)]
#[derive(Clone, Deref)]
pub struct DvdBndAssetSource(pub(crate) Arc<DvdBnd>);

impl AssetReader for DvdBndAssetSource {
Expand All @@ -19,31 +21,23 @@ impl AssetReader for DvdBndAssetSource {
) -> BoxedFuture<'a, Result<Box<Reader<'a>>, AssetReaderError>> {
Box::pin(async move {
let path_str = path.to_string_lossy();
let dvd_bnd = &self.0;
let file = self.open(&*path_str).map_err(|err| match err {
DvdBndEntryError::NotFound => AssetReaderError::NotFound(path.to_path_buf()),
err => AssetReaderError::Io(Arc::new(io::Error::other(err))),
})?;

dvd_bnd
.open(&*path_str)
.map_err(|err| match err {
DvdBndEntryError::NotFound => AssetReaderError::NotFound(path.to_path_buf()),
err => AssetReaderError::Io(Arc::new(io::Error::other(err))),
})
.and_then(|r| {
let is_dcx = {
let bytes = r.data();
&bytes[..4] == b"DCX\0"
};
let is_dcx = { file.data().starts_with(b"DCX\0") };

let reader = if is_dcx {
let (_dcx_header, dcx_reader) = DcxHeader::read(r)
.map_err(|err| AssetReaderError::Io(Arc::new(io::Error::other(err))))?;
let reader = if is_dcx {
let (_dcx_header, dcx_reader) = DcxHeader::read(file)
.map_err(|err| AssetReaderError::Io(Arc::new(io::Error::other(err))))?;

Box::new(SimpleReader(dcx_reader)) as Box<Reader>
} else {
Box::new(SimpleReader(r)) as Box<Reader>
};
FastPathReader::Reader(Box::new(Unblock::new(dcx_reader)))
} else {
FastPathReader::MemoryMapped(file.into(), 0)
};

Ok(reader)
})
Ok(Box::new(reader) as Box<Reader>)
})
}

Expand Down
132 changes: 132 additions & 0 deletions crates/asset-server/src/asset_source/fast_path.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
use std::{
error::Error,
future::Future,
io,
io::Read,
marker::PhantomData,
pin::{pin, Pin},
task::Poll,
};

use bevy::{
asset::{
io::{AssetSourceId, Reader},
meta::Settings,
Asset, AssetLoader, BoxedFuture, LoadContext,
},
reflect::erased_serde::__private::serde::{Deserialize, Serialize},
};
use futures_lite::{AsyncRead, AsyncReadExt};
use memmap2::Mmap;

pub trait FastPathAsset: Asset + Sized {
/// The settings type used by this [`AssetLoader`].
type Settings: Settings + Default + Serialize + for<'a> Deserialize<'a>;

/// The type of [error](`std::error::Error`) which could be encountered by this loader.
type Error: Into<Box<dyn Error + Send + Sync + 'static>> + From<io::Error>;

fn load_from_bytes<'a>(
reader: &'a [u8],
settings: &'a Self::Settings,
load_context: &'a mut LoadContext,
) -> impl Future<Output = Result<Self, Self::Error>> + Send;
}

pub struct FastPathAssetLoader<T: FastPathAsset> {
_phantom: PhantomData<T>,
extensions: &'static [&'static str],
}

impl<T: FastPathAsset> FastPathAssetLoader<T> {
pub fn new(extensions: &'static [&'static str]) -> Self {
Self {
_phantom: PhantomData,
extensions,
}
}
}

impl<T: FastPathAsset> AssetLoader for FastPathAssetLoader<T> {
type Asset = T;
type Settings = T::Settings;
type Error = T::Error;

fn load<'a>(
&'a self,
reader: &'a mut Reader,
settings: &'a Self::Settings,
load_context: &'a mut LoadContext,
) -> BoxedFuture<'a, Result<Self::Asset, Self::Error>> {
Box::pin(async move {
let dvdbnd_asset_source_id: AssetSourceId = AssetSourceId::from("dvdbnd");
let vfs_asset_source_id: AssetSourceId = AssetSourceId::from("vfs");

let source = load_context.asset_path().source();
let data = if source == &dvdbnd_asset_source_id || source == &vfs_asset_source_id {
// SAFETY: This invariant is upheld by the `dvdbnd` and `vfs` asset source
// implementations. They MUST return an implementation of FastPathReader.
let reader = unsafe { (reader as *mut Reader).cast::<FastPathReader>().as_mut() };
reader.and_then(|r| r.as_bytes())
} else {
None
};

match data {
None => {
let mut buffer = Vec::new();
reader.read_to_end(&mut buffer).await?;

T::load_from_bytes(&buffer, settings, load_context).await
}
Some(slice) => T::load_from_bytes(slice, settings, load_context).await,
}
})
}

fn extensions(&self) -> &[&str] {
self.extensions
}
}

/// An [`AsyncRead`] implementation that allows consuming Bevy asset loaders to bypass the read
/// implementation and directly access the data when available.
pub enum FastPathReader<'a> {
MemoryMapped(Mmap, usize),
Reader(Box<dyn AsyncRead + Unpin + Send + Sync + 'a>),
Slice(&'a [u8]),
}

impl<'a> FastPathReader<'a> {
pub fn as_bytes(&'a self) -> Option<&'a [u8]> {
match self {
FastPathReader::Slice(slice) => Some(slice),
FastPathReader::MemoryMapped(mmap, _) => Some(&mmap[..]),
FastPathReader::Reader(_) => None,
}
}
}

impl<'a> AsyncRead for FastPathReader<'a> {
fn poll_read(
self: Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
match self.get_mut() {
FastPathReader::Reader(reader) => AsyncRead::poll_read(pin!(reader), _cx, buf),
FastPathReader::Slice(slice) => Poll::Ready(Read::read(slice, buf)),
FastPathReader::MemoryMapped(dvd_bnd, ref mut offset) => {
let mut data = &dvd_bnd[*offset..];
let read = match Read::read(&mut data, buf) {
Ok(length) => length,
Err(e) => return Poll::Ready(Err(e)),
};

*offset += read;

Poll::Ready(Ok(read))
}
}
}
}
23 changes: 21 additions & 2 deletions crates/asset-server/src/asset_source/vfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ use bevy::{
use crossbeam_channel::Sender;
use memmap2::{Mmap, MmapOptions};
use typed_path::Utf8WindowsPathBuf;
use crate::asset_source::fast_path::FastPathReader;

use crate::asset_source::SimpleReader;

pub mod watcher;

Expand Down Expand Up @@ -86,6 +86,25 @@ impl Vfs {
}
}

fn entry_file_list_id(map_id: u32, param_3: i32) -> i32 {
let map_parts:[u8; 4] = map_id.to_le_bytes().into();
let p1 = map_parts[3] as i32;
let p2 = map_parts[2] as i32;
let p3 = map_parts[1] as i32;
let p4 = map_parts[0] as i32;

// p1 is the start of the map ID: m10, m60, etc.
let result = if p1 == 60 {
let sub_map_id: i32 = (map_id & 0xf) as i32;
let mut result = (1 - sub_map_id / 10) * 10 - p2 / 100 + sub_map_id;
(result * 100 - p3 / 100 + p2) * 100 - param_3 / 10000 + p3
} else {
(((p1 + (p1 / 100) * -100) - p1 / 100) * 100 - param_3 / 10000) + p2
};

result * 10_000 + param_3
}

#[derive(Deref, DerefMut)]
pub struct VfsAssetSource(pub(crate) Vfs);

Expand All @@ -98,7 +117,7 @@ impl AssetReader for VfsAssetSource {
let bytes = self.entry_bytes(path.to_str().expect("invalid path"));

match bytes {
Some(data) => Ok(Box::new(SimpleReader(data)) as Box<Reader>),
Some(data) => Ok(Box::new(FastPathReader::Slice(data)) as Box<Reader>),
None => Err(AssetReaderError::NotFound(path.to_path_buf())),
}
})
Expand Down
3 changes: 2 additions & 1 deletion crates/asset-server/src/types.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use bevy::prelude::*;
use crate::asset_source::fast_path::FastPathAssetLoader;

use crate::types::{
bnd4::{Archive, ArchiveEntry, Bnd4Loader},
Expand All @@ -21,7 +22,7 @@ impl Plugin for FsFormatsPlugin {
.init_asset::<ArchiveEntry>()
.init_asset::<MsbAsset>()
.register_asset_loader(MsbAssetLoader)
.register_asset_loader(FlverLoader)
.register_asset_loader(FastPathAssetLoader::<FlverAsset>::new(&["flver"]))
.register_asset_loader(Bnd4Loader);
app.init_asset::<MsbAsset>()
.init_asset::<MsbPointAsset>()
Expand Down
Loading

0 comments on commit b897920

Please sign in to comment.