@@ -20,6 +20,7 @@ const METADATA_VERSION: u32 = 0;
20
20
21
21
const DEFAULT_MAX_RETRIES : usize = 5 ;
22
22
const DEFAULT_PUSH_BATCH_SIZE : u32 = 128 ;
23
+ const DEFAULT_PULL_BATCH_SIZE : u32 = 128 ;
23
24
24
25
#[ derive( thiserror:: Error , Debug ) ]
25
26
#[ non_exhaustive]
@@ -66,6 +67,8 @@ pub enum SyncError {
66
67
InvalidLocalGeneration ( u32 , u32 ) ,
67
68
#[ error( "invalid local state: {0}" ) ]
68
69
InvalidLocalState ( String ) ,
70
+ #[ error( "server returned invalid length of frames: {0}" ) ]
71
+ InvalidPullFrameBytes ( usize ) ,
69
72
}
70
73
71
74
impl SyncError {
@@ -98,8 +101,8 @@ pub enum PushStatus {
98
101
}
99
102
100
103
pub enum PullResult {
101
- /// A frame was successfully pulled.
102
- Frame ( Bytes ) ,
104
+ /// Frames were successfully pulled.
105
+ Frames ( Bytes ) ,
103
106
/// We've reached the end of the generation.
104
107
EndOfGeneration { max_generation : u32 } ,
105
108
}
@@ -122,6 +125,7 @@ pub struct SyncContext {
122
125
auth_token : Option < HeaderValue > ,
123
126
max_retries : usize ,
124
127
push_batch_size : u32 ,
128
+ pull_batch_size : u32 ,
125
129
/// The current durable generation.
126
130
durable_generation : u32 ,
127
131
/// Represents the max_frame_no from the server.
@@ -154,6 +158,7 @@ impl SyncContext {
154
158
auth_token,
155
159
max_retries : DEFAULT_MAX_RETRIES ,
156
160
push_batch_size : DEFAULT_PUSH_BATCH_SIZE ,
161
+ pull_batch_size : DEFAULT_PULL_BATCH_SIZE ,
157
162
client,
158
163
durable_generation : 0 ,
159
164
durable_frame_num : 0 ,
@@ -175,7 +180,7 @@ impl SyncContext {
175
180
}
176
181
177
182
#[ tracing:: instrument( skip( self ) ) ]
178
- pub ( crate ) async fn pull_one_frame (
183
+ pub ( crate ) async fn pull_frames (
179
184
& mut self ,
180
185
generation : u32 ,
181
186
frame_no : u32 ,
@@ -185,9 +190,11 @@ impl SyncContext {
185
190
self . sync_url,
186
191
generation,
187
192
frame_no,
188
- frame_no + 1
193
+ // the server expects the range of [start, end) frames, i.e. end is exclusive
194
+ // so we add +1 so that we can pull in the batch size of `self.pull_batch_size`
195
+ frame_no + self . pull_batch_size
189
196
) ;
190
- tracing:: debug!( "pulling frame" ) ;
197
+ tracing:: debug!( "pulling frame (uri={})" , uri ) ;
191
198
self . pull_with_retry ( uri, self . max_retries ) . await
192
199
}
193
200
@@ -420,7 +427,7 @@ impl SyncContext {
420
427
let frame = hyper:: body:: to_bytes ( res. into_body ( ) )
421
428
. await
422
429
. map_err ( SyncError :: HttpBody ) ?;
423
- return Ok ( PullResult :: Frame ( frame) ) ;
430
+ return Ok ( PullResult :: Frames ( frame) ) ;
424
431
}
425
432
// BUG ALERT: The server returns a 500 error if the remote database is empty.
426
433
// This is a bug and should be fixed.
@@ -887,6 +894,11 @@ async fn try_push(
887
894
} )
888
895
}
889
896
897
+ /// PAGE_SIZE used by the sync / diskless server
898
+ const PAGE_SIZE : usize = 4096 ;
899
+ const FRAME_HEADER_SIZE : usize = 24 ;
900
+ const FRAME_SIZE : usize = PAGE_SIZE + FRAME_HEADER_SIZE ;
901
+
890
902
pub async fn try_pull (
891
903
sync_ctx : & mut SyncContext ,
892
904
conn : & Connection ,
@@ -898,10 +910,27 @@ pub async fn try_pull(
898
910
loop {
899
911
let generation = sync_ctx. durable_generation ( ) ;
900
912
let frame_no = sync_ctx. durable_frame_num ( ) + 1 ;
901
- match sync_ctx. pull_one_frame ( generation, frame_no) . await {
902
- Ok ( PullResult :: Frame ( frame) ) => {
903
- insert_handle. insert ( & frame) ?;
904
- sync_ctx. durable_frame_num = frame_no;
913
+ match sync_ctx. pull_frames ( generation, frame_no) . await {
914
+ Ok ( PullResult :: Frames ( frames) ) => {
915
+ tracing:: trace!(
916
+ "pull_frames: generation={}, start_frame_no={} (batch_size={}), frame_size={}" ,
917
+ generation,
918
+ frame_no,
919
+ sync_ctx. pull_batch_size,
920
+ frames. len( ) ,
921
+ ) ;
922
+ if frames. len ( ) % FRAME_SIZE != 0 {
923
+ tracing:: error!(
924
+ "frame size {} is not a multiple of the expected size {}" ,
925
+ frames. len( ) ,
926
+ FRAME_SIZE ,
927
+ ) ;
928
+ return Err ( SyncError :: InvalidPullFrameBytes ( frames. len ( ) ) . into ( ) ) ;
929
+ }
930
+ for chunk in frames. chunks ( FRAME_SIZE ) {
931
+ insert_handle. insert ( & chunk) ?;
932
+ sync_ctx. durable_frame_num += 1 ;
933
+ }
905
934
}
906
935
Ok ( PullResult :: EndOfGeneration { max_generation } ) => {
907
936
// If there are no more generations to pull, we're done.
@@ -920,7 +949,7 @@ pub async fn try_pull(
920
949
insert_handle. begin ( ) ?;
921
950
}
922
951
Err ( e) => {
923
- tracing:: debug!( "pull_one_frame error: {:?}" , e) ;
952
+ tracing:: debug!( "pull_frames error: {:?}" , e) ;
924
953
err. replace ( e) ;
925
954
break ;
926
955
}
0 commit comments