Skip to content

Commit c61ee36

Browse files
authored
TrieStore to Channels (#7845)
1 parent e63eac0 commit c61ee36

File tree

2 files changed

+130
-27
lines changed

2 files changed

+130
-27
lines changed

src/Nethermind/Nethermind.Trie/Pruning/TrieStore.cs

+30-23
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,16 @@
33

44
using System;
55
using System.Collections.Concurrent;
6-
using System.Collections.Generic;
76
using System.Diagnostics;
87
using System.Diagnostics.CodeAnalysis;
98
using System.Linq;
109
using System.Runtime.CompilerServices;
1110
using System.Threading;
11+
using System.Threading.Channels;
1212
using System.Threading.Tasks;
1313
using Nethermind.Core;
1414
using Nethermind.Core.Collections;
15+
using Nethermind.Core.Cpu;
1516
using Nethermind.Core.Crypto;
1617
using Nethermind.Core.Extensions;
1718
using Nethermind.Logging;
@@ -32,7 +33,7 @@ public class TrieStore : ITrieStore, IPruningTrieStore
3233
private readonly Task[] _dirtyNodesTasks = [];
3334
private readonly ConcurrentDictionary<HashAndTinyPath, Hash256?>[] _persistedHashes = [];
3435
private readonly Action<TreePath, Hash256?, TrieNode> _persistedNodeRecorder;
35-
private readonly Task[] _disposeTasks = new Task[Environment.ProcessorCount];
36+
private readonly Task[] _disposeTasks = new Task[RuntimeInformation.PhysicalCoreCount];
3637

3738
// This seems to attempt prevent multiple block processing at the same time and along with pruning at the same time.
3839
private readonly object _dirtyNodesLock = new object();
@@ -790,27 +791,33 @@ void TopLevelPersist(TrieNode tn, Hash256? address2, TreePath path)
790791
// However, anything that we are trying to persist here should still be in dirty cache.
791792
// So parallel read should go there first instead of to the database for these dataset,
792793
// so it should be fine for these to be non atomic.
793-
using BlockingCollection<INodeStorage.WriteBatch> disposeQueue = new BlockingCollection<INodeStorage.WriteBatch>(4);
794-
795-
for (int index = 0; index < _disposeTasks.Length; index++)
794+
Task[] disposeTasks = _disposeTasks;
795+
Channel<INodeStorage.WriteBatch> disposeQueue = Channel.CreateBounded<INodeStorage.WriteBatch>(disposeTasks.Length * 2);
796+
try
796797
{
797-
_disposeTasks[index] = Task.Run(() =>
798+
for (int index = 0; index < disposeTasks.Length; index++)
798799
{
799-
while (disposeQueue.TryTake(out INodeStorage.WriteBatch disposable, Timeout.Infinite))
800+
disposeTasks[index] = Task.Run(async () =>
800801
{
801-
disposable.Dispose();
802-
}
803-
});
804-
}
802+
await foreach (IDisposable disposable in disposeQueue.Reader.ReadAllAsync())
803+
{
804+
disposable.Dispose();
805+
}
806+
});
807+
}
805808

806-
using ArrayPoolList<Task> persistNodeStartingFromTasks = parallelStartNodes.Select(
807-
entry => Task.Run(() => PersistNodeStartingFrom(entry.trieNode, entry.address2, entry.path, persistedNodeRecorder, writeFlags, disposeQueue)))
808-
.ToPooledList(parallelStartNodes.Count);
809+
using ArrayPoolList<Task> persistNodeStartingFromTasks = parallelStartNodes.Select(
810+
entry => Task.Run(() => PersistNodeStartingFrom(entry.trieNode, entry.address2, entry.path, persistedNodeRecorder, writeFlags, disposeQueue)))
811+
.ToPooledList(parallelStartNodes.Count);
809812

810-
Task.WaitAll(persistNodeStartingFromTasks.AsSpan());
813+
Task.WaitAll(persistNodeStartingFromTasks.AsSpan());
814+
}
815+
finally
816+
{
817+
disposeQueue.Writer.Complete();
818+
}
811819

812-
disposeQueue.CompleteAdding();
813-
Task.WaitAll(_disposeTasks);
820+
Task.WaitAll(disposeTasks);
814821

815822
// Dispose top level last in case something goes wrong, at least the root won't be stored
816823
topLevelWriteBatch.Dispose();
@@ -824,28 +831,28 @@ void TopLevelPersist(TrieNode tn, Hash256? address2, TreePath path)
824831
LastPersistedBlockNumber = commitSet.BlockNumber;
825832
}
826833

827-
private void PersistNodeStartingFrom(TrieNode tn, Hash256 address2, TreePath path,
834+
private async Task PersistNodeStartingFrom(TrieNode tn, Hash256 address2, TreePath path,
828835
Action<TreePath, Hash256?, TrieNode>? persistedNodeRecorder,
829-
WriteFlags writeFlags, BlockingCollection<INodeStorage.WriteBatch> disposeQueue)
836+
WriteFlags writeFlags, Channel<INodeStorage.WriteBatch> disposeQueue)
830837
{
831838
long persistedNodeCount = 0;
832839
INodeStorage.WriteBatch writeBatch = _nodeStorage.StartWriteBatch();
833840

834-
void DoPersist(TrieNode node, Hash256? address3, TreePath path2)
841+
async ValueTask DoPersist(TrieNode node, Hash256? address3, TreePath path2)
835842
{
836843
persistedNodeRecorder?.Invoke(path2, address3, node);
837844
PersistNode(address3, path2, node, writeBatch, writeFlags);
838845

839846
persistedNodeCount++;
840847
if (persistedNodeCount % 512 == 0)
841848
{
842-
disposeQueue.Add(writeBatch);
849+
await disposeQueue.Writer.WriteAsync(writeBatch);
843850
writeBatch = _nodeStorage.StartWriteBatch();
844851
}
845852
}
846853

847-
tn.CallRecursively(DoPersist, address2, ref path, GetTrieStore(address2), true, _logger);
848-
disposeQueue.Add(writeBatch);
854+
await tn.CallRecursivelyAsync(DoPersist, address2, ref path, GetTrieStore(address2), _logger);
855+
await disposeQueue.Writer.WriteAsync(writeBatch);
849856
}
850857

851858
private void PersistNode(Hash256? address, in TreePath path, TrieNode currentNode, INodeStorage.WriteBatch writeBatch, WriteFlags writeFlags = WriteFlags.None)

src/Nethermind/Nethermind.Trie/TrieNode.cs

+100-4
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
using System.Runtime.CompilerServices;
88
using System.Runtime.InteropServices;
99
using System.Threading;
10+
using System.Threading.Tasks;
1011
using Nethermind.Core;
1112
using Nethermind.Core.Buffers;
1213
using Nethermind.Core.Crypto;
@@ -896,12 +897,12 @@ public void CallRecursively(
896897

897898
if (!IsLeaf)
898899
{
899-
if (_data is not null)
900+
object?[] data = _data;
901+
if (data is not null)
900902
{
901-
for (int i = 0; i < _data.Length; i++)
903+
for (int i = 0; i < data.Length; i++)
902904
{
903-
object o = _data[i];
904-
if (o is TrieNode child)
905+
if (data[i] is TrieNode child)
905906
{
906907
if (logger.IsTrace) logger.Trace($"Persist recursively on child {i} {child} of {this}");
907908
int previousLength = AppendChildPath(ref currentPath, i);
@@ -938,6 +939,101 @@ public void CallRecursively(
938939
action(this, storageAddress, currentPath);
939940
}
940941

942+
public ValueTask CallRecursivelyAsync(
943+
Func<TrieNode, Hash256?, TreePath, ValueTask> action,
944+
Hash256? storageAddress,
945+
ref TreePath currentPath,
946+
ITrieNodeResolver resolver,
947+
ILogger logger)
948+
{
949+
if (IsPersisted)
950+
{
951+
if (logger.IsTrace) logger.Trace($"Skipping {this} - already persisted");
952+
return default;
953+
}
954+
955+
if (currentPath.Length >= Int32.MaxValue)
956+
{
957+
return action(this, storageAddress, currentPath);
958+
}
959+
960+
if (!IsLeaf)
961+
{
962+
if (_data is null)
963+
{
964+
return action(this, storageAddress, currentPath);
965+
}
966+
967+
return CallRecursivelyNotLeafAsync(
968+
action,
969+
storageAddress,
970+
currentPath,
971+
resolver,
972+
logger);
973+
}
974+
else
975+
{
976+
return CallRecursivelyLeafAsync(
977+
action,
978+
storageAddress,
979+
currentPath,
980+
resolver,
981+
logger);
982+
}
983+
}
984+
985+
private async ValueTask CallRecursivelyNotLeafAsync(
986+
Func<TrieNode, Hash256?, TreePath, ValueTask> action,
987+
Hash256? storageAddress,
988+
TreePath currentPath,
989+
ITrieNodeResolver resolver,
990+
ILogger logger)
991+
{
992+
object?[] data = _data;
993+
for (int i = 0; i < data.Length; i++)
994+
{
995+
if (data[i] is TrieNode child)
996+
{
997+
if (logger.IsTrace) logger.Trace($"Persist recursively on child {i} {child} of {this}");
998+
int previousLength = AppendChildPath(ref currentPath, i);
999+
await child.CallRecursivelyAsync(action, storageAddress, ref currentPath, resolver, logger);
1000+
currentPath.TruncateMut(previousLength);
1001+
}
1002+
}
1003+
1004+
await action(this, storageAddress, currentPath);
1005+
}
1006+
1007+
private async ValueTask CallRecursivelyLeafAsync(
1008+
Func<TrieNode, Hash256?, TreePath, ValueTask> action,
1009+
Hash256? storageAddress,
1010+
TreePath currentPath,
1011+
ITrieNodeResolver resolver,
1012+
ILogger logger)
1013+
{
1014+
TrieNode? storageRoot = _storageRoot;
1015+
if (storageRoot is not null || TryResolveStorageRoot(resolver, ref currentPath, out storageRoot))
1016+
{
1017+
if (logger.IsTrace) logger.Trace($"Persist recursively on storage root {_storageRoot} of {this}");
1018+
Hash256 storagePathAddr;
1019+
using (currentPath.ScopedAppend(Key))
1020+
{
1021+
if (currentPath.Length != 64) throw new Exception("unexpected storage path length. Total nibble count should add up to 64.");
1022+
storagePathAddr = currentPath.Path.ToCommitment();
1023+
}
1024+
1025+
TreePath emptyPath = TreePath.Empty;
1026+
await storageRoot!.CallRecursivelyAsync(
1027+
action,
1028+
storagePathAddr,
1029+
ref emptyPath,
1030+
resolver.GetStorageTrieNodeResolver(storagePathAddr),
1031+
logger);
1032+
}
1033+
1034+
await action(this, storageAddress, currentPath);
1035+
}
1036+
9411037
/// <summary>
9421038
/// Imagine a branch like this:
9431039
/// B

0 commit comments

Comments
 (0)