Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: write peer stream messages atomically #484

Merged
merged 5 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .aegir.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
/** @type {import('aegir').PartialOptions} */
export default {
build: {
bundlesizeMax: '85KB'
}
}
3 changes: 0 additions & 3 deletions .mocharc.yaml

This file was deleted.

9,458 changes: 4,292 additions & 5,166 deletions package-lock.json

Large diffs are not rendered by default.

50 changes: 25 additions & 25 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -72,42 +72,41 @@
},
"homepage": "https://github.com/ChainSafe/js-libp2p-gossipsub#readme",
"dependencies": {
"@libp2p/crypto": "^3.0.1",
"@libp2p/interface": "^1.0.1",
"@libp2p/interface-internal": "^1.0.1",
"@libp2p/peer-id": "^4.0.1",
"@libp2p/pubsub": "^9.0.0",
"@multiformats/multiaddr": "^12.1.3",
"@libp2p/crypto": "^4.0.1",
"@libp2p/interface": "^1.1.2",
"@libp2p/interface-internal": "^1.0.7",
"@libp2p/peer-id": "^4.0.5",
"@libp2p/pubsub": "^9.0.8",
"@multiformats/multiaddr": "^12.1.14",
"abortable-iterator": "^5.0.1",
"denque": "^2.1.0",
"it-length-prefixed": "^9.0.1",
"it-length-prefixed": "^9.0.4",
"it-pipe": "^3.0.1",
"it-pushable": "^3.2.0",
"multiformats": "^12.0.1",
"protobufjs": "^7.2.4",
"uint8arraylist": "^2.4.3",
"uint8arrays": "^4.0.4"
"it-pushable": "^3.2.3",
"multiformats": "^13.0.1",
"protobufjs": "^7.2.6",
"uint8arraylist": "^2.4.8",
"uint8arrays": "^5.0.1"
},
"devDependencies": {
"@chainsafe/as-sha256": "^0.2.4",
"@chainsafe/as-sha256": "^0.4.1",
"@dapplion/benchmark": "^0.2.4",
"@libp2p/floodsub": "^9.0.0",
"@libp2p/interface-compliance-tests": "^5.0.2",
"@libp2p/logger": "^4.0.1",
"@libp2p/peer-id-factory": "^4.0.0",
"@libp2p/peer-store": "^10.0.0",
"@types/node": "^17.0.21",
"aegir": "^41.0.0",
"datastore-core": "^9.1.1",
"@libp2p/floodsub": "^9.0.9",
"@libp2p/interface-compliance-tests": "^5.2.0",
"@libp2p/logger": "^4.0.5",
"@libp2p/peer-id-factory": "^4.0.5",
"@libp2p/peer-store": "^10.0.8",
"@types/node": "^20.11.6",
"aegir": "^42.2.2",
"datastore-core": "^9.2.7",
"delay": "^6.0.0",
"mkdirp": "^3.0.1",
"p-defer": "^4.0.0",
"p-event": "^6.0.0",
"p-retry": "^5.1.2",
"p-retry": "^6.2.0",
"p-wait-for": "^5.0.2",
"sinon": "^15.1.2",
"sinon": "^17.0.1",
"time-cache": "^0.3.0",
"ts-node": "^10.7.0",
"ts-sinon": "^2.0.2"
},
"engines": {
Expand Down Expand Up @@ -137,5 +136,6 @@
"Hugo Dias <[email protected]>",
"Franck Royer <[email protected]>",
"ChainSafe <[email protected]>"
]
],
"sideEffects": false
}
3 changes: 1 addition & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ import type {
Logger,
ComponentLogger
} from '@libp2p/interface'
import type { ConnectionManager } from '@libp2p/interface-internal/connection-manager'
import type { IncomingStreamData, Registrar } from '@libp2p/interface-internal/registrar'
import type { ConnectionManager, IncomingStreamData, Registrar } from '@libp2p/interface-internal'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { Uint8ArrayList } from 'uint8arraylist'

Expand Down
18 changes: 5 additions & 13 deletions src/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,19 @@ interface InboundStreamOpts {
}

export class OutboundStream {
private readonly pushable: Pushable<Uint8Array>
private readonly lpPushable: Pushable<Uint8ArrayList>
private readonly pushable: Pushable<Uint8Array | Uint8ArrayList>
private readonly closeController: AbortController
private readonly maxBufferSize: number

constructor (private readonly rawStream: Stream, errCallback: (e: Error) => void, opts: OutboundStreamOpts) {
this.pushable = pushable({ objectMode: false })
this.lpPushable = pushable({ objectMode: false })
this.pushable = pushable()
this.closeController = new AbortController()
this.maxBufferSize = opts.maxBufferSize ?? Infinity

pipe(
abortableSource(this.pushable, this.closeController.signal, { returnOnAbort: true }),
(source) => encode(source),
this.rawStream
).catch(errCallback)

pipe(abortableSource(this.lpPushable, this.closeController.signal, { returnOnAbort: true }), this.rawStream).catch(
errCallback
)
}

get protocol (): string {
Expand All @@ -49,24 +42,23 @@ export class OutboundStream {
throw Error(`OutboundStream buffer full, size > ${this.maxBufferSize}`)
}

this.pushable.push(data)
this.pushable.push(encode.single(data))
}

/**
* Same to push() but this is prefixed data so no need to encode length prefixed again
*/
pushPrefixed (data: Uint8ArrayList): void {
if (this.lpPushable.readableLength > this.maxBufferSize) {
if (this.pushable.readableLength > this.maxBufferSize) {
throw Error(`OutboundStream buffer full, size > ${this.maxBufferSize}`)
}
this.lpPushable.push(data)
this.pushable.push(data)
}

async close (): Promise<void> {
this.closeController.abort()
// similar to pushable.end() but clear the internal buffer
await this.pushable.return()
await this.lpPushable.return()
await this.rawStream.close()
}
}
Expand Down
3 changes: 1 addition & 2 deletions test/accept-from.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ import { stubInterface } from 'ts-sinon'
import { GossipSub } from '../src/index.js'
import { createPeerId } from './utils/index.js'
import { fastMsgIdFn } from './utils/msgId.js'
import type { ConnectionManager } from '@libp2p/interface-internal/connection-manager'
import type { Registrar } from '@libp2p/interface-internal/registrar'
import type { ConnectionManager, Registrar } from '@libp2p/interface-internal'

const peerA = '16Uiu2HAmMkH6ZLen2tbhiuNCTZLLvrZaDgufNdT5MPjtC9Hr9YNA'

Expand Down
3 changes: 1 addition & 2 deletions test/gossip.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ import { GossipsubDhi } from '../src/constants.js'
import { GossipSub } from '../src/index.js'
import { connectAllPubSubNodes, createComponentsArray, type GossipSubAndComponents } from './utils/create-pubsub.js'
import type { PeerStore } from '@libp2p/interface'
import type { ConnectionManager } from '@libp2p/interface-internal/connection-manager'
import type { Registrar } from '@libp2p/interface-internal/registrar'
import type { ConnectionManager, Registrar } from '@libp2p/interface-internal'

describe('gossip', () => {
let nodes: GossipSubAndComponents[]
Expand Down
2 changes: 1 addition & 1 deletion test/utils/create-pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { MemoryDatastore } from 'datastore-core'
import { stubInterface } from 'ts-sinon'
import { GossipSub, type GossipSubComponents, type GossipsubOpts } from '../../src/index.js'
import type { TypedEventTarget, Libp2pEvents, PubSub } from '@libp2p/interface'
import type { ConnectionManager } from '@libp2p/interface-internal/connection-manager'
import type { ConnectionManager } from '@libp2p/interface-internal'

export interface CreateComponentsOpts {
init?: Partial<GossipsubOpts>
Expand Down
5 changes: 2 additions & 3 deletions test/utils/msgId.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import SHA256 from '@chainsafe/as-sha256'
import { digest } from '@chainsafe/as-sha256'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { messageIdToString } from '../../src/utils/messageIdToString.js'
import type { RPC } from '../../src/message/rpc.js'
Expand All @@ -16,5 +16,4 @@ export const getMsgIdStr = (msg: RPC.IMessage): string => messageIdToString(getM

export const fastMsgIdFn = (msg: RPC.IMessage): string =>
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-expect-error @chainsafe/as-sha256 types are wrong
msg.data != null ? messageIdToString(SHA256.default.digest(msg.data)) : '0'
msg.data != null ? messageIdToString(digest(msg.data)) : '0'
Loading