Skip to content

Commit beac72a

Browse files
authored
eagerly flush hints (#745)
* eagerly flush hints * fix test and only for default * debounce * lint
1 parent 31d845e commit beac72a

File tree

4 files changed

+42
-21
lines changed

4 files changed

+42
-21
lines changed

lib/core.js

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ module.exports = class Core {
5555
this.destroyed = false
5656
this.closed = false
5757

58+
this.hintsChanged = false
59+
5860
this._bitfield = null
5961
this._verifies = null
6062
this._verifiesFlushed = null
@@ -531,6 +533,8 @@ module.exports = class Core {
531533
this.replicator.onhave(bitfield.start, bitfield.length, bitfield.drop)
532534
}
533535
}
536+
537+
if (this.hintsChanged) await this.flushHints()
534538
} finally {
535539
this.state._clearActiveBatch()
536540
this.state.mutex.unlock()
@@ -539,6 +543,20 @@ module.exports = class Core {
539543
return verifies[0] !== null
540544
}
541545

546+
async flushHints() {
547+
if (!this.hintsChanged) return
548+
this.hintsChanged = false // we unset this immediately as a "debounce"
549+
550+
const tx = this.state.storage.write()
551+
552+
tx.setHints({
553+
contiguousLength: this.header.hints.contiguousLength,
554+
remoteContiguousLength: this.header.hints.remoteContiguousLength
555+
})
556+
557+
await tx.flush()
558+
}
559+
542560
async checkConflict(proof, from) {
543561
if (this.state.length < proof.upgrade.length || proof.fork !== this.state.fork) {
544562
// out of date this proof - ignore for now
@@ -716,13 +734,13 @@ module.exports = class Core {
716734

717735
if (contig.length !== -1 && contig.length !== this.header.hints.contiguousLength) {
718736
this.header.hints.contiguousLength = contig.length
737+
this.hintsChanged = true
719738
}
720739
}
721740

722-
async updateRemoteContiguousLength(length) {
741+
updateRemoteContiguousLength(length) {
723742
this.header.hints.remoteContiguousLength = length
724-
await this.state.flushHints()
725-
if (this.header.hints.remoteContiguousLength !== length) return
743+
this.hintsChanged = true
726744

727745
for (let i = this.monitors.length - 1; i >= 0; i--) {
728746
this.monitors[i].emit('remote-contiguous-length', length)

lib/replicator.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1166,7 +1166,7 @@ class Peer {
11661166
this.remoteFork === this.core.state.fork &&
11671167
length > this.core.header.hints.remoteContiguousLength
11681168
) {
1169-
await this.core.updateRemoteContiguousLength(length)
1169+
this.core.updateRemoteContiguousLength(length)
11701170
}
11711171
}
11721172
} else if (length === 1) {
@@ -1184,6 +1184,7 @@ class Peer {
11841184
}
11851185
}
11861186

1187+
if (this.core.hintsChanged) await this.core.flushHints()
11871188
if (drop === false) this._update()
11881189
}
11891190

lib/session-state.js

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,8 @@ module.exports = class SessionState {
300300
const append = b ? { start: b.start, length: b.appends, drop: false } : null
301301

302302
this.onappend(tree, append, true)
303+
304+
if (this.core.hintsChanged && this.isDefault()) await this.core.flushHints()
303305
} finally {
304306
this.mutex.unlock()
305307
this.core.checkIfIdle()
@@ -360,6 +362,8 @@ module.exports = class SessionState {
360362

361363
this.onappend(head, bitfield, flushed)
362364
}
365+
366+
if (this.core.hintsChanged && this.isDefault()) await this.core.flushHints()
363367
} finally {
364368
this._clearActiveBatch()
365369
this.updating = false
@@ -419,6 +423,8 @@ module.exports = class SessionState {
419423
if (dependency) this.storage.setDependencyHead(dependency)
420424

421425
this.ontruncate(tree, tree.length, batch.treeLength, flushed)
426+
427+
if (this.core.hintsChanged && this.isDefault()) await this.core.flushHints()
422428
} finally {
423429
this._unlock()
424430
}
@@ -445,6 +451,8 @@ module.exports = class SessionState {
445451
if (dependency) this.storage.setDependencyHead(dependency)
446452

447453
this.ontruncate(tree, batch.ancestors, batch.treeLength, flushed)
454+
455+
if (this.core.hintsChanged && this.isDefault()) await this.core.flushHints()
448456
} finally {
449457
this._unlock()
450458
}
@@ -487,23 +495,6 @@ module.exports = class SessionState {
487495
return { dependency, tree, roots: batch.roots }
488496
}
489497

490-
async flushHints() {
491-
await this.mutex.lock()
492-
493-
try {
494-
const tx = this.createWriteBatch()
495-
496-
tx.setHints({
497-
contiguousLength: this.core.header.hints.contiguousLength,
498-
remoteContiguousLength: this.core.header.hints.remoteContiguousLength
499-
})
500-
501-
await tx.flush()
502-
} finally {
503-
this._unlock()
504-
}
505-
}
506-
507498
async clear(start, end, cleared) {
508499
await this.mutex.lock()
509500

@@ -548,9 +539,12 @@ module.exports = class SessionState {
548539
// todo: atomic event handle
549540
if (this.isDefault() && flushed) {
550541
const length = end - start
542+
551543
this.core.updateContiguousLength({ start, length, drop: true })
552544
this.core._setBitfieldRanges(start, end, false)
553545
this.core.replicator.onhave(start, length, true)
546+
547+
if (this.core.hintsChanged) await this.core.flushHints()
554548
}
555549
} finally {
556550
this._unlock()
@@ -631,6 +625,8 @@ module.exports = class SessionState {
631625

632626
this.onappend(tree, bitfield, flushed)
633627

628+
if (this.core.hintsChanged && this.isDefault()) await this.core.flushHints()
629+
634630
return { length: this.length, byteLength: this.byteLength }
635631
} finally {
636632
this._unlock()
@@ -780,6 +776,8 @@ module.exports = class SessionState {
780776

781777
if (truncating) this.ontruncate(tree, sharedLength, origLength, flushed)
782778
if (sharedLength < length) this.onappend(tree, null, flushed)
779+
780+
if (this.core.hintsChanged && this.isDefault()) await this.core.flushHints()
783781
} finally {
784782
this.mutex.unlock()
785783
}

test/replicate.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1773,6 +1773,10 @@ test('session id reuse does not stall', async function (t) {
17731773
if (all.length === 0) break
17741774
}
17751775

1776+
// wait a little bit, cause technically the above has a storage race
1777+
// since it checks the bitfield manually and the event isnt timed to that
1778+
await new Promise((resolve) => setTimeout(resolve, 1000))
1779+
17761780
t.pass('All blocks downloaded')
17771781
t.is(downloaded, 100, 'Downloaded all blocks exactly once')
17781782

0 commit comments

Comments
 (0)