Skip to content

Commit 692bce8

Browse files
committed
libsql: Fix WAL pull logic on last frame
The WAL sync protocol only allows you to fetch frames for a range. While the protocol does not have mechanism to determine what is the latest frame on the server for pull, it does signal with HTTP status code 400 when you attempt to read a frame that does not exists. Let's use this signal to fix WAL pull logic not to fail always. In the future, we might want to consider extending the protocol to allow clients to proble for latest frame. That, however, will make the protocol a bit more chatty, so I am trying to avoid that as long as I can.
1 parent 3f5ccae commit 692bce8

File tree

2 files changed

+17
-7
lines changed

2 files changed

+17
-7
lines changed

libsql/src/local/database.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -476,10 +476,13 @@ impl Database {
476476

477477
loop {
478478
match sync_ctx.pull_one_frame(generation, frame_no).await {
479-
Ok(frame) => {
479+
Ok(Some(frame)) => {
480480
conn.wal_insert_frame(&frame)?;
481481
frame_no += 1;
482482
}
483+
Ok(None) => {
484+
break;
485+
}
483486
Err(e) => {
484487
tracing::debug!("pull_one_frame error: {:?}", e);
485488
err.replace(e);

libsql/src/sync.rs

+13-6
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ impl SyncContext {
107107
}
108108

109109
#[tracing::instrument(skip(self))]
110-
pub(crate) async fn pull_one_frame(&mut self, generation: u32, frame_no: u32) -> Result<Bytes> {
110+
pub(crate) async fn pull_one_frame(&mut self, generation: u32, frame_no: u32) -> Result<Option<Bytes>> {
111111
let uri = format!(
112112
"{}/sync/{}/{}/{}",
113113
self.sync_url,
@@ -116,9 +116,13 @@ impl SyncContext {
116116
frame_no + 1
117117
);
118118
tracing::debug!("pulling frame");
119-
let frame = self.pull_with_retry(uri, self.max_retries).await?;
120-
self.durable_frame_num = frame_no;
121-
Ok(frame)
119+
match self.pull_with_retry(uri, self.max_retries).await? {
120+
Some(frame) => {
121+
self.durable_frame_num = frame_no;
122+
Ok(Some(frame))
123+
}
124+
None => Ok(None),
125+
}
122126
}
123127

124128
#[tracing::instrument(skip(self, frame))]
@@ -232,7 +236,7 @@ impl SyncContext {
232236
}
233237
}
234238

235-
async fn pull_with_retry(&self, uri: String, max_retries: usize) -> Result<Bytes> {
239+
async fn pull_with_retry(&self, uri: String, max_retries: usize) -> Result<Option<Bytes>> {
236240
let mut nr_retries = 0;
237241
loop {
238242
let mut req = http::Request::builder().method("GET").uri(uri.clone());
@@ -256,7 +260,10 @@ impl SyncContext {
256260
let frame = hyper::body::to_bytes(res.into_body())
257261
.await
258262
.map_err(SyncError::HttpBody)?;
259-
return Ok(frame);
263+
return Ok(Some(frame));
264+
}
265+
if res.status() == StatusCode::BAD_REQUEST {
266+
return Ok(None);
260267
}
261268
// If we've retried too many times or the error is not a server error,
262269
// return the error.

0 commit comments

Comments
 (0)