Skip to content

Commit

Permalink
Refactor storage factory initialization (#3709)
Browse files Browse the repository at this point in the history
For #3443 I need to be able to perform initialization on storage factory
level and in order to do that I need access to the config parameters
during initialization rather than during storage resolution. This PR
moves the storage config parameters to the storage initializer.

Co-authored-by: Adrien Guillo <[email protected]>
  • Loading branch information
imotov and guilload authored Aug 5, 2023
1 parent 8001c85 commit e1ff124
Show file tree
Hide file tree
Showing 8 changed files with 79 additions and 206 deletions.
9 changes: 1 addition & 8 deletions quickwit/quickwit-cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,14 +442,7 @@ mod tests {
};
let storage_configs = StorageConfigs::new(vec![s3_storage_config.into()]);
let metastore_configs = MetastoreConfigs::default();
let (storage_resolver, _metastore_resolver) =
let (_storage_resolver, _metastore_resolver) =
get_resolvers(&storage_configs, &metastore_configs);
assert!(
storage_resolver
.storage_configs()
.find_s3()
.unwrap()
.force_path_style_access
);
}
}
4 changes: 1 addition & 3 deletions quickwit/quickwit-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,12 @@ pub use for_test::storage_for_test;
mod tests {
use std::str::FromStr;

use quickwit_config::FileStorageConfig;

use super::*;

#[tokio::test]
async fn test_load_file() {
let storage_resolver = StorageResolver::builder()
.register(LocalFileStorageFactory, FileStorageConfig::default().into())
.register(LocalFileStorageFactory)
.build()
.unwrap();
let expected_bytes = tokio::fs::read_to_string("Cargo.toml").await.unwrap();
Expand Down
19 changes: 5 additions & 14 deletions quickwit/quickwit-storage/src/local_file_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use futures::future::{BoxFuture, FutureExt};
use futures::StreamExt;
use quickwit_common::ignore_error_kind;
use quickwit_common::uri::Uri;
use quickwit_config::{StorageBackend, StorageConfig};
use quickwit_config::StorageBackend;
use tokio::fs;
use tokio::io::AsyncWriteExt;
use tracing::warn;
Expand Down Expand Up @@ -345,11 +345,7 @@ impl StorageFactory for LocalFileStorageFactory {
StorageBackend::File
}

async fn resolve(
&self,
_storage_config: &StorageConfig,
uri: &Uri,
) -> Result<Arc<dyn Storage>, StorageResolverError> {
async fn resolve(&self, uri: &Uri) -> Result<Arc<dyn Storage>, StorageResolverError> {
let storage = LocalFileStorage::from_uri(uri)?;
Ok(Arc::new(DebouncedStorage::new(storage)))
}
Expand All @@ -360,8 +356,6 @@ mod tests {

use std::str::FromStr;

use quickwit_config::FileStorageConfig;

use super::*;
use crate::test_suite::storage_test_suite;

Expand Down Expand Up @@ -395,25 +389,22 @@ mod tests {

#[tokio::test]
async fn test_local_file_storage_factory() -> anyhow::Result<()> {
let storage_config = FileStorageConfig::default().into();
let temp_dir = tempfile::tempdir()?;
let index_uri =
Uri::from_well_formed(format!("file://{}/foo/bar", temp_dir.path().display()));
let local_file_storage_factory = LocalFileStorageFactory::default();
let local_file_storage = local_file_storage_factory
.resolve(&storage_config, &index_uri)
.await?;
let local_file_storage = local_file_storage_factory.resolve(&index_uri).await?;
assert_eq!(local_file_storage.uri(), &index_uri);

let err = local_file_storage_factory
.resolve(&storage_config, &Uri::from_well_formed("s3://foo/bar"))
.resolve(&Uri::from_well_formed("s3://foo/bar"))
.await
.err()
.unwrap();
assert!(matches!(err, StorageResolverError::InvalidUri { .. }));

let err = local_file_storage_factory
.resolve(&storage_config, &Uri::from_well_formed("s3://"))
.resolve(&Uri::from_well_formed("s3://"))
.await
.err()
.unwrap();
Expand Down
29 changes: 13 additions & 16 deletions quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use once_cell::sync::OnceCell;
use quickwit_aws::retry::{retry, RetryParams, Retryable};
use quickwit_common::uri::Uri;
use quickwit_common::{chunk_range, ignore_error_kind, into_u64_range};
use quickwit_config::{AzureStorageConfig, StorageBackend, StorageConfig};
use quickwit_config::{AzureStorageConfig, StorageBackend};
use regex::Regex;
use tantivy::directory::OwnedBytes;
use thiserror::Error;
Expand All @@ -55,28 +55,25 @@ use crate::{
};

/// Azure object storage resolver.
#[derive(Default)]
pub struct AzureBlobStorageFactory;
pub struct AzureBlobStorageFactory {
storage_config: AzureStorageConfig,
}

impl AzureBlobStorageFactory {
/// Creates a new Azure blob storage factory.
pub fn new(storage_config: AzureStorageConfig) -> Self {
Self { storage_config }
}
}

#[async_trait]
impl StorageFactory for AzureBlobStorageFactory {
fn backend(&self) -> StorageBackend {
StorageBackend::Azure
}

async fn resolve(
&self,
storage_config: &StorageConfig,
uri: &Uri,
) -> Result<Arc<dyn Storage>, StorageResolverError> {
let azure_storage_config = storage_config.as_azure().ok_or_else(|| {
let message = format!(
"Expected Azure storage config, got `{:?}`.",
storage_config.backend()
);
StorageResolverError::InvalidConfig(message)
})?;
let storage = AzureBlobStorage::from_uri(azure_storage_config, uri)?;
async fn resolve(&self, uri: &Uri) -> Result<Arc<dyn Storage>, StorageResolverError> {
let storage = AzureBlobStorage::from_uri(&self.storage_config, uri)?;
Ok(Arc::new(DebouncedStorage::new(storage)))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,35 +21,32 @@ use std::sync::Arc;

use async_trait::async_trait;
use quickwit_common::uri::Uri;
use quickwit_config::{StorageBackend, StorageConfig};
use quickwit_config::{S3StorageConfig, StorageBackend};

use crate::{
DebouncedStorage, S3CompatibleObjectStorage, Storage, StorageFactory, StorageResolverError,
};

/// S3 compatible object storage resolver.
#[derive(Default)]
pub struct S3CompatibleObjectStorageFactory;
pub struct S3CompatibleObjectStorageFactory {
storage_config: S3StorageConfig,
}

impl S3CompatibleObjectStorageFactory {
/// Creates a new S3-compatible storage factory.
pub fn new(storage_config: S3StorageConfig) -> Self {
Self { storage_config }
}
}

#[async_trait]
impl StorageFactory for S3CompatibleObjectStorageFactory {
fn backend(&self) -> StorageBackend {
StorageBackend::S3
}

async fn resolve(
&self,
storage_config: &StorageConfig,
uri: &Uri,
) -> Result<Arc<dyn Storage>, StorageResolverError> {
let s3_storage_config = storage_config.as_s3().ok_or_else(|| {
let message = format!(
"Expected S3 storage config, got `{:?}`.",
storage_config.backend()
);
StorageResolverError::InvalidConfig(message)
})?;
let storage = S3CompatibleObjectStorage::from_uri(s3_storage_config, uri).await?;
async fn resolve(&self, uri: &Uri) -> Result<Arc<dyn Storage>, StorageResolverError> {
let storage = S3CompatibleObjectStorage::from_uri(&self.storage_config, uri).await?;
Ok(Arc::new(DebouncedStorage::new(storage)))
}
}
34 changes: 6 additions & 28 deletions quickwit/quickwit-storage/src/ram_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::sync::Arc;

use async_trait::async_trait;
use quickwit_common::uri::Uri;
use quickwit_config::{StorageBackend, StorageConfig};
use quickwit_config::StorageBackend;
use tokio::io::AsyncWriteExt;
use tokio::sync::RwLock;

Expand Down Expand Up @@ -194,11 +194,7 @@ impl StorageFactory for RamStorageFactory {
StorageBackend::Ram
}

async fn resolve(
&self,
_storage_config: &StorageConfig,
uri: &Uri,
) -> Result<Arc<dyn Storage>, StorageResolverError> {
async fn resolve(&self, uri: &Uri) -> Result<Arc<dyn Storage>, StorageResolverError> {
match uri.filepath() {
Some(prefix) if uri.protocol().is_ram() => Ok(add_prefix_to_storage(
self.ram_storage.clone(),
Expand All @@ -215,7 +211,6 @@ impl StorageFactory for RamStorageFactory {

#[cfg(test)]
mod tests {
use quickwit_config::RamStorageConfig;

use super::*;
use crate::test_suite::storage_test_suite;
Expand All @@ -230,34 +225,17 @@ mod tests {
#[tokio::test]
async fn test_ram_storage_factory() {
let ram_storage_factory = RamStorageFactory::default();
let storage_config = RamStorageConfig::default().into();
let ram_uri = Uri::from_well_formed("s3:///foo");
let err = ram_storage_factory
.resolve(&storage_config, &ram_uri)
.await
.err()
.unwrap();
let err = ram_storage_factory.resolve(&ram_uri).await.err().unwrap();
assert!(matches!(err, StorageResolverError::InvalidUri { .. }));

let data_uri = Uri::from_well_formed("ram:///data");
let data_storage = ram_storage_factory
.resolve(&storage_config, &data_uri)
.await
.ok()
.unwrap();
let data_storage = ram_storage_factory.resolve(&data_uri).await.ok().unwrap();
let home_uri = Uri::from_well_formed("ram:///home");
let home_storage = ram_storage_factory
.resolve(&storage_config, &home_uri)
.await
.ok()
.unwrap();
let home_storage = ram_storage_factory.resolve(&home_uri).await.ok().unwrap();
assert_ne!(data_storage.uri(), home_storage.uri());

let data_storage_two = ram_storage_factory
.resolve(&storage_config, &data_uri)
.await
.ok()
.unwrap();
let data_storage_two = ram_storage_factory.resolve(&data_uri).await.ok().unwrap();
assert_eq!(data_storage.uri(), data_storage_two.uri());
}

Expand Down
16 changes: 4 additions & 12 deletions quickwit/quickwit-storage/src/storage_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,20 @@ use std::sync::Arc;

use async_trait::async_trait;
use quickwit_common::uri::Uri;
use quickwit_config::{StorageBackend, StorageConfig};
use quickwit_config::StorageBackend;

use crate::{Storage, StorageResolverError};

/// A storage factory builds a [`Storage`] object for a target [`StorageBackend`] from a
/// [`StorageConfig`] and a [`Uri`].
/// [`Uri`].
#[cfg_attr(any(test, feature = "testsuite"), mockall::automock)]
#[async_trait]
pub trait StorageFactory: Send + Sync + 'static {
/// Returns the storage backend targeted by the factory.
fn backend(&self) -> StorageBackend;

/// Returns the appropriate [`Storage`] object for the URI.
async fn resolve(
&self,
storage_config: &StorageConfig,
uri: &Uri,
) -> Result<Arc<dyn Storage>, StorageResolverError>;
async fn resolve(&self, uri: &Uri) -> Result<Arc<dyn Storage>, StorageResolverError>;
}

/// A storage factory for handling unsupported or unavailable storage backends.
Expand All @@ -61,11 +57,7 @@ impl StorageFactory for UnsupportedStorage {
self.backend
}

async fn resolve(
&self,
_storage_config: &StorageConfig,
_uri: &Uri,
) -> Result<Arc<dyn Storage>, StorageResolverError> {
async fn resolve(&self, _uri: &Uri) -> Result<Arc<dyn Storage>, StorageResolverError> {
Err(StorageResolverError::UnsupportedBackend(
self.message.to_string(),
))
Expand Down
Loading

0 comments on commit e1ff124

Please sign in to comment.