Skip to content

Commit

Permalink
Merge pull request #1877 from tursodatabase/fix-wal-pull
Browse files Browse the repository at this point in the history
libsql: Fix WAL pull logic on last frame
  • Loading branch information
penberg authored Dec 12, 2024
2 parents 3f5ccae + 692bce8 commit 6ed1690
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 7 deletions.
5 changes: 4 additions & 1 deletion libsql/src/local/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,10 +476,13 @@ impl Database {

loop {
match sync_ctx.pull_one_frame(generation, frame_no).await {
Ok(frame) => {
Ok(Some(frame)) => {
conn.wal_insert_frame(&frame)?;
frame_no += 1;
}
Ok(None) => {
break;
}
Err(e) => {
tracing::debug!("pull_one_frame error: {:?}", e);
err.replace(e);
Expand Down
19 changes: 13 additions & 6 deletions libsql/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ impl SyncContext {
}

#[tracing::instrument(skip(self))]
pub(crate) async fn pull_one_frame(&mut self, generation: u32, frame_no: u32) -> Result<Bytes> {
pub(crate) async fn pull_one_frame(&mut self, generation: u32, frame_no: u32) -> Result<Option<Bytes>> {
let uri = format!(
"{}/sync/{}/{}/{}",
self.sync_url,
Expand All @@ -116,9 +116,13 @@ impl SyncContext {
frame_no + 1
);
tracing::debug!("pulling frame");
let frame = self.pull_with_retry(uri, self.max_retries).await?;
self.durable_frame_num = frame_no;
Ok(frame)
match self.pull_with_retry(uri, self.max_retries).await? {
Some(frame) => {
self.durable_frame_num = frame_no;
Ok(Some(frame))
}
None => Ok(None),
}
}

#[tracing::instrument(skip(self, frame))]
Expand Down Expand Up @@ -232,7 +236,7 @@ impl SyncContext {
}
}

async fn pull_with_retry(&self, uri: String, max_retries: usize) -> Result<Bytes> {
async fn pull_with_retry(&self, uri: String, max_retries: usize) -> Result<Option<Bytes>> {
let mut nr_retries = 0;
loop {
let mut req = http::Request::builder().method("GET").uri(uri.clone());
Expand All @@ -256,7 +260,10 @@ impl SyncContext {
let frame = hyper::body::to_bytes(res.into_body())
.await
.map_err(SyncError::HttpBody)?;
return Ok(frame);
return Ok(Some(frame));
}
if res.status() == StatusCode::BAD_REQUEST {
return Ok(None);
}
// If we've retried too many times or the error is not a server error,
// return the error.
Expand Down

0 comments on commit 6ed1690

Please sign in to comment.