Skip to content

Commit eac0bea

Browse files
committed
Bugfix: send the broadcast to all peers (+test)
1 parent c896d95 commit eac0bea

File tree

2 files changed

+55
-4
lines changed

2 files changed

+55
-4
lines changed

lib/replicator.js

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -382,6 +382,7 @@ class Peer {
382382
this.remoteSupportsSeeks = false
383383
this.inflightRange = inflightRange
384384
this.remoteSegmentsWanted = new Set()
385+
this.fullyDownloaded = false
385386

386387
this.paused = false
387388
this.removed = false
@@ -482,6 +483,7 @@ class Peer {
482483
}
483484

484485
signalUpgrade () {
486+
if (!this.replicator.hasFullCore && this.fullyDownloaded) this.fullyDownloaded = false
485487
if (this._shouldUpdateCanUpgrade() === true) this._updateCanUpgradeAndSync()
486488
else this.sendSync()
487489
}
@@ -508,7 +510,7 @@ class Peer {
508510

509511
// Always send the broadcast when we switch from sparse to fully contiguous
510512
if (!this.fullyDownloaded) {
511-
this.replicator.fullyDownloaded = true
513+
this.fullyDownloaded = true
512514
force = true
513515
}
514516
}
@@ -1509,7 +1511,6 @@ module.exports = class Replicator {
15091511
this.destroyed = false
15101512
this.downloading = false
15111513
this.activeSessions = 0
1512-
this.fullyDownloaded = false
15131514

15141515
this.hotswaps = new HotswapQueue()
15151516
this.inflightRange = inflightRange || DEFAULT_MAX_INFLIGHT
@@ -1643,8 +1644,6 @@ module.exports = class Replicator {
16431644
if (!this._blocks.isEmpty() || this._ranges.length !== 0 || this._seeks.length !== 0) {
16441645
this._updateNonPrimary(true)
16451646
}
1646-
1647-
if (!this.hasFullCore && this.fullyDownloaded) this.fullyDownloaded = false
16481647
}
16491648

16501649
// Called externally when a conflict has been detected and verified

test/replicate.js

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2164,6 +2164,58 @@ test('range is broadcast when a core is fully available', async function (t) {
21642164
t.is(writer.replicator.peers[0]._remoteContiguousLength, 7, 'writer detected reader is fully contig')
21652165
})
21662166

2167+
test('range is broadcast when a core is fully available (multiple peers)', async function (t) {
2168+
const writer = await create(t)
2169+
2170+
await writer.append(['a', 'b', 'c'])
2171+
const reader1 = await create(t, writer.key)
2172+
const reader2 = await create(t, writer.key)
2173+
2174+
replicate(writer, reader1, t)
2175+
await reader1.get(0)
2176+
const writerToReader1 = writer.replicator.peers[0]
2177+
const reader1ToWriter = reader1.replicator.peers[0]
2178+
2179+
replicate(writer, reader2, t)
2180+
await reader2.get(0)
2181+
const writerToReader2 = writer.replicator.peers[1]
2182+
const reader2ToWriter = reader1.replicator.peers[0]
2183+
2184+
replicate(reader1, reader2, t)
2185+
// Give time for replication to add the peers
2186+
await new Promise(resolve => setTimeout(resolve, 100))
2187+
2188+
const reader1ToReader2 = reader1.replicator.peers[1]
2189+
const reader2ToReader1 = reader2.replicator.peers[1]
2190+
t.is(reader1ToReader2 !== undefined, true, 'sanity check')
2191+
t.is(reader2ToReader1 !== undefined, true, 'sanity check')
2192+
2193+
await new Promise(resolve => setTimeout(resolve, 100))
2194+
2195+
t.is(writerToReader1._remoteContiguousLength, 0, 'reader1 skipped range update to writer (not contig yet)')
2196+
t.is(writerToReader2._remoteContiguousLength, 0, 'reader2 skipped range update to writer (not contig yet)')
2197+
t.is(reader1ToWriter._remoteContiguousLength, 3, 'writer updated for reader1')
2198+
t.is(reader2ToWriter._remoteContiguousLength, 3, 'writer updated for reader2')
2199+
2200+
await reader1.get(1)
2201+
await reader1.get(2)
2202+
await reader2.get(1)
2203+
await reader2.get(2)
2204+
await new Promise(resolve => setTimeout(resolve, 100))
2205+
2206+
t.is(reader1.contiguousLength, 3, 'sanity check')
2207+
t.is(reader2.contiguousLength, 3, 'sanity check')
2208+
t.is(writerToReader1._remoteContiguousLength, 3, 'reader1 sent range update to writer (became contig)')
2209+
t.is(writerToReader2._remoteContiguousLength, 3, 'reader2 sent range update to writer (became contig)')
2210+
t.is(reader1ToReader2._remoteContiguousLength, 3, 'reader2 broadcast to reader1 too')
2211+
t.is(reader2ToReader1._remoteContiguousLength, 3, 'reader1 broadcast to reader2 too')
2212+
2213+
await writer.append(['d', 'e'])
2214+
await new Promise(resolve => setTimeout(resolve, 100))
2215+
t.is(reader1ToWriter._remoteContiguousLength, 5, 'writer updated for reader1')
2216+
t.is(reader2ToWriter._remoteContiguousLength, 5, 'writer updated for reader2')
2217+
})
2218+
21672219
async function waitForRequestBlock (core) {
21682220
while (true) {
21692221
const reqBlock = core.core.replicator._inflight._requests.find(req => req && req.block)

0 commit comments

Comments
 (0)