Skip to content

Commit fe90423

Browse files
Refactor TxBroadcaster (#3377)
* refactor TxBroadcaster * update tests * fix tests * no more FutureNonceRetention * add hashes request limit * refactor txs request * cleaning * add TxPool test * TransactionsMessage limit tests * add Eth65ProtocolHandlerTests and NewPooledTransactionHashesMessage limit tests * small refactor * add proper metrics * fix metrics * one more metric * small refactor * cosmetic * decrease max hashes count in one message to 3200 (as in geth) * refactor NewPooledTransactionHashesMessage * lazy enumeration instead of allocating arrays * fix NullReferenceException * make tests green * small refactoring * Revert launchSettings.json change Co-authored-by: lukasz.rozmej <[email protected]>
1 parent 2c083ef commit fe90423

24 files changed

+331
-145
lines changed

src/Nethermind/Nethermind.Network.Test/P2P/Subprotocols/Eth/V62/Eth62ProtocolHandlerTests.cs

+37
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,43 @@ static void HandlerOnSubprotocolRequested(object sender, ProtocolEventArgs e) {
346346
_handler.SubprotocolRequested += HandlerOnSubprotocolRequested;
347347
_handler.SubprotocolRequested -= HandlerOnSubprotocolRequested;
348348
}
349+
350+
[TestCase(1)]
351+
[TestCase(255)]
352+
[TestCase(256)]
353+
public void should_send_up_to_256_txs_in_one_TransactionsMessage(int txCount)
354+
{
355+
Transaction[] txs = new Transaction[txCount];
356+
357+
for (int i = 0; i < txCount; i++)
358+
{
359+
txs[i] = Build.A.Transaction.SignedAndResolved().TestObject;
360+
}
361+
362+
_handler.SendNewTransactions(txs);
363+
364+
_session.Received(1).DeliverMessage(Arg.Is<TransactionsMessage>(m => m.Transactions.Count == txCount));
365+
}
366+
367+
[TestCase(257)]
368+
[TestCase(300)]
369+
[TestCase(1500)]
370+
[TestCase(10000)]
371+
public void should_send_more_than_256_txs_in_more_than_one_TransactionsMessage(int txCount)
372+
{
373+
int messagesCount = txCount / 256 + 1;
374+
int nonFullMsgTxsCount = txCount % 256;
375+
Transaction[] txs = new Transaction[txCount];
376+
377+
for (int i = 0; i < txCount; i++)
378+
{
379+
txs[i] = Build.A.Transaction.SignedAndResolved().TestObject;
380+
}
381+
382+
_handler.SendNewTransactions(txs);
383+
384+
_session.Received(messagesCount).DeliverMessage(Arg.Is<TransactionsMessage>(m => m.Transactions.Count == 256 || m.Transactions.Count == nonFullMsgTxsCount));
385+
}
349386

350387
private void HandleZeroMessage<T>(T msg, int messageCode) where T : MessageBase
351388
{
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
// Copyright (c) 2021 Demerzel Solutions Limited
2+
// This file is part of the Nethermind library.
3+
//
4+
// The Nethermind library is free software: you can redistribute it and/or modify
5+
// it under the terms of the GNU Lesser General Public License as published by
6+
// the Free Software Foundation, either version 3 of the License, or
7+
// (at your option) any later version.
8+
//
9+
// The Nethermind library is distributed in the hope that it will be useful,
10+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
// GNU Lesser General Public License for more details.
13+
//
14+
// You should have received a copy of the GNU Lesser General Public License
15+
// along with the Nethermind. If not, see <http://www.gnu.org/licenses/>.
16+
//
17+
18+
using System.Net;
19+
using FluentAssertions;
20+
using Nethermind.Core;
21+
using Nethermind.Core.Specs;
22+
using Nethermind.Core.Test.Builders;
23+
using Nethermind.Core.Timers;
24+
using Nethermind.Logging;
25+
using Nethermind.Network.P2P;
26+
using Nethermind.Network.P2P.Subprotocols.Eth.V65;
27+
using Nethermind.Network.Test.Builders;
28+
using Nethermind.Stats;
29+
using Nethermind.Stats.Model;
30+
using Nethermind.Synchronization;
31+
using Nethermind.TxPool;
32+
using NSubstitute;
33+
using NUnit.Framework;
34+
35+
namespace Nethermind.Network.Test.P2P.Subprotocols.Eth.V65
36+
{
37+
[TestFixture, Parallelizable(ParallelScope.Self)]
38+
public class Eth65ProtocolHandlerTests
39+
{
40+
private ISession _session;
41+
private IMessageSerializationService _svc;
42+
private ISyncServer _syncManager;
43+
private ITxPool _transactionPool;
44+
private IPooledTxsRequestor _pooledTxsRequestor;
45+
private ISpecProvider _specProvider;
46+
private Block _genesisBlock;
47+
private Eth65ProtocolHandler _handler;
48+
49+
[SetUp]
50+
public void Setup()
51+
{
52+
_svc = Build.A.SerializationService().WithEth65().TestObject;
53+
54+
NetworkDiagTracer.IsEnabled = true;
55+
56+
_session = Substitute.For<ISession>();
57+
Node node = new Node(TestItem.PublicKeyA, new IPEndPoint(IPAddress.Broadcast, 30303));
58+
_session.Node.Returns(node);
59+
_syncManager = Substitute.For<ISyncServer>();
60+
_transactionPool = Substitute.For<ITxPool>();
61+
_pooledTxsRequestor = Substitute.For<IPooledTxsRequestor>();
62+
_specProvider = Substitute.For<ISpecProvider>();
63+
_genesisBlock = Build.A.Block.Genesis.TestObject;
64+
_syncManager.Head.Returns(_genesisBlock.Header);
65+
_syncManager.Genesis.Returns(_genesisBlock.Header);
66+
ITimerFactory timerFactory = Substitute.For<ITimerFactory>();
67+
_handler = new Eth65ProtocolHandler(
68+
_session,
69+
_svc,
70+
new NodeStatsManager(timerFactory, LimboLogs.Instance),
71+
_syncManager,
72+
_transactionPool,
73+
_pooledTxsRequestor,
74+
_specProvider,
75+
LimboLogs.Instance);
76+
_handler.Init();
77+
}
78+
79+
[TearDown]
80+
public void TearDown()
81+
{
82+
_handler.Dispose();
83+
}
84+
85+
[Test]
86+
public void Metadata_correct()
87+
{
88+
_handler.ProtocolCode.Should().Be("eth");
89+
_handler.Name.Should().Be("eth65");
90+
_handler.ProtocolVersion.Should().Be(65);
91+
_handler.MessageIdSpaceSize.Should().Be(17);
92+
_handler.IncludeInTxPool.Should().BeTrue();
93+
_handler.ClientId.Should().Be(_session.Node?.ClientId);
94+
_handler.HeadHash.Should().BeNull();
95+
_handler.HeadNumber.Should().Be(0);
96+
}
97+
98+
[TestCase(1)]
99+
[TestCase(3199)]
100+
[TestCase(3200)]
101+
public void should_send_up_to_3200_hashes_in_one_NewPooledTransactionHashesMessage(int txCount)
102+
{
103+
Transaction[] txs = new Transaction[txCount];
104+
105+
for (int i = 0; i < txCount; i++)
106+
{
107+
txs[i] = Build.A.Transaction.SignedAndResolved().TestObject;
108+
}
109+
110+
_handler.SendNewTransactions(txs);
111+
112+
_session.Received(1).DeliverMessage(Arg.Is<NewPooledTransactionHashesMessage>(m => m.Hashes.Count == txCount));
113+
}
114+
115+
[TestCase(3201)]
116+
[TestCase(10000)]
117+
[TestCase(20000)]
118+
public void should_send_more_than_3200_hashes_in_more_than_one_NewPooledTransactionHashesMessage(int txCount)
119+
{
120+
const int maxHashesCount = 3200;
121+
int messagesCount = txCount / maxHashesCount + 1;
122+
int nonFullMsgTxsCount = txCount % maxHashesCount;
123+
Transaction[] txs = new Transaction[txCount];
124+
125+
for (int i = 0; i < txCount; i++)
126+
{
127+
txs[i] = Build.A.Transaction.SignedAndResolved().TestObject;
128+
}
129+
130+
_handler.SendNewTransactions(txs);
131+
132+
_session.Received(messagesCount).DeliverMessage(Arg.Is<NewPooledTransactionHashesMessage>(m => m.Hashes.Count == maxHashesCount || m.Hashes.Count == nonFullMsgTxsCount));
133+
}
134+
}
135+
}

src/Nethermind/Nethermind.Network/Metrics.cs

+6
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,9 @@ public static class Metrics
163163

164164
[Description("Number of eth.65 NewPooledTransactionHashes messages received")]
165165
public static long Eth65NewPooledTransactionHashesReceived { get; set; }
166+
167+
[Description("Number of eth.65 NewPooledTransactionHashes messages sent")]
168+
public static long Eth65NewPooledTransactionHashesSent { get; set; }
166169

167170
[Description("Number of eth.65 GetPooledTransactions messages received")]
168171
public static long Eth65GetPooledTransactionsReceived { get; set; }
@@ -197,6 +200,9 @@ public static class Metrics
197200
[Description("Number of eth.66 Receipts messages received")]
198201
public static long Eth66ReceiptsReceived { get; set; }
199202

203+
[Description("Number of eth.66 GetPooledTransactions messages received")]
204+
public static long Eth66GetPooledTransactionsReceived { get; set; }
205+
200206
[Description("Number of eth.66 GetPooledTransactions messages sent")]
201207
public static long Eth66GetPooledTransactionsRequested { get; set; }
202208

src/Nethermind/Nethermind.Network/P2P/Subprotocols/Eth/V65/Eth65ProtocolHandler.cs

+28-10
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
using System.Diagnostics;
2020
using System.Linq;
2121
using Nethermind.Core;
22+
using Nethermind.Core.Crypto;
2223
using Nethermind.Core.Specs;
2324
using Nethermind.Logging;
2425
using Nethermind.Network.P2P.Subprotocols.Eth.V64;
@@ -120,21 +121,38 @@ protected PooledTransactionsMessage FulfillPooledTransactionsRequest(
120121

121122
return new PooledTransactionsMessage(txs);
122123
}
123-
124-
public override bool SendNewTransaction(Transaction transaction, bool isPriority)
124+
125+
public override void SendNewTransactions(IEnumerable<Transaction> txs)
125126
{
126-
if (isPriority)
127+
const int maxCapacity = 3200;
128+
List<Keccak> hashes = new(maxCapacity);
129+
130+
foreach (Transaction tx in txs)
127131
{
128-
base.SendNewTransaction(transaction, true);
132+
if (hashes.Count == maxCapacity)
133+
{
134+
SendMessage(hashes);
135+
hashes = new(maxCapacity);
136+
}
137+
138+
if (tx.Hash is not null)
139+
{
140+
hashes.Add(tx.Hash);
141+
TxPool.Metrics.PendingTransactionsHashesSent++;
142+
}
129143
}
130-
else
144+
145+
if (hashes.Count > 0)
131146
{
132-
Counter++;
133-
NewPooledTransactionHashesMessage msg = new(new[] {transaction.Hash});
134-
Send(msg);
147+
SendMessage(hashes);
135148
}
136-
137-
return true;
149+
}
150+
151+
private void SendMessage(IList<Keccak> hashes)
152+
{
153+
NewPooledTransactionHashesMessage msg = new(hashes);
154+
Send(msg);
155+
Metrics.Eth65NewPooledTransactionHashesSent++;
138156
}
139157
}
140158
}

src/Nethermind/Nethermind.Network/P2P/Subprotocols/Eth/V65/NewPooledTransactionHashesMessage.cs

+2-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
// You should have received a copy of the GNU Lesser General Public License
1515
// along with the Nethermind. If not, see <http://www.gnu.org/licenses/>.
1616

17+
using System.Collections.Generic;
1718
using Nethermind.Core.Crypto;
1819

1920
namespace Nethermind.Network.P2P.Subprotocols.Eth.V65
@@ -23,7 +24,7 @@ public class NewPooledTransactionHashesMessage : HashesMessage
2324
public override int PacketType { get; } = Eth65MessageCode.NewPooledTransactionHashes;
2425
public override string Protocol { get; } = "eth";
2526

26-
public NewPooledTransactionHashesMessage(Keccak[] hashes)
27+
public NewPooledTransactionHashesMessage(IList<Keccak> hashes)
2728
: base(hashes)
2829
{
2930
}

src/Nethermind/Nethermind.Network/P2P/Subprotocols/Eth/V65/PooledTxsRequestor.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public void RequestTransactionsEth66(Action<Eth.V66.GetPooledTransactionsMessage
5454
{
5555
GetPooledTransactionsMessage msg65 = new GetPooledTransactionsMessage(discoveredTxHashes.ToArray());
5656
send(new V66.GetPooledTransactionsMessage() {EthMessage = msg65});
57-
Metrics.Eth65GetPooledTransactionsRequested++;
57+
Metrics.Eth66GetPooledTransactionsRequested++;
5858
}
5959
}
6060

src/Nethermind/Nethermind.Network/P2P/Subprotocols/Eth/V66/Eth66ProtocolHandler.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ GetBlockHeadersMessage getBlockHeadersMessage
9797
case Eth66MessageCode.GetPooledTransactions:
9898
GetPooledTransactionsMessage getPooledTxMsg
9999
= Deserialize<GetPooledTransactionsMessage>(message.Content);
100-
Metrics.Eth66GetPooledTransactionsRequested++;
100+
Metrics.Eth66GetPooledTransactionsReceived++;
101101
ReportIn(getPooledTxMsg);
102102
Handle(getPooledTxMsg);
103103
break;

src/Nethermind/Nethermind.Network/P2P/SyncPeerProtocolHandlerBase.cs

+26-8
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
using System.Diagnostics;
2020
using System.Threading;
2121
using System.Threading.Tasks;
22-
using DotNetty.Buffers;
2322
using Nethermind.Blockchain;
2423
using Nethermind.Blockchain.Synchronization;
2524
using Nethermind.Core;
@@ -30,7 +29,6 @@
3029
using Nethermind.Logging;
3130
using Nethermind.Network.P2P.Subprotocols.Eth.V62;
3231
using Nethermind.Network.P2P.Subprotocols.Eth.V63;
33-
using Nethermind.Network.P2P.Subprotocols.Wit;
3432
using Nethermind.Network.Rlpx;
3533
using Nethermind.Stats;
3634
using Nethermind.Stats.Model;
@@ -217,16 +215,36 @@ public virtual Task<byte[][]> GetNodeData(IList<Keccak> hashes, CancellationToke
217215

218216
public abstract void NotifyOfNewBlock(Block block, SendBlockPriority priority);
219217

220-
public virtual bool SendNewTransaction(Transaction transaction, bool isPriority)
218+
public virtual void SendNewTransactions(IEnumerable<Transaction> txs)
221219
{
222-
if (transaction.Hash == null)
220+
const int maxCapacity = 256;
221+
List<Transaction> txsToSend = new(maxCapacity);
222+
223+
foreach (Transaction tx in txs)
223224
{
224-
throw new InvalidOperationException("Trying to send a transaction with null hash");
225+
if (txsToSend.Count == maxCapacity)
226+
{
227+
SendMessage(txsToSend);
228+
txsToSend.Clear();
229+
}
230+
231+
if (tx.Hash is not null)
232+
{
233+
txsToSend.Add(tx);
234+
TxPool.Metrics.PendingTransactionsSent++;
235+
}
225236
}
226-
227-
TransactionsMessage msg = new(new[] {transaction});
237+
238+
if (txsToSend.Count > 0)
239+
{
240+
SendMessage(txsToSend);
241+
}
242+
}
243+
244+
private void SendMessage(IList<Transaction> txsToSend)
245+
{
246+
TransactionsMessage msg = new(txsToSend);
228247
Send(msg);
229-
return true;
230248
}
231249

232250
public override void HandleMessage(Packet message)

src/Nethermind/Nethermind.Synchronization.Test/BlockDownloaderTests.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -565,7 +565,7 @@ public void NotifyOfNewBlock(Block block, SendBlockPriority priority)
565565

566566
public PublicKey Id => Node.Id;
567567

568-
public bool SendNewTransaction(Transaction transaction, bool isPriority)
568+
public void SendNewTransactions(IEnumerable<Transaction> txs)
569569
{
570570
throw new NotImplementedException();
571571
}
@@ -999,7 +999,7 @@ public void NotifyOfNewBlock(Block block, SendBlockPriority priority)
999999

10001000
public PublicKey Id => Node.Id;
10011001

1002-
public bool SendNewTransaction(Transaction transaction, bool isPriority)
1002+
public void SendNewTransactions(IEnumerable<Transaction> txs)
10031003
{
10041004
throw new NotImplementedException();
10051005
}

src/Nethermind/Nethermind.Synchronization.Test/FastSync/StateSyncFeedTests.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ public void NotifyOfNewBlock(Block block, SendBlockPriority priority)
240240

241241
public PublicKey Id => Node.Id;
242242

243-
public bool SendNewTransaction(Transaction transaction, bool isPriority)
243+
public void SendNewTransactions(IEnumerable<Transaction> txs)
244244
{
245245
throw new NotImplementedException();
246246
}

src/Nethermind/Nethermind.Synchronization.Test/LatencySyncPeerMock.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ public void NotifyOfNewBlock(Block block, SendBlockPriority priority)
9999

100100
public PublicKey Id => Node.Id;
101101

102-
public bool SendNewTransaction(Transaction transaction, bool isPriority)
102+
public void SendNewTransactions(IEnumerable<Transaction> txs)
103103
{
104104
throw new NotImplementedException();
105105
}

src/Nethermind/Nethermind.Synchronization.Test/SyncPeerMock.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -165,8 +165,8 @@ public void HintNewBlock(Keccak blockHash, long number)
165165
}
166166

167167
public PublicKey Id => Node.Id;
168-
169-
public bool SendNewTransaction(Transaction transaction, bool isPriority) => true;
168+
169+
public void SendNewTransactions(IEnumerable<Transaction> txs) { }
170170

171171
public Task<TxReceipt[][]> GetReceipts(IList<Keccak> blockHash, CancellationToken token)
172172
{

0 commit comments

Comments
 (0)