Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft: [EXPERIMENT] begin adding "async" versions of trait/impls #250

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ semver = { version = "1.0.23", optional = true }
bytesize = "1.3.0"
rayon = { version = "1.10.0", optional = true }
tokio = { version = "1.39.3", optional = true, default-features = false }
futures = { version = "0.3", optional = true, default-features = false }
async-trait = "0.1.81"

[target.'cfg(not(windows))'.dependencies]
# opendal backend - sftp is not supported on windows, see https://github.com/apache/incubator-opendal/issues/2963
Expand Down
52 changes: 52 additions & 0 deletions crates/backend/src/choose.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! This module contains [`BackendOptions`] and helpers to choose a backend from a given url.
use anyhow::{anyhow, Result};
use derive_setters::Setters;
use rustic_core::{AsyncRepositoryBackends, AsyncWriteBackend};
use std::{collections::HashMap, sync::Arc};
use strum_macros::{Display, EnumString};

Expand All @@ -10,6 +11,7 @@ use rustic_core::{RepositoryBackends, WriteBackend};
use crate::{
error::BackendAccessErrorKind,
local::LocalBackend,
opendal::AsyncOpenDALBackend,
util::{location_to_type_and_path, BackendLocation},
};

Expand Down Expand Up @@ -95,6 +97,19 @@ impl BackendOptions {
Ok(RepositoryBackends::new(be, be_hot))
}

/// Build the [`AsyncRepositoryBackends`] configured by these options.
///
/// The (cold) repository backend is created from `repository`, with the
/// generic `options` merged with `options_cold`; the optional hot backend is
/// created from `repo_hot`, with `options` merged with `options_hot`.
///
/// # Errors
///
/// Fails when no repository location is configured, or when one of the
/// backends cannot be created.
pub fn to_async_backends(&self) -> Result<AsyncRepositoryBackends> {
    let mut cold_options = self.options.clone();
    cold_options.extend(self.options_cold.clone());
    let Some(be) = self.get_async_backed(self.repository.as_ref(), cold_options)? else {
        return Err(anyhow!("No repository given."));
    };

    let mut hot_options = self.options.clone();
    hot_options.extend(self.options_hot.clone());
    let be_hot = self.get_async_backed(self.repo_hot.as_ref(), hot_options)?;

    Ok(AsyncRepositoryBackends::new(be, be_hot))
}

/// Get the backend for the given repository.
///
/// # Arguments
Expand Down Expand Up @@ -125,6 +140,25 @@ impl BackendOptions {
})
.transpose()
}

/// Resolve an optional repository string into an async write backend.
///
/// Returns `Ok(None)` when `repo_string` is `None`; otherwise the string is
/// split into a backend type and a location, and the matching async backend
/// is created with the given `options`.
///
/// # Errors
///
/// A `BackendAccessErrorKind` produced during backend creation is passed
/// through unchanged; any other creation error is wrapped into
/// `BackendAccessErrorKind::BackendLoadError`.
// NOTE(review): the name is probably a typo for `get_async_backend`; kept
// because the sibling call sites depend on it.
fn get_async_backed(
    &self,
    repo_string: Option<&String>,
    options: HashMap<String, String>,
) -> Result<Option<Arc<dyn AsyncWriteBackend>>> {
    repo_string
        .map(|string| {
            let (be_type, location) = location_to_type_and_path(string)?;
            match be_type.to_async_backends(location, options.into()) {
                Ok(be) => Ok(be),
                // `e` is already an `anyhow::Error`; the previous
                // `e.into()` was a redundant identity conversion.
                Err(e) if e.downcast_ref::<BackendAccessErrorKind>().is_some() => Err(e),
                Err(e) => {
                    Err(BackendAccessErrorKind::BackendLoadError(be_type.to_string(), e).into())
                }
            }
        })
        .transpose()
}
}

/// Trait which can be implemented to choose a backend from a backend type, a backend path and options given as `HashMap`.
Expand All @@ -146,6 +180,12 @@ pub trait BackendChoice {
location: BackendLocation,
options: Option<HashMap<String, String>>,
) -> Result<Arc<dyn WriteBackend>>;

/// Creates an async write backend for this backend type.
///
/// # Arguments
///
/// * `location` - The location of the backend.
/// * `options` - Additional options for the backend, if any.
///
/// # Errors
///
/// Implementations return an error when the backend cannot be created; the
/// provided `SupportedBackend` implementation returns `BackendNoAsync` for
/// types without an async variant.
fn to_async_backends(
    &self,
    location: BackendLocation,
    options: Option<HashMap<String, String>>,
) -> Result<Arc<dyn AsyncWriteBackend>>;
}

/// The supported backend types.
Expand Down Expand Up @@ -196,6 +236,18 @@ impl BackendChoice for SupportedBackend {
Self::OpenDAL => Arc::new(OpenDALBackend::new(location, options)?),
})
}

/// Creates an async write backend from this backend type, a location and
/// optional options.
///
/// Currently only `Self::OpenDAL` provides an async implementation; every
/// other backend type yields a `BackendNoAsync` error.
fn to_async_backends(
    &self,
    location: BackendLocation,
    options: Option<HashMap<String, String>>,
) -> Result<Arc<dyn AsyncWriteBackend>> {
    let options = options.unwrap_or_default();
    if let Self::OpenDAL = self {
        Ok(Arc::new(AsyncOpenDALBackend::new(location, options)?))
    } else {
        // NOTE(review): the error payload is the *location* string although
        // the error message speaks of a backend — confirm intent.
        Err(BackendAccessErrorKind::BackendNoAsync(location.to_string()).into())
    }
}
}

#[cfg(test)]
Expand Down
2 changes: 2 additions & 0 deletions crates/backend/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use thiserror::Error;
/// [`BackendAccessErrorKind`] describes the errors that can be returned by the various Backends
#[derive(Error, Debug, Display)]
pub enum BackendAccessErrorKind {
/// no async variant implemented for backend {0:1}
BackendNoAsync(String),
/// backend {0:?} is not supported!
BackendNotSupported(String),
/// backend {0} cannot be loaded: {1:?}
Expand Down
240 changes: 234 additions & 6 deletions crates/backend/src/opendal.rs
Original file line number Diff line number Diff line change
@@ -1,34 +1,43 @@
/// `OpenDAL` backend for rustic.
use std::{collections::HashMap, path::PathBuf, str::FromStr, sync::OnceLock};
use std::{borrow::Borrow, collections::HashMap, path::PathBuf, str::FromStr, sync::OnceLock};

use anyhow::{anyhow, Error, Result};
use async_trait::async_trait;
use bytes::Bytes;
use bytesize::ByteSize;
use log::trace;
use log::{debug, trace};
use opendal::{
layers::{BlockingLayer, ConcurrentLimitLayer, LoggingLayer, RetryLayer, ThrottleLayer},
BlockingOperator, ErrorKind, Metakey, Operator, Scheme,
};
use rayon::prelude::{IntoParallelIterator, ParallelIterator};
use tokio::runtime::Runtime;
use tokio::runtime::{EnterGuard, Handle, Runtime};

use rustic_core::{FileType, Id, ReadBackend, WriteBackend, ALL_FILE_TYPES};
use rustic_core::{
AsyncReadBackend, AsyncWriteBackend, FileType, Id, ReadBackend, WriteBackend, ALL_FILE_TYPES,
};

mod constants {
/// Default number of retries
pub(super) const DEFAULT_RETRY: usize = 5;
}

/// `OpenDALBackend` contains a wrapper around an blocking operator of the `OpenDAL` library.
#[derive(Clone, Debug)]
// NOTE(review): this change dropped the `Clone` derive from a public type,
// which is a breaking change for downstream users; `BlockingOperator` is
// clonable, so restore it.
#[derive(Clone, Debug)]
pub struct OpenDALBackend {
    // Blocking `OpenDAL` operator performing the actual backend I/O.
    operator: BlockingOperator,
}

/// Async implementation of [OpenDALBackend].
// Derive `Clone` for parity with the sync backend; `opendal::Operator`
// clones are cheap (reference-counted).
#[derive(Clone, Debug)]
pub struct AsyncOpenDALBackend {
    // Non-blocking `OpenDAL` operator performing the actual backend I/O.
    operator: Operator,
}

fn runtime() -> &'static Runtime {
static RUNTIME: OnceLock<Runtime> = OnceLock::new();
RUNTIME.get_or_init(|| {
tokio::runtime::Builder::new_multi_thread()
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
Expand Down Expand Up @@ -59,7 +68,226 @@ impl FromStr for Throttle {
}
}

impl AsyncOpenDALBackend {
    /// Create a new async `OpenDAL` backend.
    ///
    /// # Arguments
    ///
    /// * `path` - The `OpenDAL` scheme string selecting the service (parsed
    ///   via `Scheme::from_str`).
    /// * `options` - Options forwarded to `OpenDAL`. The keys `retry`,
    ///   `connections` and `throttle` are also read here to configure the
    ///   retry, concurrency-limit and throttle layers.
    ///
    /// # Errors
    ///
    /// Fails when one of the special option values or the scheme cannot be
    /// parsed, or when the operator cannot be built.
    // TODO - factorize code with OpenDALBackend::new()
    pub fn new(path: impl AsRef<str>, options: HashMap<String, String>) -> Result<Self> {
        // "retry": "false"/"off" disables retries, "default"/absent uses the
        // default count, anything else must parse as a retry count.
        let max_retries = match options.get("retry").map(String::as_str) {
            Some("false" | "off") => 0,
            None | Some("default") => constants::DEFAULT_RETRY,
            Some(value) => usize::from_str(value)?,
        };
        // Optional cap on concurrent operations.
        let connections = options
            .get("connections")
            .map(|c| usize::from_str(c))
            .transpose()?;

        // Optional throttle setting (format defined by `Throttle::from_str`).
        let throttle = options
            .get("throttle")
            .map(|t| Throttle::from_str(t))
            .transpose()?;

        let schema = Scheme::from_str(path.as_ref())?;
        // Layer order matters: retry first, then optionally throttling and
        // the concurrency limit stacked on top.
        let mut operator = Operator::via_iter(schema, options)?
            .layer(RetryLayer::new().with_max_times(max_retries).with_jitter());

        if let Some(Throttle { bandwidth, burst }) = throttle {
            operator = operator.layer(ThrottleLayer::new(bandwidth, burst));
        }

        if let Some(connections) = connections {
            operator = operator.layer(ConcurrentLimitLayer::new(connections));
        }

        Ok(Self { operator })
    }

    /// Return a path for the given file type and id.
    ///
    /// # Arguments
    ///
    /// * `tpe` - The type of the file.
    /// * `id` - The id of the file.
    ///
    /// # Returns
    ///
    /// The path for the given file type and id: `config` for the config
    /// file, `data/<first two hex digits>/<hex id>` for packs, and
    /// `<type dir>/<hex id>` for everything else.
    // Let's keep this for now, as it's being used in the trait implementations.
    #[allow(clippy::unused_self)]
    fn path(&self, tpe: FileType, id: &Id) -> String {
        let hex_id = id.to_hex();
        match tpe {
            FileType::Config => PathBuf::from("config"),
            FileType::Pack => PathBuf::from("data").join(&hex_id[0..2]).join(hex_id),
            _ => PathBuf::from(tpe.dirname()).join(hex_id),
        }
        .to_string_lossy()
        .to_string()
    }
}

#[async_trait]
impl AsyncReadBackend for AsyncOpenDALBackend {
    /// Returns the location of the backend.
    ///
    /// This is `opendal:<operator name>`. (The previous doc said
    /// `local:<path>`, a copy-paste leftover from the local backend.)
    fn location(&self) -> String {
        let mut location = "opendal:".to_string();
        location.push_str(self.operator.info().name());
        location
    }

    /// Lists all files of the given type.
    ///
    /// # Arguments
    ///
    /// * `tpe` - The type of the files to list.
    ///
    /// # Notes
    ///
    /// If the file type is `FileType::Config`, this will return a list with a single default id.
    async fn list(&self, tpe: FileType) -> Result<Vec<Id>> {
        trace!("listing tpe: {tpe:?}");
        if tpe == FileType::Config {
            return Ok(if self.operator.is_exist("config").await? {
                vec![Id::default()]
            } else {
                Vec::new()
            });
        }

        Ok(self
            .operator
            .list_with(&(tpe.dirname().to_string() + "/"))
            .recursive(true)
            .await?
            .into_iter()
            .filter(|e| e.metadata().is_file())
            .map(|e| Id::from_hex(e.name()))
            // Entries whose names are not valid hex ids are silently skipped.
            .filter_map(Result::ok)
            .collect())
    }

    /// Lists all files with their size of the given type.
    ///
    /// # Arguments
    ///
    /// * `tpe` - The type of the files to list.
    async fn list_with_size(&self, tpe: FileType) -> Result<Vec<(Id, u32)>> {
        trace!("listing tpe: {tpe:?}");
        if tpe == FileType::Config {
            return match self.operator.stat("config").await {
                Ok(entry) => Ok(vec![(Id::default(), entry.content_length().try_into()?)]),
                Err(err) if err.kind() == ErrorKind::NotFound => Ok(Vec::new()),
                Err(err) => Err(err.into()),
            };
        }

        Ok(self
            .operator
            .list_with(&(tpe.dirname().to_string() + "/"))
            .recursive(true)
            .metakey(Metakey::ContentLength)
            .await?
            .into_iter()
            .filter(|e| e.metadata().is_file())
            .map(|e| -> Result<(Id, u32)> {
                Ok((
                    Id::from_hex(e.name())?,
                    e.metadata().content_length().try_into()?,
                ))
            })
            // Entries with invalid ids or sizes not fitting u32 are skipped.
            .filter_map(Result::ok)
            .collect())
    }

    /// Reads the full content of the given file.
    ///
    /// # Arguments
    ///
    /// * `tpe` - The type of the file.
    /// * `id` - The id of the file.
    async fn read_full(&self, tpe: FileType, id: &Id) -> Result<Bytes> {
        trace!("reading tpe: {tpe:?}, id: {id}");

        Ok(self.operator.read(&self.path(tpe, id)).await?.to_bytes())
    }

    /// Reads `length` bytes of the given file, starting at `offset`.
    ///
    /// # Arguments
    ///
    /// * `tpe` - The type of the file.
    /// * `id` - The id of the file.
    /// * `offset` - Byte offset to start reading at.
    /// * `length` - Number of bytes to read.
    async fn read_partial(
        &self,
        tpe: FileType,
        id: &Id,
        _cacheable: bool,
        offset: u32,
        length: u32,
    ) -> Result<Bytes> {
        trace!("reading tpe: {tpe:?}, id: {id}, offset: {offset}, length: {length}");
        // Widen before adding: `offset + length` in u32 can overflow (panic
        // in debug, wraparound in release) for ranges near the 4 GiB mark.
        let range = u64::from(offset)..u64::from(offset) + u64::from(length);
        Ok(self
            .operator
            .read_with(&self.path(tpe, id))
            .range(range)
            .await?
            .to_bytes())
    }
}

#[async_trait]
impl AsyncWriteBackend for AsyncOpenDALBackend {
    /// Create the repository directory layout.
    ///
    /// Creates one directory per file type plus the 256 two-hex-digit
    /// subdirectories under `data/`.
    async fn create(&self) -> Result<()> {
        trace!("creating repo at {:?}", self.location());

        for tpe in ALL_FILE_TYPES {
            let dir = format!("{}/", tpe.dirname());
            self.operator.create_dir(&dir).await?;
        }

        // TODO - use futures::stream::Stream;
        for i in 0u8..=255 {
            let subdir = PathBuf::from("data")
                .join(hex::encode([i]))
                .to_string_lossy()
                .to_string();
            self.operator.create_dir(&format!("{subdir}/")).await?;
        }

        Ok(())
    }

    /// Write the given bytes under the path derived from `tpe` and `id`.
    ///
    /// # Arguments
    ///
    /// * `tpe` - The type of the file.
    /// * `id` - The id of the file.
    /// * `buf` - The bytes to write.
    async fn write_bytes(
        &self,
        tpe: FileType,
        id: &Id,
        _cacheable: bool,
        buf: Bytes,
    ) -> Result<()> {
        trace!("writing tpe: {:?}, id: {}", &tpe, &id);
        self.operator.write(&self.path(tpe, id), buf).await?;
        Ok(())
    }

    /// Remove the given file.
    ///
    /// # Arguments
    ///
    /// * `tpe` - The type of the file.
    /// * `id` - The id of the file.
    /// * `cacheable` - Whether the file is cacheable.
    async fn remove(&self, tpe: FileType, id: &Id, _cacheable: bool) -> Result<()> {
        trace!("removing tpe: {:?}, id: {}", &tpe, &id);
        self.operator.delete(&self.path(tpe, id)).await?;
        Ok(())
    }
}

impl OpenDALBackend {
/// Whether this (blocking) backend may be used from inside an async context.
///
/// Always `false` for `OpenDALBackend` — presumably because it drives its
/// own internal runtime and calling it from another runtime would misbehave;
/// confirm against the `runtime()` / `BlockingLayer` usage.
///
/// TODO have some shared trait with such a method
/// otherwise the knowledge of this async safety could be in this match method in choose.rs
fn safe_in_async_context() -> bool {
    false
}

/// Create a new openDAL backend.
///
/// # Arguments
Expand Down
1 change: 1 addition & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ itertools = "0.13.0"
quick_cache = "0.6.2"
strum = { version = "0.26.3", features = ["derive"] }
zstd = "0.13.2"
async-trait = "0.1.81"

[target.'cfg(not(windows))'.dependencies]
sha2 = { version = "0.10.8", features = ["asm"] }
Expand Down
Loading
Loading