Skip to content

Commit

Permalink
chore: Cargo fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
lfbrehm committed Mar 15, 2024
1 parent 4c29a27 commit 8349f8c
Show file tree
Hide file tree
Showing 15 changed files with 69 additions and 65 deletions.
11 changes: 6 additions & 5 deletions components/data_proxy/src/caching/grpc_query_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1341,9 +1341,13 @@ impl GrpcQueryHandler {
Ok(object)
}


#[tracing::instrument(level = "trace", skip(self, id, hashes, token))]
pub async fn set_object_hashes(&self, id: &DieselUlid, hashes: Vec<Hash>, token: &str) -> Result<()> {
pub async fn set_object_hashes(
&self,
id: &DieselUlid,
hashes: Vec<Hash>,
token: &str,
) -> Result<()> {
let mut req = Request::new(SetObjectHashesRequest {
object_id: id.to_string(),
hashes,
Expand All @@ -1362,9 +1366,6 @@ impl GrpcQueryHandler {

Ok(())
}



}

#[tracing::instrument(level = "trace", skip(res))]
Expand Down
44 changes: 20 additions & 24 deletions components/data_proxy/src/replication/replication_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ use diesel_ulid::DieselUlid;
use md5::{Digest, Md5};
use pithos_lib::transformers::footer_extractor::FooterExtractor;
use pithos_lib::{streamreadwrite::GenericStreamReadWriter, transformer::ReadWriter};
use std::{str::FromStr, sync::Arc};
use std::default::Default;

Check warning on line 24 in components/data_proxy/src/replication/replication_handler.rs

View workflow job for this annotation

GitHub Actions / test

the item `Default` is imported redundantly
use tokio::sync::RwLock;
use std::{str::FromStr, sync::Arc};
use tokio::pin;
use tokio::sync::RwLock;
use tracing::{info_span, trace, Instrument};

pub struct ReplicationMessage {
Expand Down Expand Up @@ -60,22 +60,17 @@ pub struct ReplicationHandler {

#[derive(Clone, Debug)]
struct ObjectState {
sender: Sender<DataChunk>,
receiver: Receiver<DataChunk>,
state: ObjectStateStatus
sender: Sender<DataChunk>,
receiver: Receiver<DataChunk>,
state: ObjectStateStatus,
}

#[derive(Clone, Debug)]
pub enum ObjectStateStatus {
NotReceived,
Infos{
max_chunks: i64,
size: i64
},
Infos { max_chunks: i64, size: i64 },
}



impl ObjectState {
pub fn new(sender: Sender<DataChunk>, receiver: Receiver<DataChunk>) -> Self {
ObjectState {
Expand All @@ -85,27 +80,29 @@ impl ObjectState {
}
}

pub fn update_state(&mut self, max_chunks:i64, size:i64) {
self.state = ObjectStateStatus::Infos { max_chunks , size };
pub fn update_state(&mut self, max_chunks: i64, size: i64) {
self.state = ObjectStateStatus::Infos { max_chunks, size };
}

pub fn is_synced(&self) -> bool {
!matches!{self.state, ObjectStateStatus::NotReceived}
!matches! {self.state, ObjectStateStatus::NotReceived}
}

pub fn get_size(&self) -> Option<i64> {
if let ObjectStateStatus::Infos {size, ..} = self.state {
if let ObjectStateStatus::Infos { size, .. } = self.state {
Some(size)
} else { None }
} else {
None
}
}

pub fn get_rcv(&self) -> Receiver<DataChunk> {
self.receiver.clone()
}

pub fn get_chunks(&self) -> Result<i64> {
if let ObjectStateStatus::Infos {max_chunks,.. } = self.state {
if max_chunks > 0 {
if let ObjectStateStatus::Infos { max_chunks, .. } = self.state {
if max_chunks > 0 {
Ok(max_chunks)
} else {
Err(anyhow!("Invalid max chunks received"))
Expand All @@ -120,8 +117,7 @@ impl ObjectState {
}
}

type ObjectHandler =
Arc<DashMap<String, Arc<RwLock<ObjectState>>, RandomState>>;
type ObjectHandler = Arc<DashMap<String, Arc<RwLock<ObjectState>>, RandomState>>;
impl ReplicationHandler {
#[tracing::instrument(level = "trace", skip(cache, backend, receiver))]
pub fn new(
Expand Down Expand Up @@ -280,10 +276,10 @@ impl ReplicationHandler {
let (object_sdx, object_rcv) = async_channel::bounded(1000);
object_handler_map.insert(
object.to_string(),
Arc::new(RwLock::new(
ObjectState::new(object_sdx.clone(), object_rcv.clone())

))
Arc::new(RwLock::new(ObjectState::new(
object_sdx.clone(),
object_rcv.clone(),
))),
);
}

Expand Down
9 changes: 5 additions & 4 deletions components/data_proxy/src/s3_frontend/data_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,15 +170,15 @@ impl DataHandler {
asr = asr.add_transformer(PithosTransformer::new());
asr = asr.add_transformer(FooterGenerator::new(None));
}

let (final_sha, final_sha_recv) =
HashingTransformer::new_with_backchannel(Sha256::new(), "sha256".to_string());

asr = asr.add_transformer(final_sha);

let (disk_size_probe, disk_size_stream) = SizeProbe::new();
asr = asr.add_transformer(disk_size_probe);

asr.process().await.map_err(|e| {
error!(error = ?e, msg = e.to_string());
e
Expand Down Expand Up @@ -252,8 +252,9 @@ impl DataHandler {
// Set id of new location to object id to satisfy FK constraint
// TODO: Update hashes etc.

handler.set_object_hashes(&object.id, hashes, &token).await?;

handler
.set_object_hashes(&object.id, hashes, &token)
.await?;

cache.update_location(object.id, new_location).await?;

Expand Down
2 changes: 1 addition & 1 deletion components/data_proxy/src/s3_frontend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ pub mod data_handler;
pub mod s3server;
pub mod s3service;
pub mod utils;
//pub mod dropbox_handler;
//pub mod dropbox_handler;
3 changes: 1 addition & 2 deletions components/data_proxy/src/s3_frontend/s3service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -626,8 +626,7 @@ impl S3 for ArunaS3Service {
);

if let Some(key) = decryption_key {
asrw = asrw
.add_transformer(ChaCha20DecParts::new_with_lengths(key, parts));
asrw = asrw.add_transformer(ChaCha20DecParts::new_with_lengths(key, parts));
}

if location.is_compressed() {
Expand Down
5 changes: 4 additions & 1 deletion components/server/src/auth/issuer_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,10 @@ impl Issuer {
}

pub async fn refresh_jwks(&mut self) -> Result<()> {
if self.last_updated + chrono::Duration::try_minutes(5).ok_or_else(|| anyhow!("time conversion failed"))? > Utc::now().naive_utc() {
if self.last_updated
+ chrono::Duration::try_minutes(5).ok_or_else(|| anyhow!("time conversion failed"))?
> Utc::now().naive_utc()
{
bail!("JWKS was updated less than 5 minutes ago");
}

Expand Down
4 changes: 1 addition & 3 deletions components/server/src/grpc/service_account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,7 @@ impl ServiceAccountService for ServiceAccountServiceImpl {
.object_type
!= ObjectType::PROJECT
{
return Err(Status::invalid_argument(
"Id does not match any projects",
));
return Err(Status::invalid_argument("Id does not match any projects"));
}
let ctx = vec![
// Only Resource/Project Admins should be able to do this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@ use crate::{
},
};
use anyhow::{anyhow, Result};
use aruna_rust_api::api::storage::services::v2::{AddDataProxyAttributeSvcAccountRequest, AddPubkeySvcAccountRequest, AddTrustedEndpointsSvcAccountRequest, CreateS3CredentialsSvcAccountRequest, DeleteS3CredentialsSvcAccountRequest, RemoveDataProxyAttributeSvcAccountRequest, RemoveDataProxyAttributeUserRequest, RemoveTrustedEndpointsSvcAccountRequest};
use aruna_rust_api::api::storage::services::v2::{
AddDataProxyAttributeSvcAccountRequest, AddPubkeySvcAccountRequest,
AddTrustedEndpointsSvcAccountRequest, CreateS3CredentialsSvcAccountRequest,
DeleteS3CredentialsSvcAccountRequest, RemoveDataProxyAttributeSvcAccountRequest,
RemoveDataProxyAttributeUserRequest, RemoveTrustedEndpointsSvcAccountRequest,
};
use aruna_rust_api::api::storage::{
models::v2::permission::ResourceId,
services::v2::{
Expand Down Expand Up @@ -193,12 +198,11 @@ impl RemoveDataproxyAttributeSvcAccount {
dataproxy_id: self.0.dataproxy_id,
attribute_name: self.0.attribute_name,
}

}
}

// Helper trait to get infos needed for proxy interaction
pub (crate) trait GetEndpointInteractionInfos {
pub(crate) trait GetEndpointInteractionInfos {
// Return request strings
fn get_unparsed_ids(&self) -> (String, String);
// Return service account ulid and endpoint ulid
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ use std::str::FromStr;
use std::sync::Arc;

use super::db_handler::DatabaseHandler;
use super::service_account_request_types::{CreateServiceAccountToken, DeleteServiceAccount, DeleteServiceAccountToken, DeleteServiceAccountTokens, GetServiceAccountInfo, GetTokenAndServiceAccountInfo};
use super::service_account_request_types::{
CreateServiceAccountToken, DeleteServiceAccount, DeleteServiceAccountToken,
DeleteServiceAccountTokens, GetServiceAccountInfo, GetTokenAndServiceAccountInfo,
};
use crate::auth::permission_handler::PermissionHandler;
use crate::database::crud::CrudDb;
use crate::database::dsls::object_dsl::Object;
Expand Down
6 changes: 5 additions & 1 deletion components/server/src/search/meilisearch_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,11 @@ impl From<DbObject> for ObjectDocument {
size: db_object.content_len,
labels: filtered_labels,
data_class: db_object.data_class,
created_at: db_object.created_at.unwrap_or_default().and_utc().timestamp(),
created_at: db_object
.created_at
.unwrap_or_default()
.and_utc()
.timestamp(),
dynamic: db_object.dynamic,
metadata_license: db_object.metadata_license,
data_license: db_object.data_license,
Expand Down
6 changes: 5 additions & 1 deletion components/server/tests/database/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ async fn general_object_stats_test() {
assert!(!all_stats.is_empty());

// Assert that last timestamp is greater than timestamp at the start of the test
let last_timestamp = get_last_refresh(&client).await.unwrap().and_utc().timestamp_millis();
let last_timestamp = get_last_refresh(&client)
.await
.unwrap()
.and_utc()
.timestamp_millis();
assert!(last_timestamp > timestamp)
}
8 changes: 6 additions & 2 deletions components/server/tests/grpc/licenses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,12 @@ async fn default_licenses() {
.unwrap()
.into_inner()
.tag;
create_project_request.metadata_license_tag.clone_from(&license_tag);
create_project_request.default_data_license_tag.clone_from(&license_tag);
create_project_request
.metadata_license_tag
.clone_from(&license_tag);
create_project_request
.default_data_license_tag
.clone_from(&license_tag);
let project = services
.project_service
.create_project(add_token(
Expand Down
10 changes: 2 additions & 8 deletions components/server/tests/grpc/search.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,10 +346,7 @@ async fn get_resource() {
//
let response = service_block
.search_service
.get_resource(add_token(
Request::new(public_request),
USER1_OIDC_TOKEN,
))
.get_resource(add_token(Request::new(public_request), USER1_OIDC_TOKEN))
.await
.unwrap()
.into_inner()
Expand All @@ -371,10 +368,7 @@ async fn get_resource() {
assert_eq!(project.endpoints, public_project.endpoints);
let response = service_block
.search_service
.get_resources(add_token(
Request::new(private_request),
USER1_OIDC_TOKEN,
))
.get_resources(add_token(Request::new(private_request), USER1_OIDC_TOKEN))
.await
.unwrap()
.into_inner()
Expand Down
6 changes: 1 addition & 5 deletions components/server/tests/middlelayer/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,7 @@ async fn delete_object() {
objects.push(new_object(user.id, o, ObjectType::OBJECT));
}
let proj_id = DieselUlid::generate();
objects.push(new_object(
user.id,
proj_id,
ObjectType::PROJECT,
));
objects.push(new_object(user.id, proj_id, ObjectType::PROJECT));

let proj_relations = InternalRelation {
id: DieselUlid::generate(),
Expand Down
5 changes: 1 addition & 4 deletions components/server/tests/middlelayer/updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -506,10 +506,7 @@ async fn update_object_test() {
.await
.unwrap();
assert!(is_new_2);
assert_eq!(
new_2.object.revision_number,
new.object.revision_number + 1
);
assert_eq!(new_2.object.revision_number, new.object.revision_number + 1);
assert_eq!(new_2.object.object_status, ObjectStatus::INITIALIZING);

// test license update
Expand Down

0 comments on commit 8349f8c

Please sign in to comment.