diff --git a/libsql/src/local/database.rs b/libsql/src/local/database.rs index 8f836daef3..72b4c5d0e5 100644 --- a/libsql/src/local/database.rs +++ b/libsql/src/local/database.rs @@ -476,10 +476,13 @@ impl Database { loop { match sync_ctx.pull_one_frame(generation, frame_no).await { - Ok(frame) => { + Ok(Some(frame)) => { conn.wal_insert_frame(&frame)?; frame_no += 1; } + Ok(None) => { + break; + } Err(e) => { tracing::debug!("pull_one_frame error: {:?}", e); err.replace(e); diff --git a/libsql/src/sync.rs b/libsql/src/sync.rs index f876295f95..ed625f7b68 100644 --- a/libsql/src/sync.rs +++ b/libsql/src/sync.rs @@ -107,7 +107,7 @@ impl SyncContext { } #[tracing::instrument(skip(self))] - pub(crate) async fn pull_one_frame(&mut self, generation: u32, frame_no: u32) -> Result { + pub(crate) async fn pull_one_frame(&mut self, generation: u32, frame_no: u32) -> Result> { let uri = format!( "{}/sync/{}/{}/{}", self.sync_url, @@ -116,9 +116,13 @@ impl SyncContext { frame_no + 1 ); tracing::debug!("pulling frame"); - let frame = self.pull_with_retry(uri, self.max_retries).await?; - self.durable_frame_num = frame_no; - Ok(frame) + match self.pull_with_retry(uri, self.max_retries).await? { + Some(frame) => { + self.durable_frame_num = frame_no; + Ok(Some(frame)) + } + None => Ok(None), + } } #[tracing::instrument(skip(self, frame))] @@ -232,7 +236,7 @@ impl SyncContext { } } - async fn pull_with_retry(&self, uri: String, max_retries: usize) -> Result { + async fn pull_with_retry(&self, uri: String, max_retries: usize) -> Result> { let mut nr_retries = 0; loop { let mut req = http::Request::builder().method("GET").uri(uri.clone()); @@ -256,7 +260,10 @@ impl SyncContext { let frame = hyper::body::to_bytes(res.into_body()) .await .map_err(SyncError::HttpBody)?; - return Ok(frame); + return Ok(Some(frame)); + } + if res.status() == StatusCode::BAD_REQUEST { + return Ok(None); } // If we've retried too many times or the error is not a server error, // return the error.