Skip to content

Commit aaf8ec4

Browse files
committed
Optimise sync_pull to pull frames in batches
Previously, we pulled frames one by one. This patch changes it pull frames in batches. Currently, the batch size is set to 128 (the maximum supported by the server)
1 parent 4ef5e81 commit aaf8ec4

File tree

2 files changed

+90
-19
lines changed

2 files changed

+90
-19
lines changed

libsql/src/replication/remote_client.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use tonic::metadata::AsciiMetadataValue;
1818
use tonic::{Response, Status};
1919
use zerocopy::FromBytes;
2020

21-
async fn time<O>(fut: impl Future<Output = O>) -> (O, Duration) {
21+
pub async fn time<O>(fut: impl Future<Output = O>) -> (O, Duration) {
2222
let before = Instant::now();
2323
let out = fut.await;
2424
(out, before.elapsed())

libsql/src/sync.rs

Lines changed: 89 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use crate::{local::Connection, util::ConnectorService, Error, Result};
22

33
use std::path::Path;
44

5+
use crate::replication::remote_client::time;
56
use bytes::Bytes;
67
use chrono::Utc;
78
use http::{HeaderValue, StatusCode};
@@ -20,6 +21,7 @@ const METADATA_VERSION: u32 = 0;
2021

2122
const DEFAULT_MAX_RETRIES: usize = 5;
2223
const DEFAULT_PUSH_BATCH_SIZE: u32 = 128;
24+
const DEFAULT_PULL_BATCH_SIZE: u32 = 128;
2325

2426
#[derive(thiserror::Error, Debug)]
2527
#[non_exhaustive]
@@ -66,6 +68,8 @@ pub enum SyncError {
6668
InvalidLocalGeneration(u32, u32),
6769
#[error("invalid local state: {0}")]
6870
InvalidLocalState(String),
71+
#[error("server returned invalid length of frames: {0}")]
72+
InvalidPullFrameBytes(usize),
6973
}
7074

7175
impl SyncError {
@@ -98,8 +102,8 @@ pub enum PushStatus {
98102
}
99103

100104
pub enum PullResult {
101-
/// A frame was successfully pulled.
102-
Frame(Bytes),
105+
/// Frames were successfully pulled.
106+
Frames(Bytes),
103107
/// We've reached the end of the generation.
104108
EndOfGeneration { max_generation: u32 },
105109
}
@@ -122,6 +126,7 @@ pub struct SyncContext {
122126
auth_token: Option<HeaderValue>,
123127
max_retries: usize,
124128
push_batch_size: u32,
129+
pull_batch_size: u32,
125130
/// The current durable generation.
126131
durable_generation: u32,
127132
/// Represents the max_frame_no from the server.
@@ -154,6 +159,7 @@ impl SyncContext {
154159
auth_token,
155160
max_retries: DEFAULT_MAX_RETRIES,
156161
push_batch_size: DEFAULT_PUSH_BATCH_SIZE,
162+
pull_batch_size: DEFAULT_PULL_BATCH_SIZE,
157163
client,
158164
durable_generation: 0,
159165
durable_frame_num: 0,
@@ -175,7 +181,7 @@ impl SyncContext {
175181
}
176182

177183
#[tracing::instrument(skip(self))]
178-
pub(crate) async fn pull_one_frame(
184+
pub(crate) async fn pull_frames(
179185
&mut self,
180186
generation: u32,
181187
frame_no: u32,
@@ -185,9 +191,10 @@ impl SyncContext {
185191
self.sync_url,
186192
generation,
187193
frame_no,
188-
frame_no + 1
194+
// the server expects the range of [start, end) frames, i.e. end is exclusive
195+
frame_no + self.pull_batch_size
189196
);
190-
tracing::debug!("pulling frame");
197+
tracing::debug!("pulling frame (uri={})", uri);
191198
self.pull_with_retry(uri, self.max_retries).await
192199
}
193200

@@ -417,20 +424,39 @@ impl SyncContext {
417424
.map_err(SyncError::HttpDispatch)?;
418425

419426
if res.status().is_success() {
420-
let frame = hyper::body::to_bytes(res.into_body())
427+
let frames = hyper::body::to_bytes(res.into_body())
421428
.await
422429
.map_err(SyncError::HttpBody)?;
423-
return Ok(PullResult::Frame(frame));
430+
// a success result should always return some frames
431+
if frames.is_empty() {
432+
tracing::error!("server returned empty frames in pull response");
433+
return Err(SyncError::InvalidPullFrameBytes(0).into());
434+
}
435+
// the minimum payload size cannot be less than a single frame
436+
if frames.len() < FRAME_SIZE {
437+
tracing::error!(
438+
"server returned frames with invalid length: {} < {}",
439+
frames.len(),
440+
FRAME_SIZE
441+
);
442+
return Err(SyncError::InvalidPullFrameBytes(frames.len()).into());
443+
}
444+
return Ok(PullResult::Frames(frames));
424445
}
425446
// BUG ALERT: The server returns a 500 error if the remote database is empty.
426447
// This is a bug and should be fixed.
427448
if res.status() == StatusCode::BAD_REQUEST
428449
|| res.status() == StatusCode::INTERNAL_SERVER_ERROR
429450
{
451+
let status = res.status();
430452
let res_body = hyper::body::to_bytes(res.into_body())
431453
.await
432454
.map_err(SyncError::HttpBody)?;
433-
455+
tracing::trace!(
456+
"server returned: {} body: {}",
457+
status,
458+
String::from_utf8_lossy(&res_body[..])
459+
);
434460
let resp = serde_json::from_slice::<serde_json::Value>(&res_body[..])
435461
.map_err(SyncError::JsonDecode)?;
436462

@@ -650,22 +676,33 @@ impl SyncContext {
650676

651677
let req = req.body(Body::empty()).expect("valid request");
652678

653-
let res = self
654-
.client
655-
.request(req)
656-
.await
657-
.map_err(SyncError::HttpDispatch)?;
679+
let (res, http_duration) = time(self.client.request(req)).await;
680+
let res = res.map_err(SyncError::HttpDispatch)?;
658681

659682
if !res.status().is_success() {
660683
let status = res.status();
661684
let body = hyper::body::to_bytes(res.into_body())
662685
.await
663686
.map_err(SyncError::HttpBody)?;
687+
tracing::error!(
688+
"failed to pull db file from remote server, status={}, body={}, url={}, duration={:?}",
689+
status,
690+
String::from_utf8_lossy(&body),
691+
uri,
692+
http_duration
693+
);
664694
return Err(
665695
SyncError::PullFrame(status, String::from_utf8_lossy(&body).to_string()).into(),
666696
);
667697
}
668698

699+
tracing::debug!(
700+
"pulled db file from remote server, status={}, url={}, duration={:?}",
701+
res.status(),
702+
uri,
703+
http_duration
704+
);
705+
669706
// todo: do streaming write to the disk
670707
let bytes = hyper::body::to_bytes(res.into_body())
671708
.await
@@ -887,6 +924,11 @@ async fn try_push(
887924
})
888925
}
889926

927+
/// PAGE_SIZE used by the sync / diskless server
928+
const PAGE_SIZE: usize = 4096;
929+
const FRAME_HEADER_SIZE: usize = 24;
930+
const FRAME_SIZE: usize = PAGE_SIZE + FRAME_HEADER_SIZE;
931+
890932
pub async fn try_pull(
891933
sync_ctx: &mut SyncContext,
892934
conn: &Connection,
@@ -898,10 +940,39 @@ pub async fn try_pull(
898940
loop {
899941
let generation = sync_ctx.durable_generation();
900942
let frame_no = sync_ctx.durable_frame_num() + 1;
901-
match sync_ctx.pull_one_frame(generation, frame_no).await {
902-
Ok(PullResult::Frame(frame)) => {
903-
insert_handle.insert(&frame)?;
904-
sync_ctx.durable_frame_num = frame_no;
943+
match sync_ctx.pull_frames(generation, frame_no).await {
944+
Ok(PullResult::Frames(frames)) => {
945+
tracing::debug!(
946+
"pull_frames: generation={}, start_frame_no={} (batch_size={}), frame_size={}",
947+
generation,
948+
frame_no,
949+
sync_ctx.pull_batch_size,
950+
frames.len(),
951+
);
952+
if frames.len() % FRAME_SIZE != 0 {
953+
tracing::error!(
954+
"frame size {} is not a multiple of the expected size {}",
955+
frames.len(),
956+
FRAME_SIZE,
957+
);
958+
return Err(SyncError::InvalidPullFrameBytes(frames.len()).into());
959+
}
960+
for chunk in frames.chunks(FRAME_SIZE) {
961+
tracing::debug!(
962+
"inserting frame (frame_no={})",
963+
sync_ctx.durable_frame_num + 1
964+
);
965+
let r = insert_handle.insert(&chunk);
966+
if let Err(e) = r {
967+
tracing::debug!(
968+
"insert error (frame= {}) : {:?}",
969+
sync_ctx.durable_frame_num + 1,
970+
e
971+
);
972+
return Err(e);
973+
}
974+
sync_ctx.durable_frame_num += 1;
975+
}
905976
}
906977
Ok(PullResult::EndOfGeneration { max_generation }) => {
907978
// If there are no more generations to pull, we're done.
@@ -920,7 +991,7 @@ pub async fn try_pull(
920991
insert_handle.begin()?;
921992
}
922993
Err(e) => {
923-
tracing::debug!("pull_one_frame error: {:?}", e);
994+
tracing::debug!("pull_frames error: {:?}", e);
924995
err.replace(e);
925996
break;
926997
}

0 commit comments

Comments
 (0)