diff --git a/components/data_proxy/src/caching/grpc_query_handler.rs b/components/data_proxy/src/caching/grpc_query_handler.rs index 575cd564..c07249f5 100644 --- a/components/data_proxy/src/caching/grpc_query_handler.rs +++ b/components/data_proxy/src/caching/grpc_query_handler.rs @@ -1341,9 +1341,13 @@ impl GrpcQueryHandler { Ok(object) } - #[tracing::instrument(level = "trace", skip(self, id, hashes, token))] - pub async fn set_object_hashes(&self, id: &DieselUlid, hashes: Vec, token: &str) -> Result<()> { + pub async fn set_object_hashes( + &self, + id: &DieselUlid, + hashes: Vec, + token: &str, + ) -> Result<()> { let mut req = Request::new(SetObjectHashesRequest { object_id: id.to_string(), hashes, @@ -1362,9 +1366,6 @@ impl GrpcQueryHandler { Ok(()) } - - - } #[tracing::instrument(level = "trace", skip(res))] diff --git a/components/data_proxy/src/replication/replication_handler.rs b/components/data_proxy/src/replication/replication_handler.rs index 5c950295..e286be4f 100644 --- a/components/data_proxy/src/replication/replication_handler.rs +++ b/components/data_proxy/src/replication/replication_handler.rs @@ -21,10 +21,10 @@ use diesel_ulid::DieselUlid; use md5::{Digest, Md5}; use pithos_lib::transformers::footer_extractor::FooterExtractor; use pithos_lib::{streamreadwrite::GenericStreamReadWriter, transformer::ReadWriter}; -use std::{str::FromStr, sync::Arc}; use std::default::Default; -use tokio::sync::RwLock; +use std::{str::FromStr, sync::Arc}; use tokio::pin; +use tokio::sync::RwLock; use tracing::{info_span, trace, Instrument}; pub struct ReplicationMessage { @@ -60,22 +60,17 @@ pub struct ReplicationHandler { #[derive(Clone, Debug)] struct ObjectState { - sender: Sender, - receiver: Receiver, - state: ObjectStateStatus + sender: Sender, + receiver: Receiver, + state: ObjectStateStatus, } #[derive(Clone, Debug)] pub enum ObjectStateStatus { NotReceived, - Infos{ - max_chunks: i64, - size: i64 - }, + Infos { max_chunks: i64, size: i64 }, } - - impl ObjectState { pub fn new(sender: Sender, receiver: Receiver) -> Self { ObjectState { @@ -85,18 +80,20 @@ impl ObjectState { } } - pub fn update_state(&mut self, max_chunks:i64, size:i64) { - self.state = ObjectStateStatus::Infos { max_chunks , size }; + pub fn update_state(&mut self, max_chunks: i64, size: i64) { + self.state = ObjectStateStatus::Infos { max_chunks, size }; } pub fn is_synced(&self) -> bool { - !matches!{self.state, ObjectStateStatus::NotReceived} + !matches! {self.state, ObjectStateStatus::NotReceived} } pub fn get_size(&self) -> Option { - if let ObjectStateStatus::Infos {size, ..} = self.state { + if let ObjectStateStatus::Infos { size, .. } = self.state { Some(size) - } else { None } + } else { + None + } } pub fn get_rcv(&self) -> Receiver { @@ -104,8 +101,8 @@ impl ObjectState { } pub fn get_chunks(&self) -> Result { - if let ObjectStateStatus::Infos {max_chunks,.. } = self.state { - if max_chunks > 0 { + if let ObjectStateStatus::Infos { max_chunks, .. } = self.state { + if max_chunks > 0 { Ok(max_chunks) } else { Err(anyhow!("Invalid max chunks received")) @@ -120,8 +117,7 @@ impl ObjectState { } } -type ObjectHandler = - Arc>, RandomState>>; +type ObjectHandler = Arc>, RandomState>>; impl ReplicationHandler { #[tracing::instrument(level = "trace", skip(cache, backend, receiver))] pub fn new( @@ -280,10 +276,10 @@ impl ReplicationHandler { let (object_sdx, object_rcv) = async_channel::bounded(1000); object_handler_map.insert( object.to_string(), - Arc::new(RwLock::new( - ObjectState::new(object_sdx.clone(), object_rcv.clone()) - - )) + Arc::new(RwLock::new(ObjectState::new( + object_sdx.clone(), + object_rcv.clone(), + ))), ); } diff --git a/components/data_proxy/src/s3_frontend/data_handler.rs b/components/data_proxy/src/s3_frontend/data_handler.rs index a177a92d..f6e1affe 100644 --- a/components/data_proxy/src/s3_frontend/data_handler.rs +++ b/components/data_proxy/src/s3_frontend/data_handler.rs @@ -170,7 +170,7 @@ impl DataHandler { asr = asr.add_transformer(PithosTransformer::new()); asr = asr.add_transformer(FooterGenerator::new(None)); } - + let (final_sha, final_sha_recv) = HashingTransformer::new_with_backchannel(Sha256::new(), "sha256".to_string()); @@ -178,7 +178,7 @@ impl DataHandler { let (disk_size_probe, disk_size_stream) = SizeProbe::new(); asr = asr.add_transformer(disk_size_probe); - + asr.process().await.map_err(|e| { error!(error = ?e, msg = e.to_string()); e @@ -252,8 +252,9 @@ impl DataHandler { // Set id of new location to object id to satisfy FK constraint // TODO: Update hashes etc. - handler.set_object_hashes(&object.id, hashes, &token).await?; - + handler + .set_object_hashes(&object.id, hashes, &token) + .await?; cache.update_location(object.id, new_location).await?; diff --git a/components/data_proxy/src/s3_frontend/mod.rs b/components/data_proxy/src/s3_frontend/mod.rs index da2d37dd..4ca7a4ac 100644 --- a/components/data_proxy/src/s3_frontend/mod.rs +++ b/components/data_proxy/src/s3_frontend/mod.rs @@ -3,4 +3,4 @@ pub mod data_handler; pub mod s3server; pub mod s3service; pub mod utils; -//pub mod dropbox_handler; \ No newline at end of file +//pub mod dropbox_handler; diff --git a/components/data_proxy/src/s3_frontend/s3service.rs b/components/data_proxy/src/s3_frontend/s3service.rs index 9f49da30..9523de4c 100644 --- a/components/data_proxy/src/s3_frontend/s3service.rs +++ b/components/data_proxy/src/s3_frontend/s3service.rs @@ -626,8 +626,7 @@ impl S3 for ArunaS3Service { ); if let Some(key) = decryption_key { - asrw = asrw - .add_transformer(ChaCha20DecParts::new_with_lengths(key, parts)); + asrw = asrw.add_transformer(ChaCha20DecParts::new_with_lengths(key, parts)); } if location.is_compressed() { diff --git a/components/server/src/auth/issuer_handler.rs b/components/server/src/auth/issuer_handler.rs index 47b6d746..44504105 100644 --- a/components/server/src/auth/issuer_handler.rs +++ b/components/server/src/auth/issuer_handler.rs @@ -72,7 +72,10 @@ impl Issuer { } pub async fn refresh_jwks(&mut self) -> Result<()> { - if self.last_updated + chrono::Duration::try_minutes(5).ok_or_else(|| anyhow!("time conversion failed"))? > Utc::now().naive_utc() { + if self.last_updated + + chrono::Duration::try_minutes(5).ok_or_else(|| anyhow!("time conversion failed"))? + > Utc::now().naive_utc() + { bail!("JWKS was updated less than 5 minutes ago"); } diff --git a/components/server/src/grpc/service_account.rs b/components/server/src/grpc/service_account.rs index 494e6039..71a6ca60 100644 --- a/components/server/src/grpc/service_account.rs +++ b/components/server/src/grpc/service_account.rs @@ -66,9 +66,7 @@ impl ServiceAccountService for ServiceAccountServiceImpl { .object_type != ObjectType::PROJECT { - return Err(Status::invalid_argument( - "Id does not match any projects", - )); + return Err(Status::invalid_argument("Id does not match any projects")); } let ctx = vec![ // Only Resource/Project Admins should be able to do this diff --git a/components/server/src/middlelayer/service_account_request_types.rs b/components/server/src/middlelayer/service_account_request_types.rs index 418b4cce..c81511e4 100644 --- a/components/server/src/middlelayer/service_account_request_types.rs +++ b/components/server/src/middlelayer/service_account_request_types.rs @@ -7,7 +7,12 @@ use crate::{ }, }; use anyhow::{anyhow, Result}; -use aruna_rust_api::api::storage::services::v2::{AddDataProxyAttributeSvcAccountRequest, AddPubkeySvcAccountRequest, AddTrustedEndpointsSvcAccountRequest, CreateS3CredentialsSvcAccountRequest, DeleteS3CredentialsSvcAccountRequest, RemoveDataProxyAttributeSvcAccountRequest, RemoveDataProxyAttributeUserRequest, RemoveTrustedEndpointsSvcAccountRequest}; +use aruna_rust_api::api::storage::services::v2::{ + AddDataProxyAttributeSvcAccountRequest, AddPubkeySvcAccountRequest, + AddTrustedEndpointsSvcAccountRequest, CreateS3CredentialsSvcAccountRequest, + DeleteS3CredentialsSvcAccountRequest, RemoveDataProxyAttributeSvcAccountRequest, + RemoveDataProxyAttributeUserRequest, RemoveTrustedEndpointsSvcAccountRequest, +}; use aruna_rust_api::api::storage::{ models::v2::permission::ResourceId, services::v2::{ @@ -193,12 +198,11 @@ impl RemoveDataproxyAttributeSvcAccount { dataproxy_id: self.0.dataproxy_id, attribute_name: self.0.attribute_name, } - } } // Helper trait to get infos needed for proxy interaction -pub (crate) trait GetEndpointInteractionInfos { +pub(crate) trait GetEndpointInteractionInfos { // Return request strings fn get_unparsed_ids(&self) -> (String, String); // Return service account ulid and endpoint ulid diff --git a/components/server/src/middlelayer/service_accounts_db_handler.rs b/components/server/src/middlelayer/service_accounts_db_handler.rs index 434007b0..772d594b 100644 --- a/components/server/src/middlelayer/service_accounts_db_handler.rs +++ b/components/server/src/middlelayer/service_accounts_db_handler.rs @@ -2,7 +2,10 @@ use std::str::FromStr; use std::sync::Arc; use super::db_handler::DatabaseHandler; -use super::service_account_request_types::{CreateServiceAccountToken, DeleteServiceAccount, DeleteServiceAccountToken, DeleteServiceAccountTokens, GetServiceAccountInfo, GetTokenAndServiceAccountInfo}; +use super::service_account_request_types::{ + CreateServiceAccountToken, DeleteServiceAccount, DeleteServiceAccountToken, + DeleteServiceAccountTokens, GetServiceAccountInfo, GetTokenAndServiceAccountInfo, +}; use crate::auth::permission_handler::PermissionHandler; use crate::database::crud::CrudDb; use crate::database::dsls::object_dsl::Object; diff --git a/components/server/src/search/meilisearch_client.rs b/components/server/src/search/meilisearch_client.rs index 17f154ae..a2efce36 100644 --- a/components/server/src/search/meilisearch_client.rs +++ b/components/server/src/search/meilisearch_client.rs @@ -84,7 +84,11 @@ impl From for ObjectDocument { size: db_object.content_len, labels: filtered_labels, data_class: db_object.data_class, - created_at: db_object.created_at.unwrap_or_default().and_utc().timestamp(), + created_at: db_object + .created_at + .unwrap_or_default() + .and_utc() + .timestamp(), dynamic: db_object.dynamic, metadata_license: db_object.metadata_license, data_license: db_object.data_license, diff --git a/components/server/tests/database/stats.rs b/components/server/tests/database/stats.rs index 6fc4825d..01b5b310 100644 --- a/components/server/tests/database/stats.rs +++ b/components/server/tests/database/stats.rs @@ -97,6 +97,10 @@ async fn general_object_stats_test() { assert!(!all_stats.is_empty()); // Assert that last timestamp is greater than timestamp at the start of the test - let last_timestamp = get_last_refresh(&client).await.unwrap().and_utc().timestamp_millis(); + let last_timestamp = get_last_refresh(&client) + .await + .unwrap() + .and_utc() + .timestamp_millis(); assert!(last_timestamp > timestamp) } diff --git a/components/server/tests/grpc/licenses.rs b/components/server/tests/grpc/licenses.rs index 3d6209b7..3c9dda8a 100644 --- a/components/server/tests/grpc/licenses.rs +++ b/components/server/tests/grpc/licenses.rs @@ -184,8 +184,12 @@ async fn default_licenses() { .unwrap() .into_inner() .tag; - create_project_request.metadata_license_tag.clone_from(&license_tag); - create_project_request.default_data_license_tag.clone_from(&license_tag); + create_project_request + .metadata_license_tag + .clone_from(&license_tag); + create_project_request + .default_data_license_tag + .clone_from(&license_tag); let project = services .project_service .create_project(add_token( diff --git a/components/server/tests/grpc/search.rs b/components/server/tests/grpc/search.rs index da613d01..6ec1c19e 100644 --- a/components/server/tests/grpc/search.rs +++ b/components/server/tests/grpc/search.rs @@ -346,10 +346,7 @@ async fn get_resource() { // let response = service_block .search_service - .get_resource(add_token( - Request::new(public_request), - USER1_OIDC_TOKEN, - )) + .get_resource(add_token(Request::new(public_request), USER1_OIDC_TOKEN)) .await .unwrap() .into_inner() @@ -371,10 +368,7 @@ async fn get_resource() { assert_eq!(project.endpoints, public_project.endpoints); let response = service_block .search_service - .get_resources(add_token( - Request::new(private_request), - USER1_OIDC_TOKEN, - )) + .get_resources(add_token(Request::new(private_request), USER1_OIDC_TOKEN)) .await .unwrap() .into_inner() diff --git a/components/server/tests/middlelayer/delete.rs b/components/server/tests/middlelayer/delete.rs index cd251259..ca22b578 100644 --- a/components/server/tests/middlelayer/delete.rs +++ b/components/server/tests/middlelayer/delete.rs @@ -114,11 +114,7 @@ async fn delete_object() { objects.push(new_object(user.id, o, ObjectType::OBJECT)); } let proj_id = DieselUlid::generate(); - objects.push(new_object( - user.id, - proj_id, - ObjectType::PROJECT, - )); + objects.push(new_object(user.id, proj_id, ObjectType::PROJECT)); let proj_relations = InternalRelation { id: DieselUlid::generate(), diff --git a/components/server/tests/middlelayer/updates.rs b/components/server/tests/middlelayer/updates.rs index ff3af25a..dfcf58df 100644 --- a/components/server/tests/middlelayer/updates.rs +++ b/components/server/tests/middlelayer/updates.rs @@ -506,10 +506,7 @@ async fn update_object_test() { .await .unwrap(); assert!(is_new_2); - assert_eq!( - new_2.object.revision_number, - new.object.revision_number + 1 - ); + assert_eq!(new_2.object.revision_number, new.object.revision_number + 1); assert_eq!(new_2.object.object_status, ObjectStatus::INITIALIZING); // test license update