Skip to content

Commit dbe35d9

Browse files
committed
Add metric and test
1 parent eb9b8cb commit dbe35d9

File tree

4 files changed

+81
-4
lines changed

4 files changed

+81
-4
lines changed

lib/replicator.js

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -371,7 +371,8 @@ class Peer {
371371
wireWant: { tx: 0, rx: 0 },
372372
wireBitfield: { tx: 0, rx: 0 },
373373
wireRange: { tx: 0, rx: 0 },
374-
wireExtension: { tx: 0, rx: 0 }
374+
wireExtension: { tx: 0, rx: 0 },
375+
hotswaps: 0
375376
}
376377

377378
this.receiverQueue = new ReceiverQueue()
@@ -1443,7 +1444,8 @@ module.exports = class Replicator {
14431444
wireWant: { tx: 0, rx: 0 },
14441445
wireBitfield: { tx: 0, rx: 0 },
14451446
wireRange: { tx: 0, rx: 0 },
1446-
wireExtension: { tx: 0, rx: 0 }
1447+
wireExtension: { tx: 0, rx: 0 },
1448+
hotswaps: 0
14471449
}
14481450

14491451
this._attached = new Set()
@@ -2190,6 +2192,8 @@ module.exports = class Replicator {
21902192

21912193
for (const b of this.hotswaps.pick(peer)) {
21922194
if (peer._requestBlock(b) === false) continue
2195+
peer.stats.hotswaps++
2196+
peer.replicator.stats.hotswaps++
21932197
if (peer.inflight >= maxHotswaps) break
21942198
}
21952199
}

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
},
6767
"devDependencies": {
6868
"brittle": "^3.0.0",
69+
"debugging-stream": "^3.1.0",
6970
"hyperswarm": "^4.3.6",
7071
"rache": "^1.0.0",
7172
"random-access-memory": "^6.1.0",

test/helpers/index.js

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
const Hypercore = require('../../')
22
const RAM = require('random-access-memory')
3+
const DebuggingStream = require('debugging-stream')
34

45
exports.create = async function create (...args) {
56
const core = new Hypercore(RAM, ...args)
@@ -56,6 +57,38 @@ exports.unreplicate = function unreplicate (streams) {
5657
}))
5758
}
5859

60+
exports.replicateDebugStream = function replicate (a, b, t, opts = {}) {
61+
const { latency, speed, jitter } = opts
62+
63+
const s1 = a.replicate(true, { keepAlive: false, ...opts })
64+
const s2Base = b.replicate(false, { keepAlive: false, ...opts })
65+
const s2 = new DebuggingStream(s2Base, { latency, speed, jitter })
66+
67+
s1.on('error', err => t.comment(`replication stream error (initiator): ${err}`))
68+
s2.on('error', err => t.comment(`replication stream error (responder): ${err}`))
69+
70+
if (opts.teardown !== false) {
71+
t.teardown(async function () {
72+
let missing = 2
73+
await new Promise(resolve => {
74+
s1.on('close', onclose)
75+
s1.destroy()
76+
77+
s2.on('close', onclose)
78+
s2.destroy()
79+
80+
function onclose () {
81+
if (--missing === 0) resolve()
82+
}
83+
})
84+
})
85+
}
86+
87+
s1.pipe(s2).pipe(s1)
88+
89+
return [s1, s2]
90+
}
91+
5992
exports.eventFlush = async function eventFlush () {
6093
await new Promise(resolve => setImmediate(resolve))
6194
}

test/replicate.js

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ const test = require('brittle')
22
const b4a = require('b4a')
33
const RAM = require('random-access-memory')
44
const NoiseSecretStream = require('@hyperswarm/secret-stream')
5-
const { create, replicate, unreplicate, eventFlush } = require('./helpers')
5+
const { create, replicate, unreplicate, eventFlush, replicateDebugStream } = require('./helpers')
66
const { makeStreamPair } = require('./helpers/networking.js')
77
const Hypercore = require('../')
88

@@ -51,9 +51,10 @@ test('basic replication stats', async function (t) {
5151
t.is(aStats.wireExtension.tx, 0, 'wireExtension init 0')
5252
t.is(aStats.wireCancel.rx, 0, 'wireCancel init 0')
5353
t.is(aStats.wireCancel.tx, 0, 'wireCancel init 0')
54+
t.is(aStats.hotswaps, 0, 'hotswaps init 0')
5455

5556
const initStatsLength = [...Object.keys(aStats)].length
56-
t.is(initStatsLength, 8, 'Expected amount of stats')
57+
t.is(initStatsLength, 9, 'Expected amount of stats')
5758

5859
replicate(a, b, t)
5960

@@ -1778,6 +1779,44 @@ test('replication count should never go negative', async function (t) {
17781779
}
17791780
})
17801781

1782+
test('uses hotswaps to avoid long download tail', async t => {
1783+
const core = await create()
1784+
const slowCore = await create(core.key)
1785+
1786+
const batch = []
1787+
while (batch.length < 100) {
1788+
batch.push(Buffer.allocUnsafe(60000))
1789+
}
1790+
await core.append(batch)
1791+
1792+
replicate(core, slowCore, t)
1793+
await slowCore.download({ start: 0, end: core.length }).done()
1794+
1795+
t.is(slowCore.contiguousLength, 100, 'sanity check')
1796+
1797+
const peerCore = await create(core.key)
1798+
await peerCore.ready()
1799+
const [fastStream] = replicateDebugStream(core, peerCore, t, { speed: 10_000_000 })
1800+
const [slowStream] = replicateDebugStream(slowCore, peerCore, t, { speed: 1_000_000 })
1801+
const fastKey = fastStream.publicKey
1802+
const slowKey = slowStream.publicKey
1803+
const peerKey = fastStream.remotePublicKey
1804+
t.alike(peerKey, slowStream.remotePublicKey, 'sanity check')
1805+
1806+
await peerCore.download({ start: 0, end: core.length }).done()
1807+
1808+
const fastPeer = peerCore.replicator.peers.filter(
1809+
p => b4a.equals(p.stream.remotePublicKey, fastKey))[0]
1810+
const slowPeer = peerCore.replicator.peers.filter(
1811+
p => b4a.equals(p.stream.remotePublicKey, slowKey))[0]
1812+
1813+
t.ok(fastPeer.stats.hotswaps > 0, 'hotswaps happened for fast peer')
1814+
t.ok(fastPeer.stats.hotswaps > 0, 'No hotswaps happened for slow peer')
1815+
t.ok(slowPeer.stats.wireCancel.tx > 0, 'slow peer cancelled requests')
1816+
t.ok(fastPeer.stats.wireData.rx > slowPeer.stats.wireData.rx, 'sanity check: received more data from fast peer')
1817+
t.ok(slowPeer.stats.wireData.rx > 0, 'sanity check: still received data from slow peer')
1818+
})
1819+
17811820
async function waitForRequestBlock (core) {
17821821
while (true) {
17831822
const reqBlock = core.replicator._inflight._requests.find(req => req && req.block)

0 commit comments

Comments
 (0)