From a0cdef90cf86cd8d2cc89723f5751c1123ae7e2b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Bj=C3=A4reholt?= Date: Thu, 17 Oct 2024 19:24:23 +0200 Subject: [PATCH] fix(sync): added `client.wait_for_start()` and improve error handling in sync daemon (#495) * fix(sync): improve error handling in daemon and add exponential backoff for server connection - Added error logging in daemon before re-throwing errors - Implemented exponential backoff when checking if the server is running - Addresses issue https://github.com/ActivityWatch/aw-qt/issues/105 * Apply suggestions from code review * refactor: move wait_for_server() into client * refactor: renamed wait_for_server to wait_for_start (same as in aw-client-python) --- aw-client-rust/src/blocking.rs | 4 ++++ aw-client-rust/src/lib.rs | 36 ++++++++++++++++++++++++++++++++++ aw-sync/src/main.rs | 21 ++++++++++++++------ aw-sync/src/sync_wrapper.rs | 11 +---------- 4 files changed, 56 insertions(+), 16 deletions(-) diff --git a/aw-client-rust/src/blocking.rs b/aw-client-rust/src/blocking.rs index 4e765fac..ee7822bc 100644 --- a/aw-client-rust/src/blocking.rs +++ b/aw-client-rust/src/blocking.rs @@ -79,4 +79,8 @@ impl AwClient { proxy_method!(delete_event, (), bucketname: &str, event_id: i64); proxy_method!(get_event_count, i64, bucketname: &str); proxy_method!(get_info, aw_models::Info,); + + pub fn wait_for_start(&self) -> Result<(), Box> { + self.client.wait_for_start() + } } diff --git a/aw-client-rust/src/lib.rs b/aw-client-rust/src/lib.rs index 624bad8d..f57d1422 100644 --- a/aw-client-rust/src/lib.rs +++ b/aw-client-rust/src/lib.rs @@ -11,6 +11,8 @@ use std::{collections::HashMap, error::Error}; use chrono::{DateTime, Utc}; use serde_json::{json, Map}; +use std::net::TcpStream; +use std::time::Duration; pub use aw_models::{Bucket, BucketMetadata, Event}; @@ -221,4 +223,38 @@ impl AwClient { let url = format!("{}/api/0/info", self.baseurl); self.client.get(url).send().await?.json().await } + + // TODO: make async + pub fn wait_for_start(&self) -> Result<(), Box> { + let socket_addrs = self.baseurl.socket_addrs(|| None)?; + let socket_addr = socket_addrs + .first() + .ok_or("Unable to resolve baseurl into socket address")?; + + // Check if server is running with exponential backoff + let mut retry_delay = Duration::from_millis(100); + let max_wait = Duration::from_secs(10); + let mut total_wait = Duration::from_secs(0); + + while total_wait < max_wait { + match TcpStream::connect_timeout(socket_addr, retry_delay) { + Ok(_) => break, + Err(_) => { + std::thread::sleep(retry_delay); + total_wait += retry_delay; + retry_delay *= 2; + } + } + } + + if total_wait >= max_wait { + return Err(format!( + "Local server {} not running after 10 seconds of retrying", + socket_addr + ) + .into()); + } + + Ok(()) + } } diff --git a/aw-sync/src/main.rs b/aw-sync/src/main.rs index 46c2bb41..243bff07 100644 --- a/aw-sync/src/main.rs +++ b/aw-sync/src/main.rs @@ -208,14 +208,23 @@ fn main() -> Result<(), Box> { fn daemon(client: &AwClient) -> Result<(), Box> { loop { - info!("Pulling from all hosts"); - sync_wrapper::pull_all(client)?; - - info!("Pushing local data"); - sync_wrapper::push(client)?; + if let Err(e) = daemon_sync_cycle(client) { + error!("Error during sync cycle: {}", e); + // Re-throw the error + return Err(e); + } info!("Sync pass done, sleeping for 5 minutes"); - std::thread::sleep(std::time::Duration::from_secs(300)); } } + +fn daemon_sync_cycle(client: &AwClient) -> Result<(), Box> { + info!("Pulling from all hosts"); + sync_wrapper::pull_all(client)?; + + info!("Pushing local data"); + sync_wrapper::push(client)?; + + Ok(()) +} diff --git a/aw-sync/src/sync_wrapper.rs b/aw-sync/src/sync_wrapper.rs index 73dbe953..13007779 100644 --- a/aw-sync/src/sync_wrapper.rs +++ b/aw-sync/src/sync_wrapper.rs @@ -1,6 +1,5 @@ use std::error::Error; use std::fs; -use std::net::TcpStream; use crate::sync::{sync_run, SyncMode, SyncSpec}; use aw_client_rust::blocking::AwClient; @@ -14,15 +13,7 @@ pub fn pull_all(client: &AwClient) -> Result<(), Box> { } pub fn pull(host: &str, client: &AwClient) -> Result<(), Box> { - let socket_addrs = client.baseurl.socket_addrs(|| None)?; - let socket_addr = socket_addrs - .get(0) - .ok_or("Unable to resolve baseurl into socket address")?; - - // Check if server is running - if TcpStream::connect(socket_addr).is_err() { - return Err(format!("Local server {} not running", &client.baseurl).into()); - } + client.wait_for_start()?; // Path to the sync folder // Sync folder is structured ./{hostname}/{device_id}/test.db