Skip to content

Commit 38c2811

Browse files
committed
fix: if we do not send a message to a peer, remove them from the recipients list
1 parent 15fbfcb commit 38c2811

File tree

3 files changed

+17
-11
lines changed

3 files changed

+17
-11
lines changed

test/go-gossipsub.spec.ts

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,6 @@ describe('go-libp2p-pubsub gossipsub tests', function () {
269269
sendRecv.push(results)
270270
}
271271
await Promise.all(sendRecv)
272-
273272
})
274273

275274
it('test gossipsub fanout maintenance', async function () {
@@ -332,7 +331,6 @@ describe('go-libp2p-pubsub gossipsub tests', function () {
332331
sendRecv = []
333332
await sendMessages(2)
334333
await Promise.all(sendRecv)
335-
336334
})
337335

338336
it('test gossipsub fanout expiry', async function () {
@@ -381,8 +379,6 @@ describe('go-libp2p-pubsub gossipsub tests', function () {
381379
await pWaitFor(async () => {
382380
return (psubs[0].getPubSub() as GossipSub).fanout.size === 0
383381
})
384-
385-
386382
})
387383

388384
it('test gossipsub gossip', async function () {
@@ -421,7 +417,6 @@ describe('go-libp2p-pubsub gossipsub tests', function () {
421417
}
422418
// and wait for some gossip flushing
423419
await Promise.all(psubs.map(async (ps) => await awaitEvents(ps.getPubSub(), 'gossipsub:heartbeat', 2)))
424-
425420
})
426421

427422
it('test gossipsub gossip propagation', async function () {

test/gossip.spec.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,9 @@ describe('gossip', () => {
111111
// should have peerB as a subscriber to the topic
112112
expect(nodeA.getPubSub().getSubscribers(topic).map(p => p.toString())).to.include(peerB, 'did not know about peerB\'s subscription to topic')
113113

114+
// should be able to send them messages
115+
expect((nodeA.getPubSub() as GossipSub).peers.get(peerB)).to.have.property('isWritable', true, 'nodeA did not have connection open to peerB')
116+
114117
// set spy. NOTE: Forcing private property to be public
115118
const nodeASpy = sinon.spy(nodeA.getPubSub() as GossipSub, 'piggybackControl')
116119

@@ -121,7 +124,7 @@ describe('gossip', () => {
121124
const publishResult = await nodeA.getPubSub().publish(topic, uint8ArrayFromString('hey'))
122125

123126
// should have sent message to peerB
124-
expect(publishResult.recipients.map(p => p.toString())).to.include(peerB, 'did not sent pubsub message to peerB')
127+
expect(publishResult.recipients.map(p => p.toString())).to.include(peerB, 'did not send pubsub message to peerB')
125128

126129
expect(nodeASpy.callCount).to.be.equal(1)
127130
// expect control message to be sent alongside published message

ts/index.ts

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1875,10 +1875,16 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
18751875

18761876
// Send to set of peers aggregated from direct, mesh, fanout
18771877
const rpc = createGossipRpc([rawMsg])
1878-
tosend.forEach((id) => {
1878+
1879+
for (const id of tosend) {
18791880
// self.send_message(*peer_id, event.clone())?;
1880-
this.sendRpc(id, rpc)
1881-
})
1881+
const sent = this.sendRpc(id, rpc)
1882+
1883+
// did not actually send the message
1884+
if (!sent) {
1885+
tosend.delete(id)
1886+
}
1887+
}
18821888

18831889
this.metrics?.onPublishMsg(topic, tosendCount, tosend.size, rawMsg.data != null ? rawMsg.data.length : 0)
18841890

@@ -1990,11 +1996,11 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
19901996
/**
19911997
* Send an rpc object to a peer
19921998
*/
1993-
private sendRpc(id: PeerIdStr, rpc: RPC): void {
1999+
private sendRpc(id: PeerIdStr, rpc: RPC): boolean {
19942000
const peerStreams = this.peers.get(id)
19952001
if (!peerStreams || !peerStreams.isWritable) {
19962002
this.log(`Cannot send RPC to ${id} as there is no open stream to it available`)
1997-
return
2003+
return false
19982004
}
19992005

20002006
// piggyback control message retries
@@ -2015,6 +2021,8 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
20152021
peerStreams.write(rpcBytes)
20162022

20172023
this.metrics?.onRpcSent(rpc, rpcBytes.length)
2024+
2025+
return true
20182026
}
20192027

20202028
public piggybackControl(id: PeerIdStr, outRpc: RPC, ctrl: RPC.ControlMessage): void {

0 commit comments

Comments
 (0)