From 3ae79f9f2e0439feb1c9dc32e4576f87bd1b2551 Mon Sep 17 00:00:00 2001
From: gammazero <11790789+gammazero@users.noreply.github.com>
Date: Wed, 8 Jan 2025 09:06:45 -1000
Subject: [PATCH] do not send CANCEL to peer we got block from

The serving peer removes the block from the client's wantlist after
serving it, so sending a CANCEL to that peer is redundant. Exclude the
serving peer when sending cancels after receiving a block.

Closes #694
---
 .../internal/peermanager/peermanager.go       |  4 +-
 .../internal/peermanager/peermanager_test.go  | 49 +++++++++++++++++--
 .../internal/peermanager/peerwantmanager.go   |  3 +-
 .../peermanager/peerwantmanager_test.go       | 10 ++--
 bitswap/client/internal/session/session.go    |  2 +-
 .../client/internal/session/session_test.go   |  2 +-
 .../session/sessionwantsender_test.go         |  6 +--
 .../internal/sessionmanager/sessionmanager.go |  4 +-
 .../sessionmanager/sessionmanager_test.go     |  2 +-
 9 files changed, 63 insertions(+), 19 deletions(-)

diff --git a/bitswap/client/internal/peermanager/peermanager.go b/bitswap/client/internal/peermanager/peermanager.go
index 4634ff164..cd0069bea 100644
--- a/bitswap/client/internal/peermanager/peermanager.go
+++ b/bitswap/client/internal/peermanager/peermanager.go
@@ -156,12 +156,12 @@ func (pm *PeerManager) SendWants(ctx context.Context, p peer.ID, wantBlocks []ci
 
 // SendCancels sends cancels for the given keys to all peers who had previously
 // received a want for those keys.
-func (pm *PeerManager) SendCancels(ctx context.Context, cancelKs []cid.Cid) {
+func (pm *PeerManager) SendCancels(ctx context.Context, cancelKs []cid.Cid, excludePeer peer.ID) {
 	pm.pqLk.Lock()
 	defer pm.pqLk.Unlock()
 
 	// Send a CANCEL to each peer that has been sent a want-block or want-have
-	pm.pwm.sendCancels(cancelKs)
+	pm.pwm.sendCancels(cancelKs, excludePeer)
 }
 
 // CurrentWants returns the list of pending wants (both want-haves and want-blocks).
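
Note (not part of the patch): a minimal sketch of how the new SendCancels signature is meant to be called. The interface and function names below (cancelSender, onBlockReceived, cancelAll) are illustrative assumptions, not code from this change; only the SendCancels signature and the zero-value peer.ID ("") convention come from the diff above.

package example

import (
	"context"

	"github.com/ipfs/go-cid"
	peer "github.com/libp2p/go-libp2p/core/peer"
)

// cancelSender captures just the SendCancels method changed above.
type cancelSender interface {
	SendCancels(ctx context.Context, cancelKs []cid.Cid, excludePeer peer.ID)
}

// onBlockReceived: the block came from `from`, which has already removed the
// want from its ledger, so `from` is excluded from the CANCEL fan-out.
func onBlockReceived(ctx context.Context, pm cancelSender, from peer.ID, ks []cid.Cid) {
	pm.SendCancels(ctx, ks, from)
}

// cancelAll: no serving peer to exclude, so the zero-value peer.ID ("") is
// passed and every peer that was sent a want receives a CANCEL.
func cancelAll(ctx context.Context, pm cancelSender, ks []cid.Cid) {
	pm.SendCancels(ctx, ks, "")
}

Passing the zero-value peer.ID keeps the old cancel-everyone behaviour, which is why existing call sites below only gain an extra "" argument.
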
diff --git a/bitswap/client/internal/peermanager/peermanager_test.go b/bitswap/client/internal/peermanager/peermanager_test.go
index b778c46e3..61226acda 100644
--- a/bitswap/client/internal/peermanager/peermanager_test.go
+++ b/bitswap/client/internal/peermanager/peermanager_test.go
@@ -239,7 +239,7 @@ func TestSendCancels(t *testing.T) {
 	collectMessages(msgs, 2*time.Millisecond)
 
 	// Send cancels for 1 want-block and 1 want-have
-	peerManager.SendCancels(ctx, []cid.Cid{cids[0], cids[2]})
+	peerManager.SendCancels(ctx, []cid.Cid{cids[0], cids[2]}, "")
 	collected := collectMessages(msgs, 2*time.Millisecond)
 
 	if _, ok := collected[peer2]; ok {
@@ -250,7 +250,7 @@ func TestSendCancels(t *testing.T) {
 	}
 
 	// Send cancels for all cids
-	peerManager.SendCancels(ctx, cids)
+	peerManager.SendCancels(ctx, cids, "")
 	collected = collectMessages(msgs, 2*time.Millisecond)
 
 	if _, ok := collected[peer2]; ok {
@@ -261,6 +261,49 @@ func TestSendCancels(t *testing.T) {
 	}
 }
 
+func TestSendCancelsExclude(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+	defer cancel()
+	msgs := make(chan msg, 16)
+	peerQueueFactory := makePeerQueueFactory(msgs)
+	tp := random.Peers(3)
+	self, peer1, peer2 := tp[0], tp[1], tp[2]
+	peerManager := New(ctx, peerQueueFactory, self)
+	cids := random.Cids(4)
+
+	// Connect to peer1 and peer2
+	peerManager.Connected(peer1)
+	peerManager.Connected(peer2)
+
+	// Send 2 want-blocks and 1 want-have to peer1
+	peerManager.SendWants(ctx, peer1, []cid.Cid{cids[0], cids[1]}, []cid.Cid{cids[2]})
+
+	// Clear messages
+	collectMessages(msgs, 2*time.Millisecond)
+
+	// Send cancels for 1 want-block and 1 want-have
+	peerManager.SendCancels(ctx, []cid.Cid{cids[0], cids[2]}, peer1)
+	collected := collectMessages(msgs, 2*time.Millisecond)
+
+	if _, ok := collected[peer2]; ok {
+		t.Fatal("Expected no cancels to be sent to peer that was not sent messages")
+	}
+	if len(collected[peer1].cancels) != 0 {
+		t.Fatal("Expected no cancels to be sent to excluded peer")
+	}
+
+	// Send cancels for all cids
+	peerManager.SendCancels(ctx, cids, "")
+	collected = collectMessages(msgs, 2*time.Millisecond)
+
+	if _, ok := collected[peer2]; ok {
+		t.Fatal("Expected no cancels to be sent to peer that was not sent messages")
+	}
+	if len(collected[peer1].cancels) != 3 {
+		t.Fatal("Expected cancel to be sent for want-blocks")
+	}
+}
+
 func (s *sess) ID() uint64 {
 	return s.id
 }
@@ -376,7 +419,7 @@ func BenchmarkPeerManager(b *testing.B) {
 			limit := len(wanted) / 10
 			cancel := wanted[:limit]
 			wanted = wanted[limit:]
-			peerManager.SendCancels(ctx, cancel)
+			peerManager.SendCancels(ctx, cancel, "")
 		}
 	}
 }
diff --git a/bitswap/client/internal/peermanager/peerwantmanager.go b/bitswap/client/internal/peermanager/peerwantmanager.go
index e9fdfbb46..765566155 100644
--- a/bitswap/client/internal/peermanager/peerwantmanager.go
+++ b/bitswap/client/internal/peermanager/peerwantmanager.go
@@ -233,7 +233,7 @@ func (pwm *peerWantManager) sendWants(p peer.ID, wantBlocks []cid.Cid, wantHaves
 
 // sendCancels sends a cancel to each peer to which a corresponding want was
 // sent
-func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) {
+func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid, excludePeer peer.ID) {
 	if len(cancelKs) == 0 {
 		return
 	}
@@ -298,6 +298,7 @@ func (pwm *peerWantManager) sendCancels(cancelKs []cid.Cid) {
 			cancelPeers[p] = struct{}{}
 		}
 	}
+	delete(cancelPeers, excludePeer)
 	for p := range cancelPeers {
 		pws, ok := pwm.peerWants[p]
 		if !ok {
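
Note (not part of the patch): the exclusion above works because cancelPeers is a set keyed by peer.ID, and deleting a key that is absent, including the zero-value "", is a no-op in Go, so one delete covers both the exclude and no-exclude cases. A self-contained sketch of that pattern, with hypothetical names (buildCancelSet, wantedBy):

package main

import (
	"fmt"

	peer "github.com/libp2p/go-libp2p/core/peer"
)

// buildCancelSet gathers the peers that should receive a CANCEL and then
// drops the excluded peer, mirroring the delete(cancelPeers, excludePeer)
// step in sendCancels above. wantedBy stands in for "peers that were sent a
// want for the cancelled keys".
func buildCancelSet(wantedBy []peer.ID, excludePeer peer.ID) map[peer.ID]struct{} {
	cancelPeers := make(map[peer.ID]struct{}, len(wantedBy))
	for _, p := range wantedBy {
		cancelPeers[p] = struct{}{}
	}
	// Deleting an absent key (including the zero-value "") is a no-op,
	// so callers with nothing to exclude need no special case.
	delete(cancelPeers, excludePeer)
	return cancelPeers
}

func main() {
	peers := []peer.ID{"QmPeerA", "QmPeerB"}
	fmt.Println(len(buildCancelSet(peers, "QmPeerA"))) // 1: QmPeerA excluded
	fmt.Println(len(buildCancelSet(peers, "")))        // 2: nothing excluded
}
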
diff --git a/bitswap/client/internal/peermanager/peerwantmanager_test.go b/bitswap/client/internal/peermanager/peerwantmanager_test.go
index bfe0c626d..ff9b4ebad 100644
--- a/bitswap/client/internal/peermanager/peerwantmanager_test.go
+++ b/bitswap/client/internal/peermanager/peerwantmanager_test.go
@@ -245,7 +245,7 @@ func TestPWMSendCancels(t *testing.T) {
 
 	// Cancel 1 want-block and 1 want-have that were sent to p0
 	clearSent(peerQueues)
-	pwm.sendCancels([]cid.Cid{wb1[0], wh1[0]})
+	pwm.sendCancels([]cid.Cid{wb1[0], wh1[0]}, "")
 	// Should cancel the want-block and want-have
 	require.Empty(t, pq1.cancels, "Expected no cancels sent to p1")
 	require.ElementsMatch(t, pq0.cancels, []cid.Cid{wb1[0], wh1[0]}, "Expected 2 cids to be cancelled")
@@ -255,7 +255,7 @@ func TestPWMSendCancels(t *testing.T) {
 
 	// Cancel everything
 	clearSent(peerQueues)
 	allCids := append(allwb, allwh...)
-	pwm.sendCancels(allCids)
+	pwm.sendCancels(allCids, "")
 	// Should cancel the remaining want-blocks and want-haves for p0
 	require.ElementsMatch(t, pq0.cancels, []cid.Cid{wb1[1], wh1[1]}, "Expected un-cancelled cids to be cancelled")
@@ -312,7 +312,7 @@ func TestStats(t *testing.T) {
 	// Cancel 1 want-block that was sent to p0
 	// and 1 want-block that was not sent
 	cids5 := random.Cids(1)
-	pwm.sendCancels(append(cids5, cids[0]))
+	pwm.sendCancels(append(cids5, cids[0]), "")
 
 	require.Equal(t, 7, g.count, "Expected 7 wants")
 	require.Equal(t, 3, wbg.count, "Expected 3 want-blocks")
@@ -332,7 +332,7 @@ func TestStats(t *testing.T) {
 	require.Zero(t, wbg.count, "Expected 0 want-blocks")
 
 	// Cancel one remaining broadcast want-have
-	pwm.sendCancels(cids2[:1])
+	pwm.sendCancels(cids2[:1], "")
 	require.Equal(t, 2, g.count, "Expected 2 wants")
 	require.Zero(t, wbg.count, "Expected 0 want-blocks")
 }
@@ -362,7 +362,7 @@ func TestStatsOverlappingWantBlockWantHave(t *testing.T) {
 	require.Equal(t, 4, wbg.count, "Expected 4 want-blocks")
 
 	// Cancel 1 of each group of cids
-	pwm.sendCancels([]cid.Cid{cids[0], cids2[0]})
+	pwm.sendCancels([]cid.Cid{cids[0], cids2[0]}, "")
 
 	require.Equal(t, 2, g.count, "Expected 2 wants")
 	require.Equal(t, 2, wbg.count, "Expected 2 want-blocks")
diff --git a/bitswap/client/internal/session/session.go b/bitswap/client/internal/session/session.go
index b19763f3c..a64b39223 100644
--- a/bitswap/client/internal/session/session.go
+++ b/bitswap/client/internal/session/session.go
@@ -43,7 +43,7 @@ type PeerManager interface {
 	// session discovery)
 	BroadcastWantHaves(context.Context, []cid.Cid)
 	// SendCancels tells the PeerManager to send cancels to all peers
-	SendCancels(context.Context, []cid.Cid)
+	SendCancels(context.Context, []cid.Cid, peer.ID)
 }
 
 // SessionManager manages all the sessions
diff --git a/bitswap/client/internal/session/session_test.go b/bitswap/client/internal/session/session_test.go
index c0d26a91d..5b33b351a 100644
--- a/bitswap/client/internal/session/session_test.go
+++ b/bitswap/client/internal/session/session_test.go
@@ -151,7 +151,7 @@ func (pm *fakePeerManager) BroadcastWantHaves(ctx context.Context, cids []cid.Ci
 	case <-ctx.Done():
 	}
 }
-func (pm *fakePeerManager) SendCancels(ctx context.Context, cancels []cid.Cid) {}
+func (pm *fakePeerManager) SendCancels(ctx context.Context, cancels []cid.Cid, excludePeer peer.ID) {}
 
 func TestSessionGetBlocks(t *testing.T) {
 	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
diff --git a/bitswap/client/internal/session/sessionwantsender_test.go b/bitswap/client/internal/session/sessionwantsender_test.go
index e5589dd58..a43c8af96 100644
--- a/bitswap/client/internal/session/sessionwantsender_test.go
+++ b/bitswap/client/internal/session/sessionwantsender_test.go
@@ -78,9 +78,9 @@ func (pm *mockPeerManager) has(p peer.ID, sid uint64) bool {
 	return false
 }
 
-func (*mockPeerManager) UnregisterSession(uint64)                      {}
-func (*mockPeerManager) BroadcastWantHaves(context.Context, []cid.Cid) {}
-func (*mockPeerManager) SendCancels(context.Context, []cid.Cid)        {}
+func (*mockPeerManager) UnregisterSession(uint64)                        {}
+func (*mockPeerManager) BroadcastWantHaves(context.Context, []cid.Cid)   {}
+func (*mockPeerManager) SendCancels(context.Context, []cid.Cid, peer.ID) {}
 
 func (pm *mockPeerManager) SendWants(ctx context.Context, p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) bool {
 	pm.lk.Lock()
diff --git a/bitswap/client/internal/sessionmanager/sessionmanager.go b/bitswap/client/internal/sessionmanager/sessionmanager.go
index 0d2b24330..179877acc 100644
--- a/bitswap/client/internal/sessionmanager/sessionmanager.go
+++ b/bitswap/client/internal/sessionmanager/sessionmanager.go
@@ -173,7 +173,7 @@ func (sm *SessionManager) ReceiveFrom(ctx context.Context, p peer.ID, blks []cid
 	}
 
 	// Send CANCEL to all peers with want-have / want-block
-	sm.peerManager.SendCancels(ctx, blks)
+	sm.peerManager.SendCancels(ctx, blks, p)
 }
 
 // CancelSessionWants is called when a session cancels wants because a call to
@@ -193,5 +193,5 @@ func (sm *SessionManager) cancelWants(wants []cid.Cid) {
 	// Send CANCEL to all peers for blocks that no session is interested in
 	// anymore.
 	// Note: use bitswap context because session context may already be Done.
-	sm.peerManager.SendCancels(sm.ctx, wants)
+	sm.peerManager.SendCancels(sm.ctx, wants, "")
 }
diff --git a/bitswap/client/internal/sessionmanager/sessionmanager_test.go b/bitswap/client/internal/sessionmanager/sessionmanager_test.go
index bad26ad90..bb9ad4755 100644
--- a/bitswap/client/internal/sessionmanager/sessionmanager_test.go
+++ b/bitswap/client/internal/sessionmanager/sessionmanager_test.go
@@ -70,7 +70,7 @@ func (*fakePeerManager) RegisterSession(peer.ID, bspm.Session)
 func (*fakePeerManager) UnregisterSession(uint64)                                  {}
 func (*fakePeerManager) SendWants(context.Context, peer.ID, []cid.Cid, []cid.Cid) bool { return true }
 func (*fakePeerManager) BroadcastWantHaves(context.Context, []cid.Cid)             {}
-func (fpm *fakePeerManager) SendCancels(ctx context.Context, cancels []cid.Cid) {
+func (fpm *fakePeerManager) SendCancels(ctx context.Context, cancels []cid.Cid, excludePeer peer.ID) {
 	fpm.lk.Lock()
 	defer fpm.lk.Unlock()
 	fpm.cancels = append(fpm.cancels, cancels...)
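
Note (not part of the patch): the test fakes above accept the new excludePeer argument but discard it. If a later test needs to assert which peer was excluded, a recording double along these lines could be used; recordingPeerManager and its fields are hypothetical and not part of boxo.

package example

import (
	"context"
	"sync"

	"github.com/ipfs/go-cid"
	peer "github.com/libp2p/go-libp2p/core/peer"
)

// recordingPeerManager is a hypothetical test double: like fakePeerManager
// above it collects the cancelled CIDs, but it also records the peer that
// was excluded on each SendCancels call so a test can assert on it.
type recordingPeerManager struct {
	lk       sync.Mutex
	cancels  []cid.Cid
	excluded []peer.ID
}

func (r *recordingPeerManager) SendCancels(_ context.Context, cancels []cid.Cid, excludePeer peer.ID) {
	r.lk.Lock()
	defer r.lk.Unlock()
	r.cancels = append(r.cancels, cancels...)
	r.excluded = append(r.excluded, excludePeer)
}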