Skip to content

Commit 1f79fbc

Browse files
authored
Merge 'Partial sync basic' from Nikita Sivukhin
This PR implements basic support for partial sync. Right now the scope is limited to only `:memory:` IO and later will be properly expanded to the file based IO later. The main addition is `PartialDatabaseStorage` which make request to the remote server for missing local pages on demand. The main change is that now tursodatabase JS bindings accept optional "external" IO event loop which in case of sync will drive `ProtocolIo` internal work associated with remote page fetching tasks. Closes #3931
2 parents 7c96b6d + 740ff2b commit 1f79fbc

29 files changed

+1226
-200
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bindings/javascript/packages/common/promise.ts

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,15 @@ class Database {
3737
inTransaction: boolean;
3838

3939
private db: NativeDatabase;
40+
private ioStep: () => Promise<void>;
4041
private execLock: AsyncLock;
4142
private _inTransaction: boolean = false;
4243
protected connected: boolean = false;
4344

44-
constructor(db: NativeDatabase) {
45+
constructor(db: NativeDatabase, ioStep?: () => Promise<void>) {
4546
this.db = db;
4647
this.execLock = new AsyncLock();
48+
this.ioStep = ioStep ?? (async () => { });
4749
Object.defineProperties(this, {
4850
name: { get: () => this.db.path },
4951
readonly: { get: () => this.db.readonly },
@@ -74,9 +76,9 @@ class Database {
7476

7577
try {
7678
if (this.connected) {
77-
return new Statement(maybeValue(this.db.prepare(sql)), this.db, this.execLock);
79+
return new Statement(maybeValue(this.db.prepare(sql)), this.db, this.execLock, this.ioStep);
7880
} else {
79-
return new Statement(maybePromise(() => this.connect().then(() => this.db.prepare(sql))), this.db, this.execLock)
81+
return new Statement(maybePromise(() => this.connect().then(() => this.db.prepare(sql))), this.db, this.execLock, this.ioStep)
8082
}
8183
} catch (err) {
8284
throw convertError(err);
@@ -185,6 +187,7 @@ class Database {
185187
const stepResult = exec.stepSync();
186188
if (stepResult === STEP_IO) {
187189
await this.db.ioLoopAsync();
190+
await this.ioStep();
188191
continue;
189192
}
190193
if (stepResult === STEP_DONE) {
@@ -280,11 +283,13 @@ class Statement {
280283
private stmt: MaybeLazy<NativeStatement>;
281284
private db: NativeDatabase;
282285
private execLock: AsyncLock;
286+
private ioStep: () => Promise<void>;
283287

284-
constructor(stmt: MaybeLazy<NativeStatement>, db: NativeDatabase, execLock: AsyncLock) {
288+
constructor(stmt: MaybeLazy<NativeStatement>, db: NativeDatabase, execLock: AsyncLock, ioStep: () => Promise<void>) {
285289
this.stmt = stmt;
286290
this.db = db;
287291
this.execLock = execLock;
292+
this.ioStep = ioStep;
288293
}
289294

290295
/**
@@ -352,7 +357,7 @@ class Statement {
352357
while (true) {
353358
const stepResult = await stmt.stepSync();
354359
if (stepResult === STEP_IO) {
355-
await this.db.ioLoopAsync();
360+
await this.io();
356361
continue;
357362
}
358363
if (stepResult === STEP_DONE) {
@@ -389,7 +394,7 @@ class Statement {
389394
while (true) {
390395
const stepResult = await stmt.stepSync();
391396
if (stepResult === STEP_IO) {
392-
await this.db.ioLoopAsync();
397+
await this.io();
393398
continue;
394399
}
395400
if (stepResult === STEP_DONE) {
@@ -453,7 +458,7 @@ class Statement {
453458
while (true) {
454459
const stepResult = await stmt.stepSync();
455460
if (stepResult === STEP_IO) {
456-
await this.db.ioLoopAsync();
461+
await this.io();
457462
continue;
458463
}
459464
if (stepResult === STEP_DONE) {
@@ -471,6 +476,11 @@ class Statement {
471476
}
472477
}
473478

479+
async io() {
480+
await this.db.ioLoopAsync();
481+
await this.ioStep();
482+
}
483+
474484
/**
475485
* Interrupts the statement.
476486
*/

bindings/javascript/sync/packages/common/index.ts

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,17 @@
1-
import { run, memoryIO, SyncEngineGuards } from "./run.js"
2-
import { DatabaseOpts, ProtocolIo, RunOpts, DatabaseRowMutation, DatabaseRowStatement, DatabaseRowTransformResult, DatabaseStats, DatabaseChangeType, EncryptionOpts } from "./types.js"
1+
import { run, memoryIO, runner, SyncEngineGuards, Runner } from "./run.js"
2+
import {
3+
DatabaseOpts,
4+
ProtocolIo,
5+
RunOpts,
6+
DatabaseRowMutation,
7+
DatabaseRowStatement,
8+
DatabaseRowTransformResult,
9+
DatabaseStats,
10+
DatabaseChangeType,
11+
EncryptionOpts,
12+
} from "./types.js"
313

4-
export { run, memoryIO, SyncEngineGuards }
14+
export { run, memoryIO, runner, SyncEngineGuards, Runner }
515
export type {
616
DatabaseStats,
717
DatabaseOpts,

bindings/javascript/sync/packages/common/run.ts

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -116,9 +116,28 @@ export function memoryIO(): ProtocolIo {
116116
}
117117
};
118118

119+
export interface Runner {
120+
wait(): Promise<void>;
121+
}
119122

120-
export async function run(opts: RunOpts, io: ProtocolIo, engine: any, generator: any): Promise<any> {
123+
export function runner(opts: RunOpts, io: ProtocolIo, engine: any): Runner {
121124
let tasks = [];
125+
return {
126+
async wait() {
127+
for (let request = engine.protocolIo(); request != null; request = engine.protocolIo()) {
128+
tasks.push(trackPromise(process(opts, io, request)));
129+
}
130+
const tasksRace = tasks.length == 0 ? Promise.resolve() : Promise.race([timeoutMs(opts.preemptionMs), ...tasks.map(t => t.promise)]);
131+
await Promise.all([engine.ioLoopAsync(), tasksRace]);
132+
133+
tasks = tasks.filter(t => !t.finished);
134+
135+
engine.protocolIoStep();
136+
},
137+
}
138+
}
139+
140+
export async function run(runner: Runner, generator: any): Promise<any> {
122141
while (true) {
123142
const { type, ...rest }: GeneratorResponse = await generator.resumeAsync(null);
124143
if (type == 'Done') {
@@ -131,14 +150,7 @@ export async function run(opts: RunOpts, io: ProtocolIo, engine: any, generator:
131150
//@ts-ignore
132151
return rest.changes;
133152
}
134-
for (let request = engine.protocolIo(); request != null; request = engine.protocolIo()) {
135-
tasks.push(trackPromise(process(opts, io, request)));
136-
}
137-
138-
const tasksRace = tasks.length == 0 ? Promise.resolve() : Promise.race([timeoutMs(opts.preemptionMs), ...tasks.map(t => t.promise)]);
139-
await Promise.all([engine.ioLoopAsync(), tasksRace]);
140-
141-
tasks = tasks.filter(t => !t.finished);
153+
await runner.wait();
142154
}
143155
}
144156

bindings/javascript/sync/packages/common/types.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,10 @@ export interface DatabaseOpts {
107107
* optional parameter to enable internal logging for the database
108108
*/
109109
tracing?: 'error' | 'warn' | 'info' | 'debug' | 'trace',
110+
/**
111+
* optional parameter to enable partial sync for the database
112+
*/
113+
partialBootstrapStrategy?: { kind: 'prefix', length: number } | { kind: 'query', query: string };
110114
}
111115
export interface DatabaseStats {
112116
/**
@@ -134,6 +138,14 @@ export interface DatabaseStats {
134138
* (can be used as e-tag, but string must not be interpreted in any way and must be used as opaque value)
135139
*/
136140
revision: string | null;
141+
/**
142+
* total amount of sent bytes over the network
143+
*/
144+
networkSentBytes: number;
145+
/**
146+
* total amount of received bytes over the network
147+
*/
148+
networkReceivedBytes: number;
137149
}
138150

139151
/* internal types used in the native/browser packages */

bindings/javascript/sync/packages/native/index.d.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ export declare class JsDataCompletion {
163163
}
164164

165165
export declare class JsProtocolIo {
166-
takeRequest(): JsProtocolRequestBytes | null
166+
167167
}
168168

169169
export declare class JsProtocolRequestBytes {
@@ -178,6 +178,7 @@ export declare class SyncEngine {
178178
/** Runs the I/O loop asynchronously, returning a Promise. */
179179
ioLoopAsync(): Promise<void>
180180
protocolIo(): JsProtocolRequestBytes | null
181+
protocolIoStep(): void
181182
push(): GeneratorHolder
182183
stats(): GeneratorHolder
183184
wait(): GeneratorHolder
@@ -220,9 +221,13 @@ export type DatabaseRowTransformResultJs =
220221
export type GeneratorResponse =
221222
| { type: 'IO' }
222223
| { type: 'Done' }
223-
| { type: 'SyncEngineStats', operations: number, mainWal: number, revertWal: number, lastPullUnixTime?: number, lastPushUnixTime?: number, revision?: string }
224+
| { type: 'SyncEngineStats', operations: number, mainWal: number, revertWal: number, lastPullUnixTime?: number, lastPushUnixTime?: number, revision?: string, networkSentBytes: number, networkReceivedBytes: number }
224225
| { type: 'SyncEngineChanges', changes: SyncEngineChanges }
225226

227+
export type JsPartialBootstrapStrategy =
228+
| { type: 'Prefix', length: number }
229+
| { type: 'Query', query: string }
230+
226231
export type JsProtocolRequest =
227232
| { type: 'Http', method: string, path: string, body?: Array<number>, headers: Array<[string, string]> }
228233
| { type: 'FullRead', path: string }
@@ -240,6 +245,7 @@ export interface SyncEngineOpts {
240245
protocolVersion?: SyncEngineProtocolVersion
241246
bootstrapIfEmpty: boolean
242247
remoteEncryption?: string
248+
partialBoostrapStrategy?: JsPartialBootstrapStrategy
243249
}
244250

245251
export declare const enum SyncEngineProtocolVersion {

0 commit comments

Comments
 (0)