2
2
package decision
3
3
4
4
import (
5
+ "cmp"
5
6
"context"
7
+ "errors"
6
8
"fmt"
9
+ "slices"
7
10
"sync"
8
11
"time"
9
12
10
13
"github.com/google/uuid"
11
-
12
14
wl "github.com/ipfs/boxo/bitswap/client/wantlist"
13
15
"github.com/ipfs/boxo/bitswap/internal/defaults"
14
16
bsmsg "github.com/ipfs/boxo/bitswap/message"
@@ -131,7 +133,7 @@ type PeerEntry struct {
131
133
// PeerLedger is an external ledger dealing with peers and their want lists.
132
134
type PeerLedger interface {
133
135
// Wants informs the ledger that [peer.ID] wants [wl.Entry].
134
- Wants (p peer.ID , e wl.Entry )
136
+ Wants (p peer.ID , e wl.Entry , limit int ) bool
135
137
136
138
// CancelWant returns true if the [cid.Cid] was removed from the wantlist of [peer.ID].
137
139
CancelWant (p peer.ID , k cid.Cid ) bool
@@ -675,14 +677,12 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
675
677
return false
676
678
}
677
679
678
- newWorkExists := false
679
- defer func () {
680
- if newWorkExists {
681
- e .signalNewWork ()
682
- }
683
- }()
684
-
685
- wants , cancels , denials := e .splitWantsCancelsDenials (p , m )
680
+ wants , cancels , denials , err := e .splitWantsCancelsDenials (p , m )
681
+ if err != nil {
682
+ // This is a truely broken client, let's kill the connection.
683
+ log .Warnw (err .Error (), "local" , e .self , "remote" , p )
684
+ return true
685
+ }
686
686
687
687
// Get block sizes
688
688
wantKs := cid .NewSet ()
@@ -701,90 +701,59 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
701
701
e .peerLedger .ClearPeerWantlist (p )
702
702
}
703
703
704
+ var overflow []bsmsg.Entry
704
705
if len (wants ) != 0 {
705
706
filteredWants := wants [:0 ] // shift inplace
706
707
for _ , entry := range wants {
707
- if entry .Cid .Prefix ().MhType == mh .IDENTITY {
708
- // This is a truely broken client, let's kill the connection.
709
- e .lock .Unlock ()
710
- log .Warnw ("peer wants an identity CID" , "local" , e .self , "remote" , p )
711
- return true
712
- }
713
- if e .maxCidSize != 0 && uint (entry .Cid .ByteLen ()) > e .maxCidSize {
714
- // Ignore requests about CIDs that big.
708
+ if ! e .peerLedger .Wants (p , entry .Entry , int (e .maxQueuedWantlistEntriesPerPeer )) {
709
+ // Cannot add entry because it would exceed size limit.
710
+ overflow = append (overflow , entry )
715
711
continue
716
712
}
717
713
filteredWants = append (filteredWants , entry )
718
- if len (filteredWants ) == int (e .maxQueuedWantlistEntriesPerPeer ) {
719
- // filteredWants at limit, ignore remaining wants from request.
720
- log .Debugw ("requested wants exceeds max wantlist size" , "local" , e .self , "remote" , p , "ignoring" , len (wants )- len (filteredWants ))
721
- break
722
- }
723
- }
724
- wants = wants [len (filteredWants ):]
725
- for i := range wants {
726
- wants [i ] = bsmsg.Entry {} // early GC
727
714
}
715
+ // Clear truncated entries - early GC.
716
+ clear (wants [len (filteredWants ):])
728
717
wants = filteredWants
718
+ }
729
719
730
- // Ensure sufficient space for new wants.
731
- s := e .peerLedger .WantlistSizeForPeer (p )
732
- available := int (e .maxQueuedWantlistEntriesPerPeer ) - s
733
- if len (wants ) > available {
734
- needSpace := len (wants ) - available
735
- log .Debugw ("wantlist overflow" , "local" , e .self , "remote" , p , "would be" , s + len (wants ), "canceling" , needSpace )
736
- // Cancel any wants that are being requested again. This makes room
737
- // for new wants and minimizes that existing wants to cancel that
738
- // are not in the new request.
739
- for _ , entry := range wants {
740
- if e .peerLedger .CancelWant (p , entry .Cid ) {
741
- e .peerRequestQueue .Remove (entry .Cid , p )
742
- needSpace --
743
- if needSpace == 0 {
744
- break
745
- }
746
- }
720
+ if len (overflow ) != 0 {
721
+ // Sort wl and overflow from least to most important.
722
+ peerWants := e .peerLedger .WantlistForPeer (p )
723
+ slices .SortFunc (peerWants , func (a , b wl.Entry ) int {
724
+ return cmp .Compare (a .Priority , b .Priority )
725
+ })
726
+ slices .SortFunc (overflow , func (a , b bsmsg.Entry ) int {
727
+ return cmp .Compare (a .Entry .Priority , b .Entry .Priority )
728
+ })
729
+
730
+ // Put overflow wants onto the request queue by replacing entries that
731
+ // have the same or lower priority.
732
+ var replace int
733
+ for _ , entry := range overflow {
734
+ if entry .Entry .Priority <= peerWants [replace ].Priority {
735
+ // Everything in peerWants is equal or more improtant, so this
736
+ // overflow entry cannot replace any existing wants.
737
+ continue
747
738
}
748
- // Cancel additional wants, that are not being replaced, to make
749
- // room for new wants.
750
- if needSpace != 0 {
751
- wl := e .peerLedger .WantlistForPeer (p )
752
- for i := range wl {
753
- entCid := wl [i ].Cid
754
- if e .peerLedger .CancelWant (p , entCid ) {
755
- e .peerRequestQueue .Remove (entCid , p )
756
- needSpace --
757
- if needSpace == 0 {
758
- break
759
- }
760
- }
761
- }
739
+ entCid := peerWants [replace ].Cid
740
+ replace ++
741
+ if e .peerLedger .CancelWant (p , entCid ) {
742
+ e .peerRequestQueue .Remove (entCid , p )
762
743
}
763
- }
764
-
765
- for _ , entry := range wants {
766
- e .peerLedger .Wants (p , entry .Entry )
744
+ e .peerLedger .Wants (p , entry .Entry , int (e .maxQueuedWantlistEntriesPerPeer ))
745
+ wants = append (wants , entry )
767
746
}
768
747
}
769
748
770
749
for _ , entry := range cancels {
771
750
c := entry .Cid
772
- if c .Prefix ().MhType == mh .IDENTITY {
773
- // This is a truely broken client, let's kill the connection.
774
- e .lock .Unlock ()
775
- log .Warnw ("peer canceled an identity CID" , "local" , e .self , "remote" , p )
776
- return true
777
- }
778
- if e .maxCidSize != 0 && uint (c .ByteLen ()) > e .maxCidSize {
779
- // Ignore requests about CIDs that big.
780
- continue
781
- }
782
-
783
751
log .Debugw ("Bitswap engine <- cancel" , "local" , e .self , "from" , p , "cid" , c )
784
752
if e .peerLedger .CancelWant (p , c ) {
785
753
e .peerRequestQueue .Remove (c , p )
786
754
}
787
755
}
756
+
788
757
e .lock .Unlock ()
789
758
790
759
var activeEntries []peertask.Task
@@ -795,7 +764,6 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
795
764
if e .sendDontHaves && entry .SendDontHave {
796
765
c := entry .Cid
797
766
798
- newWorkExists = true
799
767
isWantBlock := false
800
768
if entry .WantType == pb .Message_Wantlist_Block {
801
769
isWantBlock = true
@@ -833,8 +801,6 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
833
801
continue
834
802
}
835
803
// The block was found, add it to the queue
836
- newWorkExists = true
837
-
838
804
isWantBlock := e .sendAsBlock (entry .WantType , blockSize )
839
805
840
806
log .Debugw ("Bitswap engine: block found" , "local" , e .self , "from" , p , "cid" , c , "isWantBlock" , isWantBlock )
@@ -860,19 +826,64 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
860
826
})
861
827
}
862
828
863
- // Push entries onto the request queue
864
- if len (activeEntries ) > 0 {
829
+ // Push entries onto the request queue and signal network that new work is ready.
830
+ if len (activeEntries ) != 0 {
865
831
e .peerRequestQueue .PushTasksTruncated (e .maxQueuedWantlistEntriesPerPeer , p , activeEntries ... )
866
832
e .updateMetrics ()
833
+ e .signalNewWork ()
867
834
}
868
835
return false
869
836
}
870
837
838
+ /*
839
+
840
+ // Ensure sufficient space for new wants.
841
+ s := e.peerLedger.WantlistSizeForPeer(p)
842
+ available := int(e.maxQueuedWantlistEntriesPerPeer) - s
843
+ if len(wants) > available {
844
+ needSpace := len(wants) - available
845
+ log.Debugw("wantlist overflow", "local", e.self, "remote", p, "would be", s+len(wants), "canceling", needSpace)
846
+ // Cancel any wants that are being requested again. This makes room
847
+ // for new wants and minimizes that existing wants to cancel that
848
+ // are not in the new request.
849
+ for _, entry := range wants {
850
+ if e.peerLedger.CancelWant(p, entry.Cid) {
851
+ e.peerRequestQueue.Remove(entry.Cid, p)
852
+ needSpace--
853
+ if needSpace == 0 {
854
+ break
855
+ }
856
+ }
857
+ }
858
+ // Cancel additional wants, that are not being replaced, to make
859
+ // room for new wants.
860
+ if needSpace != 0 {
861
+ wl := e.peerLedger.WantlistForPeer(p)
862
+ for i := range wl {
863
+ entCid := wl[i].Cid
864
+ if e.peerLedger.CancelWant(p, entCid) {
865
+ e.peerRequestQueue.Remove(entCid, p)
866
+ needSpace--
867
+ if needSpace == 0 {
868
+ break
869
+ }
870
+ }
871
+ }
872
+ }
873
+ }
874
+
875
+ for _, entry := range wants {
876
+ e.peerLedger.Wants(p, entry.Entry)
877
+ }
878
+ }
879
+
880
+ */
881
+
871
882
// Split the want-havek entries from the cancel and deny entries.
872
- func (e * Engine ) splitWantsCancelsDenials (p peer.ID , m bsmsg.BitSwapMessage ) ([]bsmsg.Entry , []bsmsg.Entry , []bsmsg.Entry ) {
883
+ func (e * Engine ) splitWantsCancelsDenials (p peer.ID , m bsmsg.BitSwapMessage ) ([]bsmsg.Entry , []bsmsg.Entry , []bsmsg.Entry , error ) {
873
884
entries := m .Wantlist () // creates copy; safe to modify
874
885
if len (entries ) == 0 {
875
- return nil , nil , nil
886
+ return nil , nil , nil , nil
876
887
}
877
888
878
889
log .Debugw ("Bitswap engine <- msg" , "local" , e .self , "from" , p , "entryCount" , len (entries ))
@@ -881,18 +892,27 @@ func (e *Engine) splitWantsCancelsDenials(p peer.ID, m bsmsg.BitSwapMessage) ([]
881
892
var cancels , denials []bsmsg.Entry
882
893
883
894
for _ , et := range entries {
895
+ c := et .Cid
896
+ if e .maxCidSize != 0 && uint (c .ByteLen ()) > e .maxCidSize {
897
+ // Ignore requests about CIDs that big.
898
+ continue
899
+ }
900
+ if c .Prefix ().MhType == mh .IDENTITY {
901
+ return nil , nil , nil , errors .New ("peer canceled an identity CID" )
902
+ }
903
+
884
904
if et .Cancel {
885
905
cancels = append (cancels , et )
886
906
continue
887
907
}
888
908
889
909
if et .WantType == pb .Message_Wantlist_Have {
890
- log .Debugw ("Bitswap engine <- want-have" , "local" , e .self , "from" , p , "cid" , et . Cid )
910
+ log .Debugw ("Bitswap engine <- want-have" , "local" , e .self , "from" , p , "cid" , c )
891
911
} else {
892
- log .Debugw ("Bitswap engine <- want-block" , "local" , e .self , "from" , p , "cid" , et . Cid )
912
+ log .Debugw ("Bitswap engine <- want-block" , "local" , e .self , "from" , p , "cid" , c )
893
913
}
894
914
895
- if e .peerBlockRequestFilter != nil && ! e .peerBlockRequestFilter (p , et . Cid ) {
915
+ if e .peerBlockRequestFilter != nil && ! e .peerBlockRequestFilter (p , c ) {
896
916
denials = append (denials , et )
897
917
continue
898
918
}
@@ -904,10 +924,19 @@ func (e *Engine) splitWantsCancelsDenials(p peer.ID, m bsmsg.BitSwapMessage) ([]
904
924
wants = nil
905
925
}
906
926
927
+ // Do not take more wants that can be handled.
928
+ if len (wants ) > int (e .maxQueuedWantlistEntriesPerPeer ) {
929
+ // Keep the highest priority wants.
930
+ slices .SortFunc (wants , func (a , b bsmsg.Entry ) int {
931
+ return cmp .Compare (b .Entry .Priority , a .Entry .Priority )
932
+ })
933
+ wants = wants [:int (e .maxQueuedWantlistEntriesPerPeer )]
934
+ }
935
+
907
936
// Clear truncated entries.
908
937
clear (entries [len (wants ):])
909
938
910
- return wants , cancels , denials
939
+ return wants , cancels , denials , nil
911
940
}
912
941
913
942
// ReceivedBlocks is called when new blocks are received from the network.
0 commit comments