diff --git a/CHANGELOG.md b/CHANGELOG.md index 3f98ac052..da94b7837 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ The following emojis are used to highlight certain changes: - `gateway` Support for custom DNSLink / DoH resolvers on `localhost` to simplify integration with non-ICANN DNS systems [#645](https://github.com/ipfs/boxo/pull/645) ### Changed +- Do not send CANCEL to peer that block was received from, as this is redundant. [#784](https://github.com/ipfs/boxo/pull/784) - `gateway` The default DNSLink resolver for `.eth` TLD changed to `https://dns.eth.limo/dns-query` [#781](https://github.com/ipfs/boxo/pull/781) - `gateway` The default DNSLink resolver for `.crypto` TLD changed to `https://resolver.unstoppable.io/dns-query` [#782](https://github.com/ipfs/boxo/pull/782) diff --git a/bitswap/client/internal/peermanager/peermanager.go b/bitswap/client/internal/peermanager/peermanager.go index 4634ff164..cd0069bea 100644 --- a/bitswap/client/internal/peermanager/peermanager.go +++ b/bitswap/client/internal/peermanager/peermanager.go @@ -156,12 +156,12 @@ func (pm *PeerManager) SendWants(ctx context.Context, p peer.ID, wantBlocks []ci // SendCancels sends cancels for the given keys to all peers who had previously // received a want for those keys. -func (pm *PeerManager) SendCancels(ctx context.Context, cancelKs []cid.Cid) { +func (pm *PeerManager) SendCancels(ctx context.Context, cancelKs []cid.Cid, excludePeer peer.ID) { pm.pqLk.Lock() defer pm.pqLk.Unlock() // Send a CANCEL to each peer that has been sent a want-block or want-have - pm.pwm.sendCancels(cancelKs) + pm.pwm.sendCancels(cancelKs, excludePeer) } // CurrentWants returns the list of pending wants (both want-haves and want-blocks). diff --git a/bitswap/client/internal/peermanager/peermanager_test.go b/bitswap/client/internal/peermanager/peermanager_test.go index b778c46e3..61226acda 100644 --- a/bitswap/client/internal/peermanager/peermanager_test.go +++ b/bitswap/client/internal/peermanager/peermanager_test.go @@ -239,7 +239,7 @@ func TestSendCancels(t *testing.T) { collectMessages(msgs, 2*time.Millisecond) // Send cancels for 1 want-block and 1 want-have - peerManager.SendCancels(ctx, []cid.Cid{cids[0], cids[2]}) + peerManager.SendCancels(ctx, []cid.Cid{cids[0], cids[2]}, "") collected := collectMessages(msgs, 2*time.Millisecond) if _, ok := collected[peer2]; ok { @@ -250,7 +250,7 @@ func TestSendCancels(t *testing.T) { } // Send cancels for all cids - peerManager.SendCancels(ctx, cids) + peerManager.SendCancels(ctx, cids, "") collected = collectMessages(msgs, 2*time.Millisecond) if _, ok := collected[peer2]; ok { @@ -261,6 +261,49 @@ func TestSendCancels(t *testing.T) { } } +func TestSendCancelsExclude(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + msgs := make(chan msg, 16) + peerQueueFactory := makePeerQueueFactory(msgs) + tp := random.Peers(3) + self, peer1, peer2 := tp[0], tp[1], tp[2] + peerManager := New(ctx, peerQueueFactory, self) + cids := random.Cids(4) + + // Connect to peer1 and peer2 + peerManager.Connected(peer1) + peerManager.Connected(peer2) + + // Send 2 want-blocks and 1 want-have to peer1 + peerManager.SendWants(ctx, peer1, []cid.Cid{cids[0], cids[1]}, []cid.Cid{cids[2]}) + + // Clear messages + collectMessages(msgs, 2*time.Millisecond) + + // Send cancels for 1 want-block and 1 want-have + peerManager.SendCancels(ctx, []cid.Cid{cids[0], cids[2]}, peer1) + collected := collectMessages(msgs, 2*time.Millisecond) + + if _, ok := collected[peer2]; ok { + t.Fatal("Expected no cancels to be sent to peer that was not sent messages") + } + if len(collected[peer1].cancels) != 0 { + t.Fatal("Expected no cancels to be sent to excluded peer") + } + + // Send cancels for all cids + peerManager.SendCancels(ctx, cids, "") + collected = collectMessages(msgs, 2*time.Millisecond) + + if _, ok := collected[peer2]; ok { + t.Fatal("Expected no cancels to be sent to peer that was not sent messages") + } + if len(collected[peer1].cancels) != 3 { + t.Fatal("Expected cancel to be sent for want-blocks") + } +} + func (s *sess) ID() uint64 { return s.id } @@ -376,7 +419,7 @@ func BenchmarkPeerManager(b *testing.B) { limit := len(wanted) / 10 cancel := wanted[:limit] wanted = wanted[limit:] - peerManager.SendCancels(ctx, cancel) + peerManager.SendCancels(ctx, cancel, "") } } } diff --git a/bitswap/client/internal/peermanager/peerwantmanager.go b/bitswap/client/internal/peermanager/peerwantmanager.go index e9fdfbb46..765566155 100644 --- a/bitswap/client/internal/peermanager/peerwantmanager.go +++ b/bitswap/client/internal/peermanager/peerwantmanager.go @@ -233,7 +233,7 @@ func (pwm *peerWantManager) sendWants(p peer.ID, wantBlocks []cid.Cid, wantHaves // sendCancels sends a cancel to each peer to which a corresponding want was // sent -func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) { +func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid, excludePeer peer.ID) { if len(cancelKs) == 0 { return } @@ -298,6 +298,7 @@ func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) { cancelPeers[p] = struct{}{} } } + delete(cancelPeers, excludePeer) for p := range cancelPeers { pws, ok := pwm.peerWants[p] if !ok { diff --git a/bitswap/client/internal/peermanager/peerwantmanager_test.go b/bitswap/client/internal/peermanager/peerwantmanager_test.go index bfe0c626d..ff9b4ebad 100644 --- a/bitswap/client/internal/peermanager/peerwantmanager_test.go +++ b/bitswap/client/internal/peermanager/peerwantmanager_test.go @@ -245,7 +245,7 @@ func TestPWMSendCancels(t *testing.T) { // Cancel 1 want-block and 1 want-have that were sent to p0 clearSent(peerQueues) - pwm.sendCancels([]cid.Cid{wb1[0], wh1[0]}) + pwm.sendCancels([]cid.Cid{wb1[0], wh1[0]}, "") // Should cancel the want-block and want-have require.Empty(t, pq1.cancels, "Expected no cancels sent to p1") require.ElementsMatch(t, pq0.cancels, []cid.Cid{wb1[0], wh1[0]}, "Expected 2 cids to be cancelled") @@ -255,7 +255,7 @@ func TestPWMSendCancels(t *testing.T) { // Cancel everything clearSent(peerQueues) allCids := append(allwb, allwh...) - pwm.sendCancels(allCids) + pwm.sendCancels(allCids, "") // Should cancel the remaining want-blocks and want-haves for p0 require.ElementsMatch(t, pq0.cancels, []cid.Cid{wb1[1], wh1[1]}, "Expected un-cancelled cids to be cancelled") @@ -312,7 +312,7 @@ func TestStats(t *testing.T) { // Cancel 1 want-block that was sent to p0 // and 1 want-block that was not sent cids5 := random.Cids(1) - pwm.sendCancels(append(cids5, cids[0])) + pwm.sendCancels(append(cids5, cids[0]), "") require.Equal(t, 7, g.count, "Expected 7 wants") require.Equal(t, 3, wbg.count, "Expected 3 want-blocks") @@ -332,7 +332,7 @@ func TestStats(t *testing.T) { require.Zero(t, wbg.count, "Expected 0 want-blocks") // Cancel one remaining broadcast want-have - pwm.sendCancels(cids2[:1]) + pwm.sendCancels(cids2[:1], "") require.Equal(t, 2, g.count, "Expected 2 wants") require.Zero(t, wbg.count, "Expected 0 want-blocks") } @@ -362,7 +362,7 @@ func TestStatsOverlappingWantBlockWantHave(t *testing.T) { require.Equal(t, 4, wbg.count, "Expected 4 want-blocks") // Cancel 1 of each group of cids - pwm.sendCancels([]cid.Cid{cids[0], cids2[0]}) + pwm.sendCancels([]cid.Cid{cids[0], cids2[0]}, "") require.Equal(t, 2, g.count, "Expected 2 wants") require.Equal(t, 2, wbg.count, "Expected 2 want-blocks") diff --git a/bitswap/client/internal/session/session.go b/bitswap/client/internal/session/session.go index b19763f3c..a64b39223 100644 --- a/bitswap/client/internal/session/session.go +++ b/bitswap/client/internal/session/session.go @@ -43,7 +43,7 @@ type PeerManager interface { // session discovery) BroadcastWantHaves(context.Context, []cid.Cid) // SendCancels tells the PeerManager to send cancels to all peers - SendCancels(context.Context, []cid.Cid) + SendCancels(context.Context, []cid.Cid, peer.ID) } // SessionManager manages all the sessions diff --git a/bitswap/client/internal/session/session_test.go b/bitswap/client/internal/session/session_test.go index c0d26a91d..5b33b351a 100644 --- a/bitswap/client/internal/session/session_test.go +++ b/bitswap/client/internal/session/session_test.go @@ -151,7 +151,7 @@ func (pm *fakePeerManager) BroadcastWantHaves(ctx context.Context, cids []cid.Ci case <-ctx.Done(): } } -func (pm *fakePeerManager) SendCancels(ctx context.Context, cancels []cid.Cid) {} +func (pm *fakePeerManager) SendCancels(ctx context.Context, cancels []cid.Cid, excludePeer peer.ID) {} func TestSessionGetBlocks(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) diff --git a/bitswap/client/internal/session/sessionwantsender_test.go b/bitswap/client/internal/session/sessionwantsender_test.go index e5589dd58..a43c8af96 100644 --- a/bitswap/client/internal/session/sessionwantsender_test.go +++ b/bitswap/client/internal/session/sessionwantsender_test.go @@ -78,9 +78,9 @@ func (pm *mockPeerManager) has(p peer.ID, sid uint64) bool { return false } -func (*mockPeerManager) UnregisterSession(uint64) {} -func (*mockPeerManager) BroadcastWantHaves(context.Context, []cid.Cid) {} -func (*mockPeerManager) SendCancels(context.Context, []cid.Cid) {} +func (*mockPeerManager) UnregisterSession(uint64) {} +func (*mockPeerManager) BroadcastWantHaves(context.Context, []cid.Cid) {} +func (*mockPeerManager) SendCancels(context.Context, []cid.Cid, peer.ID) {} func (pm *mockPeerManager) SendWants(ctx context.Context, p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) bool { pm.lk.Lock() diff --git a/bitswap/client/internal/sessionmanager/sessionmanager.go b/bitswap/client/internal/sessionmanager/sessionmanager.go index 0d2b24330..179877acc 100644 --- a/bitswap/client/internal/sessionmanager/sessionmanager.go +++ b/bitswap/client/internal/sessionmanager/sessionmanager.go @@ -173,7 +173,7 @@ func (sm *SessionManager) ReceiveFrom(ctx context.Context, p peer.ID, blks []cid } // Send CANCEL to all peers with want-have / want-block - sm.peerManager.SendCancels(ctx, blks) + sm.peerManager.SendCancels(ctx, blks, p) } // CancelSessionWants is called when a session cancels wants because a call to @@ -193,5 +193,5 @@ func (sm *SessionManager) cancelWants(wants []cid.Cid) { // Send CANCEL to all peers for blocks that no session is interested in // anymore. // Note: use bitswap context because session context may already be Done. - sm.peerManager.SendCancels(sm.ctx, wants) + sm.peerManager.SendCancels(sm.ctx, wants, "") } diff --git a/bitswap/client/internal/sessionmanager/sessionmanager_test.go b/bitswap/client/internal/sessionmanager/sessionmanager_test.go index bad26ad90..bb9ad4755 100644 --- a/bitswap/client/internal/sessionmanager/sessionmanager_test.go +++ b/bitswap/client/internal/sessionmanager/sessionmanager_test.go @@ -70,7 +70,7 @@ func (*fakePeerManager) RegisterSession(peer.ID, bspm.Session) func (*fakePeerManager) UnregisterSession(uint64) {} func (*fakePeerManager) SendWants(context.Context, peer.ID, []cid.Cid, []cid.Cid) bool { return true } func (*fakePeerManager) BroadcastWantHaves(context.Context, []cid.Cid) {} -func (fpm *fakePeerManager) SendCancels(ctx context.Context, cancels []cid.Cid) { +func (fpm *fakePeerManager) SendCancels(ctx context.Context, cancels []cid.Cid, excludePeer peer.ID) { fpm.lk.Lock() defer fpm.lk.Unlock() fpm.cancels = append(fpm.cancels, cancels...)