Skip to content

Commit a7e5e50

Browse files
committed
more stuff in core
1 parent 047da5f commit a7e5e50

File tree

6 files changed

+111
-231
lines changed

6 files changed

+111
-231
lines changed

index.js

Lines changed: 1 addition & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -547,112 +547,6 @@ class Hypercore extends EventEmitter {
547547
}
548548
}
549549

550-
async _oncoreconflict (proof, from) {
551-
await this.core.replicator.onconflict(from)
552-
553-
for (const s of this.sessions) s.emit('conflict', proof.upgrade.length, proof.fork, proof)
554-
555-
const err = new Error('Two conflicting signatures exist for length ' + proof.upgrade.length)
556-
await this._closeAllSessions(err)
557-
}
558-
559-
async _closeAllSessions (err) {
560-
// this.sessions modifies itself when a session closes
561-
// This way we ensure we indeed iterate over all sessions
562-
const sessions = [...this.sessions]
563-
564-
const all = []
565-
for (const s of sessions) all.push(s.close({ error: err, force: false })) // force false or else infinite recursion
566-
await Promise.allSettled(all)
567-
}
568-
569-
_oncoreupdate ({ status, bitfield, value, from }) {
570-
if (status !== 0) {
571-
const truncatedNonSparse = (status & 0b1000) !== 0
572-
const appendedNonSparse = (status & 0b0100) !== 0
573-
const truncated = (status & 0b0010) !== 0
574-
const appended = (status & 0b0001) !== 0
575-
576-
if (truncated) {
577-
this.core.replicator.ontruncate(bitfield.start, bitfield.length)
578-
}
579-
580-
if ((status & 0b10011) !== 0) {
581-
this.core.replicator.onupgrade()
582-
}
583-
584-
if (status & 0b10000) {
585-
for (let i = 0; i < this.sessions.length; i++) {
586-
const s = this.sessions[i]
587-
588-
if (s.encryption && s.encryption.compat !== this.core.compat) {
589-
s.encryption = new BlockEncryption(s.encryption.key, this.key, { compat: this.core.compat, isBlockKey: s.encryption.isBlockKey })
590-
}
591-
}
592-
593-
for (let i = 0; i < this.sessions.length; i++) {
594-
this.sessions[i].emit('manifest')
595-
}
596-
}
597-
598-
for (let i = 0; i < this.sessions.length; i++) {
599-
const s = this.sessions[i]
600-
601-
if (truncated) {
602-
// If snapshotted, make sure to update our compat so we can fail gets
603-
if (s._snapshot && bitfield.start < s._snapshot.compatLength) s._snapshot.compatLength = bitfield.start
604-
}
605-
606-
if (s.sparse ? truncated : truncatedNonSparse) {
607-
s.emit('truncate', bitfield.start, this.core.tree.fork)
608-
}
609-
610-
// For sparse sessions, immediately emit appends. If non-sparse, emit if contig length has updated
611-
if (s.sparse ? appended : appendedNonSparse) {
612-
s.emit('append')
613-
}
614-
}
615-
616-
const contig = this.core.header.hints.contiguousLength
617-
618-
// When the contig length catches up, broadcast the non-sparse length to peers
619-
if (appendedNonSparse && contig === this.core.tree.length) {
620-
for (const peer of this.peers) {
621-
if (peer.broadcastedNonSparse) continue
622-
623-
peer.broadcastRange(0, contig)
624-
peer.broadcastedNonSparse = true
625-
}
626-
}
627-
}
628-
629-
if (bitfield) {
630-
this.core.replicator.onhave(bitfield.start, bitfield.length, bitfield.drop)
631-
}
632-
633-
if (value) {
634-
const byteLength = value.byteLength - this.padding
635-
636-
for (let i = 0; i < this.sessions.length; i++) {
637-
this.sessions[i].emit('download', bitfield.start, byteLength, from)
638-
}
639-
}
640-
}
641-
642-
_onpeerupdate (added, peer) {
643-
const name = added ? 'peer-add' : 'peer-remove'
644-
645-
for (let i = 0; i < this.sessions.length; i++) {
646-
this.sessions[i].emit(name, peer)
647-
648-
if (added) {
649-
for (const ext of this.sessions[i].extensions.values()) {
650-
peer.extensions.set(ext.name, ext)
651-
}
652-
}
653-
}
654-
}
655-
656550
async setUserData (key, value, { flush = false } = {}) {
657551
if (this.opened === false) await this.opening
658552
await this.state.setUserData(key, value)
@@ -1088,17 +982,14 @@ function initOnce (session, storage, key, opts) {
1088982
keyPair: opts.keyPair,
1089983
legacy: opts.legacy,
1090984
manifest: opts.manifest,
1091-
globalCache: opts.globalCache || null, // session is a temp option, not to be relied on unless you know what you are doing (no semver guarantees)
1092-
onupdate: session._oncoreupdate.bind(session),
1093-
onconflict: session._oncoreconflict.bind(session)
985+
globalCache: opts.globalCache || null // session is a temp option, not to be relied on unless you know what you are doing (no semver guarantees)
1094986
})
1095987

1096988
session.core.replicator = new Replicator(session.core, {
1097989
eagerUpgrade: true,
1098990
notDownloadingLinger: opts.notDownloadingLinger,
1099991
allowFork: opts.allowFork !== false,
1100992
inflightRange: opts.inflightRange,
1101-
onpeerupdate: session._onpeerupdate.bind(session),
1102993
onupload: session._onupload.bind(session),
1103994
oninvalid: session._oninvalid.bind(session)
1104995
})

lib/copy-prologue.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ async function flushBatch (prologue, src, dst, batch) {
179179
function signalReplicator (core, upgraded, start, length) {
180180
const status = upgraded ? 0b0011 : 0b0010
181181
const bitfield = { drop: false, start, length }
182-
core.onupdate({ status, bitfield, value: null, from: null })
182+
core._onupdate({ status, bitfield, value: null, from: null })
183183
}
184184

185185
function prologueToTree (prologue) {

lib/core.js

Lines changed: 83 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ const { BAD_ARGUMENT, STORAGE_EMPTY, STORAGE_CONFLICT, INVALID_OPERATION, INVALI
1515
const Verifier = require('./verifier')
1616
const audit = require('./audit')
1717
const copyPrologue = require('./copy-prologue')
18+
const BlockEncryption = require('./block-encryption')
1819

1920
const HEAD = Symbol.for('head')
2021
const CORE = Symbol.for('core')
@@ -406,9 +407,6 @@ module.exports = class Core {
406407
this.discoveryKey = opts.discoveryKey || (opts.key && crypto.discoveryKey(opts.key)) || null
407408
this.manifest = null
408409

409-
this.onupdate = opts.onupdate || noop
410-
this.onconflict = opts.onconflict || noop
411-
412410
this.preupdate = null
413411
this.header = null
414412
this.compat = false
@@ -743,7 +741,7 @@ module.exports = class Core {
743741
}
744742

745743
case CORE: { // core
746-
this.onupdate(update)
744+
this._onupdate(update)
747745
break
748746
}
749747

@@ -1000,7 +998,7 @@ module.exports = class Core {
1000998

1001999
if (b4a.equals(localTreeHash, remoteTreeHash)) return false
10021000

1003-
await this.onconflict(proof)
1001+
await this._onconflict(proof)
10041002
return true
10051003
}
10061004

@@ -1088,6 +1086,86 @@ module.exports = class Core {
10881086
return this._closing
10891087
}
10901088

1089+
// session management - should be moved to some session manager next
1090+
_onupdate ({ status, bitfield, value, from }) {
1091+
if (this.sessions.length === 0 || this.replicator === null) return
1092+
1093+
if (status !== 0) {
1094+
const truncated = (status & 0b0010) !== 0
1095+
const appended = (status & 0b0001) !== 0
1096+
1097+
if (truncated) {
1098+
this.replicator.ontruncate(bitfield.start, bitfield.length)
1099+
}
1100+
1101+
if ((status & 0b10011) !== 0) {
1102+
this.replicator.onupgrade()
1103+
}
1104+
1105+
if (status & 0b10000) {
1106+
for (let i = 0; i < this.sessions.length; i++) {
1107+
const s = this.sessions[i]
1108+
1109+
if (s.encryption && s.encryption.compat !== this.compat) {
1110+
s.encryption = this.encryption = new BlockEncryption(s.encryption.key, this.key, { compat: this.compat, isBlockKey: s.encryption.isBlockKey })
1111+
}
1112+
}
1113+
1114+
for (let i = 0; i < this.sessions.length; i++) {
1115+
this.sessions[i].emit('manifest')
1116+
}
1117+
}
1118+
1119+
for (let i = 0; i < this.sessions.length; i++) {
1120+
const s = this.sessions[i]
1121+
1122+
if (truncated) {
1123+
// If snapshotted, make sure to update our compat so we can fail gets
1124+
if (s._snapshot && bitfield.start < s._snapshot.compatLength) s._snapshot.compatLength = bitfield.start
1125+
}
1126+
1127+
if (truncated) {
1128+
s.emit('truncate', bitfield.start, this.tree.fork)
1129+
}
1130+
1131+
if (appended) {
1132+
s.emit('append')
1133+
}
1134+
}
1135+
}
1136+
1137+
if (bitfield) {
1138+
this.replicator.onhave(bitfield.start, bitfield.length, bitfield.drop)
1139+
}
1140+
1141+
if (value) {
1142+
const byteLength = value.byteLength - this.padding
1143+
1144+
for (let i = 0; i < this.sessions.length; i++) {
1145+
this.sessions[i].emit('download', bitfield.start, byteLength, from)
1146+
}
1147+
}
1148+
}
1149+
1150+
async _onconflict (proof, from) {
1151+
await this.replicator.onconflict(from)
1152+
1153+
for (const s of this.sessions) s.emit('conflict', proof.upgrade.length, proof.fork, proof)
1154+
1155+
const err = new Error('Two conflicting signatures exist for length ' + proof.upgrade.length)
1156+
await this._closeAllSessions(err)
1157+
}
1158+
1159+
async _closeAllSessions (err) {
1160+
// this.sessions modifies itself when a session closes
1161+
// This way we ensure we indeed iterate over all sessions
1162+
const sessions = [...this.sessions]
1163+
1164+
const all = []
1165+
for (const s of sessions) all.push(s.close({ error: err, force: false })) // force false or else infinite recursion
1166+
await Promise.allSettled(all)
1167+
}
1168+
10911169
async _close () {
10921170
if (this.replicator) await this.replicator.destroy()
10931171
this.closed = true

lib/replicator.js

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1432,14 +1432,12 @@ module.exports = class Replicator {
14321432
eagerUpgrade = true,
14331433
allowFork = true,
14341434
inflightRange = null,
1435-
onpeerupdate = noop,
14361435
onupload = noop,
14371436
oninvalid = noop
14381437
} = {}) {
14391438
this.core = core
14401439
this.eagerUpgrade = eagerUpgrade
14411440
this.allowFork = allowFork
1442-
this.onpeerupdate = onpeerupdate
14431441
this.onupload = onupload
14441442
this.oninvalid = oninvalid
14451443
this.ondownloading = null // optional external hook for monitoring downloading status
@@ -1786,7 +1784,7 @@ module.exports = class Replicator {
17861784
this._hadPeers = true
17871785
this.peers.push(peer)
17881786
this.updatePeer(peer)
1789-
this.onpeerupdate(true, peer)
1787+
this._onpeerupdate(true, peer)
17901788
}
17911789

17921790
_requestDone (id, roundtrip) {
@@ -1808,7 +1806,7 @@ module.exports = class Replicator {
18081806

18091807
if (peer.useSession) this._closeSessionMaybe()
18101808

1811-
this.onpeerupdate(false, peer)
1809+
this._onpeerupdate(false, peer)
18121810
this.updateAll()
18131811
}
18141812

@@ -2449,6 +2447,21 @@ module.exports = class Replicator {
24492447
return false
24502448
}
24512449
}
2450+
2451+
_onpeerupdate (added, peer) {
2452+
const name = added ? 'peer-add' : 'peer-remove'
2453+
const sessions = this.core.sessions
2454+
2455+
for (let i = 0; i < sessions.length; i++) {
2456+
sessions[i].emit(name, peer)
2457+
2458+
if (added) {
2459+
for (const ext of sessions[i].extensions.values()) {
2460+
peer.extensions.set(ext.name, ext)
2461+
}
2462+
}
2463+
}
2464+
}
24522465
}
24532466

24542467
function matchingRequest (req, data) {

0 commit comments

Comments
 (0)