Skip to content

Commit 5a73498

Browse files
Add core.remoteContiguousLength & remote-contiguous-length event (#743)
* wip * Apply prettier to remoteContiguousLength commits thus far * Add event listener to `remote contiguous length` test * Add test to showcase remote contig length only updates for fully contig To save messages being sent by all peers, the existing behavior of sending a `range` request in `broadcastRange()` only when fully contiguous is being kept. This means the `.remoteContiguousLength` and the `remote-contiguous-length` event only update / fire when the remote peer is fully contiguous and not when partially contiguous. * Rename `contig` var to `fullyContig` This emphasizes that the variable is true only when it is fully contiguous, ie having the full range of blocks. This is in contrast to being partially contiguous. * Add test to verify truncation updates `remoteContiguousLength` too * Update tests to include `length` arg in `remote-contiguous-length` event * Document `.remoteContiguousLength` in README.md Includes note that it is only updated when the peer thinks it is fully contiguous (aka non-sparse). * Document `remote-contiguous-length` event * Clarify that truncating doesnt update `.remoteContiguousLength` anywhere * Test that appends after truncating dont fire remote contig length event * Format remote contig length event after truncate test * Fix remoteContiguousLength hint to update on truncating Now allows appends after truncating to fire remote contig event. * Add test for persisting remoteContiguousLength between reloads * Parse remoteContiguousLength when parsing header from storage Fixes the `remote contiguous length - persists` test assuming storage support. * Use `truncate` & `download` events instead of timeouts in test * Ensure contiguous lengths default to 0 when parsing header Not likely to happen in practice, but just to be safe. * Update remoteContiguousLength only on range starting at `0` Peers always send `range` requests starting with `0` if they are contiguous. Partial ranges would imply they are not contiguous so shouldn't be able to sparsely update the `remoteContiguousLength`. Because there is no need check the `start` of the range & `core.updateRemoteContiguousLength()` already checks if the `length` is larger, the logic is simplified to only call `updateRemoteContiguousLength()` when updating the `_remoteContiguousLength` for the peer when the range isnt a drop. * Use `done()` to await downloads in remote contig tests * Remove `.catch(noop)` of `.updateRemoteContiguousLength()` Could mask underlying error and its assumed that storage errors at this point should not be caught. * Add test to verify remote contig length when peer truncates & appends Should test if a reorg will correctly update the remote contiguous length to match. * Skip updating `remoteContiguousLength` when remote is on older fork * Try/catch & pause if updating remote contig hint throws * Move `updateRemoteContiguousLength()` next to `updateContiguousLength()` * Remove try catch to let error bubble up from updating remote contig * Move remote length check into `onrange` to avoid promise allocation `core.updateRemoteContiguousLength()` is only called from `onrange` so moving the check doesn't allow unviable lengths through from elsewhere. * Change `info.hints` check style when parsing header Use ternary instead of a combination of optional chaining and null coalescing. * Fix formatting * must be same fork to update contig --------- Co-authored-by: Mathias Buus <[email protected]>
1 parent 55de1ea commit 5a73498

File tree

6 files changed

+270
-12
lines changed

6 files changed

+270
-12
lines changed

README.md

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -594,7 +594,13 @@ Populated after `ready` has been emitted. Will be `0` before the event.
594594

595595
#### `core.contiguousLength`
596596

597-
How many blocks are contiguously available starting from the first block of this core?
597+
How many blocks are contiguously available starting from the first block of this core.
598+
599+
Populated after `ready` has been emitted. Will be `0` before the event.
600+
601+
#### `core.remoteContiguousLength`
602+
603+
How many blocks are contiguously available starting from the first block of this core on any known remote. This is only updated when a remote thinks it is fully contiguous such that they have all known blocks.
598604

599605
Populated after `ready` has been emitted. Will be `0` before the event.
600606

@@ -708,6 +714,10 @@ Emitted when a block is uploaded to a peer.
708714

709715
Emitted when a block is downloaded from a peer.
710716

717+
#### `core.on('remote-contiguous-length', length)`
718+
719+
Emitted when the max known contiguous `length` from a remote, ie `core.remoteContiguousLength`, is updated. Note this is not emitted when core is truncated.
720+
711721
#### `Hypercore.MAX_SUGGESTED_BLOCK_SIZE`
712722

713723
The constant for max size (15MB) for blocks appended to Hypercore. This max ensures blocks are replicated smoothly.

index.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -564,6 +564,11 @@ class Hypercore extends EventEmitter {
564564
return this.state.byteLength - this.state.length * this.padding
565565
}
566566

567+
get remoteContiguousLength() {
568+
if (this.opened === false) return 0
569+
return Math.min(this.core.state.length, this.core.header.hints.remoteContiguousLength)
570+
}
571+
567572
get contiguousLength() {
568573
if (this.opened === false) return 0
569574
return Math.min(this.core.state.length, this.core.header.hints.contiguousLength)

lib/core.js

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,8 @@ module.exports = class Core {
200200
},
201201
hints: {
202202
reorgs: [],
203-
contiguousLength: 0
203+
contiguousLength: 0,
204+
remoteContiguousLength: 0
204205
}
205206
}
206207

@@ -284,7 +285,7 @@ module.exports = class Core {
284285
if (header.hints.contiguousLength !== len) {
285286
header.hints.contiguousLength = len
286287
const tx = storage.write()
287-
tx.setHints({ contiguousLength: len })
288+
tx.setHints(header.hints)
288289
await tx.flush()
289290
}
290291

@@ -718,6 +719,16 @@ module.exports = class Core {
718719
}
719720
}
720721

722+
async updateRemoteContiguousLength(length) {
723+
this.header.hints.remoteContiguousLength = length
724+
await this.state.flushHints()
725+
if (this.header.hints.remoteContiguousLength !== length) return
726+
727+
for (let i = this.monitors.length - 1; i >= 0; i--) {
728+
this.monitors[i].emit('remote-contiguous-length', length)
729+
}
730+
}
731+
721732
onappend(tree, bitfield) {
722733
this.header.tree = tree
723734

@@ -860,7 +871,8 @@ function parseHeader(info) {
860871
tree: info.head || getDefaultTree(),
861872
hints: {
862873
reorgs: [],
863-
contiguousLength: info.hints ? info.hints.contiguousLength : 0
874+
contiguousLength: info.hints ? info.hints.contiguousLength : 0,
875+
remoteContiguousLength: info.hints ? info.hints.remoteContiguousLength : 0
864876
}
865877
}
866878
}

lib/replicator.js

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -514,19 +514,19 @@ class Peer {
514514
else this._clearLocalRange(start, length)
515515

516516
const i = Math.floor(start / DEFAULT_SEGMENT_SIZE)
517-
const contig = this.core.header.hints.contiguousLength === this.core.state.length
517+
const fullyContig = this.core.header.hints.contiguousLength === this.core.state.length
518518

519519
if (
520520
start + LAST_BLOCKS < this.core.state.length &&
521521
!this.remoteSegmentsWanted.has(i) &&
522522
!drop &&
523-
!contig
523+
!fullyContig
524524
) {
525525
return
526526
}
527527

528528
let force = false
529-
if (contig && !drop) {
529+
if (fullyContig && !drop) {
530530
start = 0
531531
length = this.core.state.length
532532

@@ -1152,15 +1152,23 @@ class Peer {
11521152
this._clearLocalRange(fixedStart, length)
11531153
}
11541154

1155-
onrange({ drop, start, length }) {
1155+
async onrange({ drop, start, length }) {
11561156
const has = drop === false
11571157

11581158
if (drop === true && start < this._remoteContiguousLength) {
11591159
this._remoteContiguousLength = start
11601160
}
11611161

11621162
if (start === 0 && drop === false) {
1163-
if (length > this._remoteContiguousLength) this._remoteContiguousLength = length
1163+
if (length > this._remoteContiguousLength) {
1164+
this._remoteContiguousLength = length
1165+
if (
1166+
this.remoteFork === this.core.state.fork &&
1167+
length > this.core.header.hints.remoteContiguousLength
1168+
) {
1169+
await this.core.updateRemoteContiguousLength(length)
1170+
}
1171+
}
11641172
} else if (length === 1) {
11651173
const bitfield = this.core.skipBitfield === null ? this.core.bitfield : this.core.skipBitfield
11661174
this.remoteBitfield.set(start, has)

lib/session-state.js

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -473,13 +473,37 @@ module.exports = class SessionState {
473473
if (this.isDefault()) {
474474
await storeBitfieldRange(this.storage, storage, batch.ancestors, batch.treeLength, false)
475475
if (batch.ancestors < this.core.header.hints.contiguousLength) {
476-
storage.setHints({ contiguousLength: batch.ancestors })
476+
this.core.header.hints.remoteContiguousLength = Math.min(
477+
batch.length,
478+
this.core.header.hints.remoteContiguousLength
479+
)
480+
storage.setHints({
481+
contiguousLength: batch.ancestors,
482+
remoteContiguousLength: this.core.header.hints.remoteContiguousLength
483+
})
477484
}
478485
}
479486

480487
return { dependency, tree, roots: batch.roots }
481488
}
482489

490+
async flushHints() {
491+
await this.mutex.lock()
492+
493+
try {
494+
const tx = this.createWriteBatch()
495+
496+
tx.setHints({
497+
contiguousLength: this.core.header.hints.contiguousLength,
498+
remoteContiguousLength: this.core.header.hints.remoteContiguousLength
499+
})
500+
501+
await tx.flush()
502+
} finally {
503+
this._unlock()
504+
}
505+
}
506+
483507
async clear(start, end, cleared) {
484508
await this.mutex.lock()
485509

@@ -505,7 +529,10 @@ module.exports = class SessionState {
505529
if (this.isDefault()) {
506530
await storeBitfieldRange(this.storage, tx, start, end, false)
507531
if (start < this.core.header.hints.contiguousLength) {
508-
tx.setHints({ contiguousLength: start })
532+
tx.setHints({
533+
contiguousLength: start,
534+
remoteContiguousLength: this.core.header.hints.remoteContiguousLength
535+
})
509536
}
510537
}
511538

@@ -577,7 +604,10 @@ module.exports = class SessionState {
577604
if (this.isDefault()) {
578605
await storeBitfieldRange(this.storage, tx, batch.ancestors, batch.length, true)
579606
if (this.length === this.core.header.hints.contiguousLength) {
580-
tx.setHints({ contiguousLength: this.length + values.length })
607+
tx.setHints({
608+
contiguousLength: this.length + values.length,
609+
remoteContiguousLength: this.core.header.hints.remoteContiguousLength
610+
})
581611
}
582612
}
583613

test/replicate.js

Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2374,6 +2374,199 @@ test('hotswap works for a download with many slow peers', async function (t) {
23742374
t.pass(`Hotswap triggered (download took ${Date.now() - start}ms)`)
23752375
})
23762376

2377+
test('remote contiguous length', async function (t) {
2378+
const a = await create(t)
2379+
const b = await create(t, a.key)
2380+
2381+
t.is(a.remoteContiguousLength, 0)
2382+
2383+
await a.append(['a'])
2384+
2385+
t.is(a.remoteContiguousLength, 0)
2386+
2387+
a.on('remote-contiguous-length', (length) => {
2388+
t.is(length, 1, '`remote-contiguous-length` event fired')
2389+
})
2390+
2391+
replicate(a, b, t)
2392+
2393+
await b.get(0)
2394+
2395+
await eventFlush()
2396+
2397+
t.is(a.remoteContiguousLength, 1)
2398+
})
2399+
2400+
test('remote contiguous length - fully contiguous only', async function (t) {
2401+
t.plan(7)
2402+
const a = await create(t)
2403+
const b = await create(t, a.key)
2404+
2405+
t.is(a.remoteContiguousLength, 0)
2406+
2407+
await a.append(['a1'])
2408+
await a.append(['a2'])
2409+
2410+
t.is(a.remoteContiguousLength, 0)
2411+
2412+
a.on('remote-contiguous-length', (length) => {
2413+
t.is(length, 2, '`remote-contiguous-length` event fired')
2414+
})
2415+
2416+
replicate(a, b, t)
2417+
2418+
await b.get(0)
2419+
2420+
await eventFlush()
2421+
2422+
t.is(a.remoteContiguousLength, 0, 'remoteContiguousLength didnt update')
2423+
t.is(b.contiguousLength, 1, 'b has 1st block')
2424+
2425+
await b.get(1)
2426+
await eventFlush()
2427+
2428+
t.is(b.contiguousLength, 2, 'b all blocks')
2429+
t.is(a.remoteContiguousLength, 2, 'remoteContiguousLength updates')
2430+
})
2431+
2432+
test('remote contiguous length - updates on truncate', async function (t) {
2433+
const a = await create(t)
2434+
const b = await create(t, a.key)
2435+
2436+
t.is(a.remoteContiguousLength, 0)
2437+
2438+
await a.append(['a1'])
2439+
await a.append(['a2'])
2440+
2441+
t.is(a.remoteContiguousLength, 0)
2442+
2443+
replicate(a, b, t)
2444+
2445+
await b.get(0)
2446+
await b.get(1)
2447+
2448+
await eventFlush()
2449+
2450+
t.is(a.remoteContiguousLength, 2)
2451+
2452+
const truncateReceived = new Promise((resolve) => b.on('truncate', resolve))
2453+
await a.truncate(a.length - 1)
2454+
2455+
await truncateReceived
2456+
2457+
t.is(a.remoteContiguousLength, 1)
2458+
t.is(b.contiguousLength, 1)
2459+
})
2460+
2461+
test('remote contiguous length - event fires after truncating', async function (t) {
2462+
t.plan(8)
2463+
const a = await create(t)
2464+
const b = await create(t, a.key)
2465+
2466+
t.is(a.remoteContiguousLength, 0)
2467+
2468+
await a.append(['a1', 'a2', 'a3'])
2469+
2470+
t.is(a.remoteContiguousLength, 0)
2471+
2472+
replicate(a, b, t)
2473+
2474+
await b.download({ start: 0, end: a.length }).done()
2475+
await eventFlush() // To let `a` update `remoteContiguousLength`
2476+
2477+
t.is(a.remoteContiguousLength, 3)
2478+
2479+
const truncateReceived = new Promise((resolve) => b.on('truncate', resolve))
2480+
await a.truncate(1)
2481+
2482+
await truncateReceived
2483+
2484+
t.is(a.remoteContiguousLength, 1)
2485+
t.is(b.contiguousLength, 1)
2486+
2487+
b.on('remote-contiguous-length', () => t.pass('fires after truncating'))
2488+
await a.append(['a2v2', 'a3v2'])
2489+
2490+
await b.download({ start: 0, end: a.length }).done()
2491+
await eventFlush() // To let `a` update `remoteContiguousLength`
2492+
2493+
t.is(a.remoteContiguousLength, 3)
2494+
t.is(b.contiguousLength, 3)
2495+
})
2496+
2497+
test('remote contiguous length - correct after reorg', async function (t) {
2498+
t.plan(9)
2499+
const a = await create(t)
2500+
const b = await create(t, a.key)
2501+
2502+
t.is(a.remoteContiguousLength, 0)
2503+
2504+
await a.append(['a1', 'a2', 'a3'])
2505+
2506+
t.is(a.remoteContiguousLength, 0)
2507+
2508+
const streams = replicate(a, b, t)
2509+
2510+
await b.download({ start: 0, end: a.length }).done()
2511+
await eventFlush() // To let `a` update `remoteContiguousLength`
2512+
2513+
t.is(a.remoteContiguousLength, 3)
2514+
2515+
const truncateReceived = new Promise((resolve) => b.on('truncate', resolve))
2516+
2517+
// Stop replicating to clear any messages immediately after truncation & append
2518+
unreplicate(streams)
2519+
2520+
await a.truncate(1)
2521+
await a.append('a2v2')
2522+
2523+
// Rereplicate to update b
2524+
replicate(a, b, t)
2525+
await truncateReceived
2526+
2527+
t.is(a.remoteContiguousLength, 1, 'remoteContiguousLength shows truncated length')
2528+
t.is(b.contiguousLength, 1)
2529+
t.is(b.remoteContiguousLength, 2)
2530+
2531+
b.on('remote-contiguous-length', () => t.pass('fires after truncating'))
2532+
await a.append(['a3v2'])
2533+
2534+
await b.download({ start: 0, end: a.length }).done()
2535+
await eventFlush() // To let `a` update `remoteContiguousLength`
2536+
2537+
t.is(a.remoteContiguousLength, 3)
2538+
t.is(b.contiguousLength, 3)
2539+
})
2540+
2541+
test('remote contiguous length - persists', async function (t) {
2542+
const createA = await createStored(t)
2543+
const a = await createA()
2544+
await a.ready()
2545+
const b = await create(t, a.key)
2546+
2547+
t.is(a.remoteContiguousLength, 0)
2548+
2549+
await a.append(['a1', 'a2', 'a3'])
2550+
2551+
t.is(a.remoteContiguousLength, 0)
2552+
2553+
replicate(a, b, t)
2554+
2555+
await b.download({ start: 0, end: a.length }).done()
2556+
await eventFlush() // To let `a` update `remoteContiguousLength`
2557+
2558+
t.is(a.remoteContiguousLength, 3)
2559+
2560+
await a.close()
2561+
2562+
const a2 = await createA(a.key)
2563+
t.is(a2.remoteContiguousLength, 0, 'remoteContiguousLength initial 0 before ready')
2564+
await a2.ready()
2565+
t.teardown(() => a2.close())
2566+
2567+
t.is(a2.remoteContiguousLength, 3)
2568+
})
2569+
23772570
async function createAndDownload(t, core) {
23782571
const b = await create(t, core.key)
23792572
replicate(core, b, t, { teardown: false })

0 commit comments

Comments
 (0)