Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor sync #21

Merged
merged 13 commits into the base branch from the feature branch
Nov 16, 2023
4 changes: 2 additions & 2 deletions gdrive_lib/src/gdrive_instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -911,8 +911,8 @@ impl GDriveInfo {
let urlname = format_sstr!("gdrive://{}/", gdrive.session_name);
let urlname = Url::parse(&urlname)?;
let urlname = export_path.iter().try_fold(urlname, |u, e| {
if e.contains('#') {
u.join(&e.replace('#', "%35"))
if e.contains('#') || e.contains('?') {
u.join(&e.replace('#', "%23").replace('?', "%3F"))
} else {
u.join(e)
}
Expand Down
15 changes: 6 additions & 9 deletions sync_app_lib/src/file_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,6 @@ impl Default for FileInfoInner {
#[derive(Clone, Debug, PartialEq, Eq, Default, Deref)]
pub struct FileInfo(Arc<FileInfoInner>);

// impl Deref for FileInfo {
// type Target = FileInfoInner;
// fn deref(&self) -> &Self::Target {
// &self.0
// }
// }

pub enum FileInfoKeyType {
FileName,
FilePath,
Expand Down Expand Up @@ -267,8 +260,12 @@ impl TryFrom<FileInfoCache> for FileInfo {
impl FileInfo {
/// # Errors
/// Return error if db query fails
pub async fn from_database(pool: &PgPool, url: &Url) -> Result<Option<Self>, Error> {
FileInfoCache::get_by_urlname(url, pool)
pub async fn from_database(
pool: &PgPool,
url: &Url,
servicesession: &str,
) -> Result<Option<Self>, Error> {
FileInfoCache::get_by_urlname(url, servicesession, pool)
.await?
.map(TryInto::try_into)
.transpose()
Expand Down
143 changes: 4 additions & 139 deletions sync_app_lib/src/file_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,7 @@ use crate::{
#[derive(Clone, Debug)]
pub struct FileList {
baseurl: Url,
filemap: Arc<HashMap<StackString, FileInfo>>,
inner: Arc<FileListInner>,
min_mtime: Option<u32>,
max_mtime: Option<u32>,
}

impl Deref for FileList {
Expand All @@ -55,21 +52,17 @@ impl FileList {
config: Config,
servicetype: FileService,
servicesession: ServiceSession,
filemap: HashMap<StackString, FileInfo>,
pool: PgPool,
) -> Self {
Self {
baseurl,
filemap: Arc::new(filemap),
inner: Arc::new(FileListInner {
basepath,
config,
servicetype,
servicesession,
pool,
}),
min_mtime: None,
max_mtime: None,
}
}

Expand Down Expand Up @@ -125,11 +118,6 @@ pub trait FileListTrait: Send + Sync + Debug {
fn get_config(&self) -> &Config;

fn get_pool(&self) -> &PgPool;
fn get_filemap(&self) -> &HashMap<StackString, FileInfo>;
fn get_min_mtime(&self) -> Option<u32>;
fn get_max_mtime(&self) -> Option<u32>;

fn with_list(&mut self, filelist: Vec<FileInfo>);

// Copy operation where the origin (finfo0) has the same servicetype as self
async fn copy_from(
Expand Down Expand Up @@ -162,7 +150,8 @@ pub trait FileListTrait: Send + Sync + Debug {
panic!("not implemented for {:?}", finfo);
}

async fn fill_file_list(&self) -> Result<Vec<FileInfo>, Error>;
/// Return updated FileInfo entries
async fn update_file_cache(&self) -> Result<usize, Error>;

async fn print_list(&self, _: &StdoutChannel<StackString>) -> Result<(), Error> {
unimplemented!()
Expand Down Expand Up @@ -192,89 +181,6 @@ pub trait FileListTrait: Send + Sync + Debug {
}
}

/// Synchronize this service's in-memory `filemap` with the cache table.
///
/// Existing cache rows for this session are loaded and compared against
/// the current filemap:
///   * rows whose key no longer appears in the filemap are deleted,
///   * rows present in both are rewritten when any checksum, mtime, or
///     size differs,
///   * keys only present in the filemap are inserted as new rows.
///
/// Returns the number of newly inserted cache entries; updates and
/// deletions are logged but not counted in the return value.
///
/// # Errors
/// Returns an error if any database operation fails.
async fn cache_file_list(&self) -> Result<usize, Error> {
    let pool = self.get_pool();

    // Existing cache rows, keyed by FileInfoCache::get_key; entries with
    // no derivable key are silently dropped, matching the filemap side.
    let current_cache: HashMap<_, _> = self
        .load_file_list(false)
        .await?
        .into_iter()
        .filter_map(|item| {
            let key = item.get_key();
            key.map(|k| (k, item))
        })
        .collect();

    // Current in-memory filemap converted to the same keyed representation.
    let flist_cache_map: HashMap<_, _> = self
        .get_filemap()
        .iter()
        .filter_map(|(_, f)| {
            let item: FileInfoCache = f.into();
            let key = item.get_key();
            key.map(|k| (k, item))
        })
        .collect();

    // Remove cache rows that no longer correspond to any filemap entry.
    let mut deleted_entries = 0;
    for k in current_cache.keys() {
        if flist_cache_map.contains_key(k) {
            continue;
        }
        info!("remove {:?}", k);
        k.delete_cache_entry(pool).await?;
        deleted_entries += 1;
    }

    info!(
        "flist_cache_map {} {} {} {} {}",
        self.get_servicesession().as_str(),
        self.get_servicetype(),
        flist_cache_map.len(),
        current_cache.len(),
        deleted_entries
    );

    // Single pass over the filemap (the original iterated it twice —
    // once for updates, once for inserts): update changed rows, insert
    // new ones.
    let mut inserted = 0;
    for (k, v) in flist_cache_map {
        if let Some(item) = current_cache.get(&k) {
            // Key exists in both; rewrite the cached row only when a
            // tracked attribute actually changed.
            if v.md5sum != item.md5sum
                || v.sha1sum != item.sha1sum
                || v.filestat_st_mtime != item.filestat_st_mtime
                || v.filestat_st_size != item.filestat_st_size
            {
                let mut cache = v
                    .get_cache(pool)
                    .await?
                    .ok_or_else(|| format_err!("Cache doesn't exist"))?;
                if let Some(md5sum) = &v.md5sum {
                    cache.md5sum = Some(md5sum.clone());
                }
                if let Some(sha1sum) = &v.sha1sum {
                    cache.sha1sum = Some(sha1sum.clone());
                }
                cache.filestat_st_mtime = v.filestat_st_mtime;
                cache.filestat_st_size = v.filestat_st_size;

                // Replaces a leftover `GOT HERE` debug message with a
                // meaningful log line.
                info!("update cache entry {:?}", cache);
                cache.insert(pool).await?;
            }
        } else {
            v.insert(pool).await?;
            inserted += 1;
        }
    }
    Ok(inserted)
}

async fn load_file_list(&self, get_deleted: bool) -> Result<Vec<FileInfoCache>, Error> {
let session = self.get_servicesession();
let stype = self.get_servicetype();
Expand Down Expand Up @@ -439,49 +345,8 @@ impl FileListTrait for FileList {
&self.pool
}

// Accessor for the in-memory map of cache key -> FileInfo.
fn get_filemap(&self) -> &HashMap<StackString, FileInfo> {
&self.filemap
}

// Smallest st_mtime recorded by `with_list`, or None before any list is set.
fn get_min_mtime(&self) -> Option<u32> {
self.min_mtime
}

// Largest st_mtime recorded by `with_list`, or None before any list is set.
fn get_max_mtime(&self) -> Option<u32> {
self.max_mtime
}

fn with_list(&mut self, filelist: Vec<FileInfo>) {
let mut min_mtime: Option<u32> = None;
let mut max_mtime: Option<u32> = None;
let filemap = filelist
.into_iter()
.map(|f| {
let path = f.filepath.to_string_lossy();
let key = remove_basepath(&path, &self.get_basepath().to_string_lossy());
let mut inner = f.inner().clone();
inner.servicesession = self.get_servicesession().clone();
if min_mtime.is_none() || min_mtime > Some(inner.filestat.st_mtime) {
min_mtime.replace(inner.filestat.st_mtime);
}
if max_mtime.is_none() || max_mtime < Some(inner.filestat.st_mtime) {
max_mtime.replace(inner.filestat.st_mtime);
}
let f = FileInfo::from_inner(inner);
(key, f)
})
.collect();
self.filemap = Arc::new(filemap);
self.min_mtime = min_mtime;
self.max_mtime = max_mtime;
}

async fn fill_file_list(&self) -> Result<Vec<FileInfo>, Error> {
self.load_file_list(false)
.await?
.into_iter()
.map(TryInto::try_into)
.collect()
async fn update_file_cache(&self) -> Result<usize, Error> {
Ok(0)
}
}

Expand Down
74 changes: 39 additions & 35 deletions sync_app_lib/src/file_list_gcs.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use anyhow::{format_err, Error};
use async_trait::async_trait;
use checksums::{hash_file, Algorithm};
use log::info;
use futures::TryStreamExt;
use log::{debug, info};
use stack_string::{format_sstr, StackString};
use std::{
collections::HashMap,
Expand All @@ -15,10 +16,11 @@ use gdrive_lib::gcs_instance::GcsInstance;

use crate::{
config::Config,
file_info::{FileInfo, FileInfoTrait, ServiceSession},
file_info::{FileInfoTrait, ServiceSession},
file_info_gcs::FileInfoGcs,
file_list::{FileList, FileListTrait},
file_service::FileService,
models::FileInfoCache,
pgpool::PgPool,
};

Expand All @@ -42,7 +44,6 @@ impl FileListGcs {
config.clone(),
FileService::GCS,
bucket.parse()?,
HashMap::new(),
pool.clone(),
);
let gcs = GcsInstance::new(&config.gcs_token_path, &config.gcs_secret_file, bucket).await?;
Expand All @@ -62,7 +63,6 @@ impl FileListGcs {
config.clone(),
FileService::GCS,
bucket.parse()?,
HashMap::new(),
pool.clone(),
);
let config = config.clone();
Expand Down Expand Up @@ -101,35 +101,41 @@ impl FileListTrait for FileListGcs {
&self.flist.pool
}

// Delegate to the wrapped FileList's filemap.
fn get_filemap(&self) -> &HashMap<StackString, FileInfo> {
self.flist.get_filemap()
}

// Delegate to the wrapped FileList's minimum mtime.
fn get_min_mtime(&self) -> Option<u32> {
self.flist.get_min_mtime()
}

// Delegate to the wrapped FileList's maximum mtime.
fn get_max_mtime(&self) -> Option<u32> {
self.flist.get_max_mtime()
}

// Delegate list replacement to the wrapped FileList.
fn with_list(&mut self, filelist: Vec<FileInfo>) {
self.flist.with_list(filelist);
}

async fn fill_file_list(&self) -> Result<Vec<FileInfo>, Error> {
/// Refresh the cache table from a live GCS bucket listing.
///
/// The base URL encodes the bucket as the host and the object-key prefix
/// as the path. Every object found under that prefix is converted to a
/// `FileInfoCache` row and upserted unless a non-deleted cached row with
/// the same urlname and the same size already exists.
///
/// Returns the number of rows upserted.
///
/// # Errors
/// Returns an error if the base URL has no host, the GCS listing fails,
/// or any database operation fails.
async fn update_file_cache(&self) -> Result<usize, Error> {
let bucket = self
.get_baseurl()
.host_str()
.ok_or_else(|| format_err!("Parse error"))?;
let prefix = self.get_baseurl().path().trim_start_matches('/');
let mut number_updated = 0;

// Preload all cached rows for this session/service, keyed by urlname,
// so each listed object needs only an in-memory lookup.
let pool = self.get_pool();
let cached_urls: HashMap<StackString, _> = FileInfoCache::get_all_cached(
self.get_servicesession().as_str(),
self.get_servicetype().to_str(),
pool,
false,
)
.await?
.map_ok(|f| (f.urlname.clone(), f))
.try_collect()
.await?;
debug!("expected {}", cached_urls.len());

for object in self.gcs.get_list_of_keys(bucket, Some(prefix)).await? {
let info: FileInfoCache = FileInfoGcs::from_object(bucket, object)?
.into_finfo()
.into();
// NOTE(review): only `deleted_at` and `filestat_st_size` are compared
// here — a content change that keeps the same size will not refresh
// the row; confirm this is intended.
if let Some(existing) = cached_urls.get(&info.urlname) {
if existing.deleted_at.is_none()
&& existing.filestat_st_size == info.filestat_st_size
{
continue;
}
}
number_updated += info.upsert(pool).await?;
}
Ok(number_updated)
}

async fn print_list(&self, stdout: &StdoutChannel<StackString>) -> Result<(), Error> {
Expand Down Expand Up @@ -286,20 +292,18 @@ mod tests {
.and_then(|b| b.name.clone())
.unwrap_or_else(|| "".to_string());

let mut flist = FileListGcs::new(&bucket, &config, &pool).await?;
let flist = FileListGcs::new(&bucket, &config, &pool).await?;

let new_flist = flist.fill_file_list().await?;

info!("{} {:?}", bucket, new_flist.get(0));
assert!(new_flist.len() > 0);
flist.clear_file_list().await?;

flist.with_list(new_flist);
let number_updated = flist.update_file_cache().await?;

flist.cache_file_list().await?;
info!("{} {}", bucket, number_updated);
assert!(number_updated > 0);

let new_flist = flist.load_file_list(false).await?;

assert_eq!(flist.flist.get_filemap().len(), new_flist.len());
assert_eq!(number_updated, new_flist.len());

flist.clear_file_list().await?;
Ok(())
Expand Down
Loading
Loading