Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FastHeadersSyncFeed explicitly flush before setting metadata #8103

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions src/Nethermind/Nethermind.Blockchain/BlockTree.cs
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,22 @@ public AddBlockResult Insert(BlockHeader header, BlockTreeInsertHeaderOptions he
return AddBlockResult.Added;
}

public void Flush(FlushReason reason)
{
switch (reason)
{
case FlushReason.InsertHeaders:
_headerStore.Flush();
_chainLevelInfoRepository.Flush();
break;
case FlushReason.InsertBlocks:
_blockStore.Flush();
break;
default:
throw new ArgumentOutOfRangeException(nameof(reason), reason, null);
}
}

public AddBlockResult Insert(Block block, BlockTreeInsertBlockOptions insertBlockOptions = BlockTreeInsertBlockOptions.None,
BlockTreeInsertHeaderOptions insertHeaderOptions = BlockTreeInsertHeaderOptions.None, WriteFlags blockWriteFlags = WriteFlags.None)
{
Expand Down
6 changes: 4 additions & 2 deletions src/Nethermind/Nethermind.Blockchain/BlockTreeOverlay.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,10 @@ public AddBlockResult Insert(BlockHeader header, BlockTreeInsertHeaderOptions he
public AddBlockResult Insert(Block block,
BlockTreeInsertBlockOptions insertBlockOptions = BlockTreeInsertBlockOptions.None,
BlockTreeInsertHeaderOptions insertHeaderOptions = BlockTreeInsertHeaderOptions.None,
WriteFlags bodiesWriteFlags = WriteFlags.None) =>
_overlayTree.Insert(block, insertBlockOptions, insertHeaderOptions, bodiesWriteFlags);
WriteFlags blockWriteFlags = WriteFlags.None) =>
_overlayTree.Insert(block, insertBlockOptions, insertHeaderOptions, blockWriteFlags);

public void Flush(FlushReason reason) => _overlayTree.Flush(reason);

public void UpdateHeadBlock(Hash256 blockHash) =>
_overlayTree.UpdateHeadBlock(blockHash);
Expand Down
2 changes: 2 additions & 0 deletions src/Nethermind/Nethermind.Blockchain/Blocks/BlockStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,6 @@ public void Cache(Block block)
{
_blockCache.Set(block.Hash, block);
}

public void Flush() => blockDb.Flush();
}
1 change: 1 addition & 0 deletions src/Nethermind/Nethermind.Blockchain/Blocks/IBlockStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ public interface IBlockStore
ReceiptRecoveryBlock? GetReceiptRecoveryBlock(long blockNumber, Hash256 blockHash);
void Cache(Block block);
bool HasBlock(long blockNumber, Hash256 blockHash);
void Flush();
}
6 changes: 6 additions & 0 deletions src/Nethermind/Nethermind.Blockchain/Headers/HeaderStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ public void InsertBlockNumber(Hash256 blockHash, long blockNumber)
return Get(blockHash)?.Number;
}

public void Flush()
{
_headerDb.Flush();
_blockNumberDb.Flush();
}

private long? GetBlockNumberFromBlockNumberDb(Hash256 blockHash)
{
Span<byte> numberSpan = _blockNumberDb.GetSpan(blockHash);
Expand Down
5 changes: 5 additions & 0 deletions src/Nethermind/Nethermind.Blockchain/Headers/IHeaderStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,9 @@ public interface IHeaderStore
void Delete(Hash256 blockHash);
void InsertBlockNumber(Hash256 blockHash, long blockNumber);
long? GetBlockNumber(Hash256 blockHash);

/// <summary>
/// Flushed the underlying db.
/// </summary>
void Flush();
}
36 changes: 31 additions & 5 deletions src/Nethermind/Nethermind.Blockchain/IBlockTree.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,25 @@ public interface IBlockTree : IBlockFinder
/// <param name="header">Header to add</param>
/// <param name="headerOptions"></param>
/// <returns>Result of the operation, eg. Added, AlreadyKnown, etc.</returns>
AddBlockResult Insert(BlockHeader header, BlockTreeInsertHeaderOptions headerOptions = BlockTreeInsertHeaderOptions.None);
AddBlockResult Insert(BlockHeader header,
BlockTreeInsertHeaderOptions headerOptions = BlockTreeInsertHeaderOptions.None);

/// <summary>
/// Inserts a disconnected block body (not for processing).
/// </summary>
/// <param name="block">Block to add</param>
/// <param name="blockWriteFlags">The write flags overrides to be used for this insert operation.</param>
/// <returns>Result of the operation, eg. Added, AlreadyKnown, etc.</returns>
AddBlockResult Insert(Block block, BlockTreeInsertBlockOptions insertBlockOptions = BlockTreeInsertBlockOptions.None,
BlockTreeInsertHeaderOptions insertHeaderOptions = BlockTreeInsertHeaderOptions.None, WriteFlags bodiesWriteFlags = WriteFlags.None);
AddBlockResult Insert(Block block,
BlockTreeInsertBlockOptions insertBlockOptions = BlockTreeInsertBlockOptions.None,
BlockTreeInsertHeaderOptions insertHeaderOptions = BlockTreeInsertHeaderOptions.None,
WriteFlags blockWriteFlags = WriteFlags.None);

/// <summary>
/// Flushes underlying storages for the specific <paramref name="reason"/>.
/// </summary>
/// <param name="reason">The reason for flushing, showing what changes should be persisted.</param>
void Flush(FlushReason reason);

void UpdateHeadBlock(Hash256 blockHash);

Expand All @@ -83,15 +93,17 @@ AddBlockResult Insert(Block block, BlockTreeInsertBlockOptions insertBlockOption
/// <param name="block">Block to be included</param>
/// <param name="options">Options for suggesting block, whether a block should be processed or just added to the store.</param>
/// <returns>Result of the operation, eg. Added, AlreadyKnown, etc.</returns>
AddBlockResult SuggestBlock(Block block, BlockTreeSuggestOptions options = BlockTreeSuggestOptions.ShouldProcess);
AddBlockResult SuggestBlock(Block block,
BlockTreeSuggestOptions options = BlockTreeSuggestOptions.ShouldProcess);

/// <summary>
/// Suggests block for inclusion in the block tree. Wait for DB unlock if needed.
/// </summary>
/// <param name="block">Block to be included</param>
/// <param name="options">Options for suggesting block, whether a block should be processed or just added to the store.</param>
/// <returns>Result of the operation, eg. Added, AlreadyKnown, etc.</returns>
ValueTask<AddBlockResult> SuggestBlockAsync(Block block, BlockTreeSuggestOptions options = BlockTreeSuggestOptions.ShouldProcess);
ValueTask<AddBlockResult> SuggestBlockAsync(Block block,
BlockTreeSuggestOptions options = BlockTreeSuggestOptions.ShouldProcess);

/// <summary>
/// Suggests a block header (without body)
Expand Down Expand Up @@ -179,4 +191,18 @@ AddBlockResult Insert(Block block, BlockTreeInsertBlockOptions insertBlockOption

void RecalculateTreeLevels();
}

public enum FlushReason
{
/// <summary>
/// Flush after <see cref="IBlockTree.Insert" for a header is called/>
/// is called
/// </summary>
InsertHeaders,

/// <summary>
/// Flush after <see cref="IBlockTree.Insert"/> for a block is called.
/// </summary>
InsertBlocks,
}
}
2 changes: 2 additions & 0 deletions src/Nethermind/Nethermind.Blockchain/ReadOnlyBlockTree.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ public async Task Accept(IBlockTreeVisitor blockTreeVisitor, CancellationToken c
public AddBlockResult Insert(Block block, BlockTreeInsertBlockOptions insertBlockOptions = BlockTreeInsertBlockOptions.None, BlockTreeInsertHeaderOptions insertHeaderOptions = BlockTreeInsertHeaderOptions.None, WriteFlags blockWriteFlags = WriteFlags.None) =>
throw new InvalidOperationException($"{nameof(ReadOnlyBlockTree)} does not expect {nameof(Insert)} calls");

public void Flush(FlushReason reason) => throw new InvalidOperationException($"{nameof(ReadOnlyBlockTree)} does not expect {nameof(Flush)} calls");

public void Insert(IEnumerable<Block> blocks) => throw new InvalidOperationException($"{nameof(ReadOnlyBlockTree)} does not expect {nameof(Insert)} calls");

public void UpdateHeadBlock(Hash256 blockHash)
Expand Down
2 changes: 1 addition & 1 deletion src/Nethermind/Nethermind.Era1/EraImporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ async Task ImportBlock(long blockNumber)
private void InsertBlockAndReceipts(Block b, TxReceipt[] r, long lastBlockNumber)
{
if (blockTree.FindBlock(b.Number) is null)
blockTree.Insert(b, BlockTreeInsertBlockOptions.SaveHeader | BlockTreeInsertBlockOptions.SkipCanAcceptNewBlocks, bodiesWriteFlags: WriteFlags.DisableWAL);
blockTree.Insert(b, BlockTreeInsertBlockOptions.SaveHeader | BlockTreeInsertBlockOptions.SkipCanAcceptNewBlocks, blockWriteFlags: WriteFlags.DisableWAL);
if (!receiptStorage.HasBlock(b.Number, b.Hash!))
receiptStorage.Insert(b, r, true, writeFlags: WriteFlags.DisableWAL, lastBlockNumber: lastBlockNumber);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,8 @@ public bool HasBlock(long blockNumber, Hash256 blockHash)
{
return _blockNumDict.ContainsKey(blockNumber);
}

public void Flush()
{
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,8 @@ public void InsertBlockNumber(Hash256 blockHash, long blockNumber)
{
return _blockNumberDict.TryGetValue(blockHash, out var blockNumber) ? blockNumber : readonlyBaseHeaderStore.GetBlockNumber(blockHash);
}

public void Flush()
{
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,5 +71,7 @@ void LocalPersistLevel()
public BatchWrite StartBatch() => new(_writeLock);

public ChainLevelInfo? LoadLevel(long number) => _blockInfoDb.Get(number, Rlp.GetStreamDecoder<ChainLevelInfo>(), _blockInfoCache);

public void Flush() => _blockInfoDb.Flush();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@ public interface IChainLevelInfoRepository
void PersistLevel(long number, ChainLevelInfo level, BatchWrite? batch = null);
BatchWrite StartBatch();
ChainLevelInfo? LoadLevel(long number);
void Flush();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ public void Setup()
Substitute.For<ISyncPeerPool>(),
_syncConfig,
new NullSyncReport(),
_blocksDb,
_metadataDb,
LimboLogs.Instance,
flushDbInterval: 10
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,12 @@
using System.Threading.Tasks;
using Autofac.Features.AttributeFilters;
using Nethermind.Blockchain;
using Nethermind.Blockchain.Blocks;
using Nethermind.Blockchain.Synchronization;
using Nethermind.Consensus.Validators;
using Nethermind.Core;
using Nethermind.Core.Extensions;
using Nethermind.Core.Specs;
using Nethermind.Db;
using Nethermind.Logging;
using Nethermind.Serialization.Rlp;
using Nethermind.Stats.Model;
using Nethermind.Synchronization.ParallelSync;
using Nethermind.Synchronization.Peers;
Expand All @@ -40,7 +37,6 @@ public class BodiesSyncFeed : BarrierSyncFeed<BodiesSyncBatch?>
private readonly ISyncReport _syncReport;
private readonly ISyncPeerPool _syncPeerPool;
private readonly ISyncPointers _syncPointers;
private readonly IDb _blocksDb;

private SyncStatusList _syncStatusList;

Expand All @@ -56,7 +52,6 @@ public BodiesSyncFeed(
ISyncPeerPool syncPeerPool,
ISyncConfig syncConfig,
ISyncReport syncReport,
[KeyFilter(DbNames.Blocks)] IDb blocksDb,
[KeyFilter(DbNames.Metadata)] IDb metadataDb,
ILogManager logManager,
long flushDbInterval = DefaultFlushDbInterval)
Expand All @@ -67,7 +62,6 @@ public BodiesSyncFeed(
_syncPeerPool = syncPeerPool;
_syncConfig = syncConfig;
_syncReport = syncReport;
_blocksDb = blocksDb;
_flushDbInterval = flushDbInterval;

if (!_syncConfig.FastSync)
Expand Down Expand Up @@ -169,7 +163,9 @@ private void PostFinishCleanUp()
private void Flush()
{
long lowestInsertedAtPoint = _syncStatusList.LowestInsertWithoutGaps;
_blocksDb.Flush();

_blockTree.Flush(FlushReason.InsertBlocks);

_syncPointers.LowestInsertedBodyNumber = lowestInsertedAtPoint;
}

Expand Down Expand Up @@ -277,7 +273,7 @@ private int InsertBodies(BodiesSyncBatch batch)

private void InsertOneBlock(Block block)
{
_blockTree.Insert(block, BlockTreeInsertBlockOptions.SkipCanAcceptNewBlocks, bodiesWriteFlags: WriteFlags.DisableWAL);
_blockTree.Insert(block, BlockTreeInsertBlockOptions.SkipCanAcceptNewBlocks, blockWriteFlags: WriteFlags.DisableWAL);
_syncStatusList.MarkInserted(block.Number);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ namespace Nethermind.Synchronization.FastBlocks
{
public class HeadersSyncFeed : ActivatedSyncFeed<HeadersSyncBatch?>
{

private readonly ILogger _logger;
private readonly ISyncPeerPool _syncPeerPool;
protected readonly ISyncReport _syncReport;
Expand Down Expand Up @@ -638,6 +637,8 @@ protected virtual int InsertHeaders(HeadersSyncBatch batch)

if (lowestInsertedHeader is not null && lowestInsertedHeader.Number < (LowestInsertedBlockHeader?.Number ?? long.MaxValue))
{
// Flush first, so that LowestInsertedHeader is preserved only after the headers are set
_blockTree.Flush(FlushReason.InsertHeaders);
LowestInsertedBlockHeader = lowestInsertedHeader;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.Threading.Tasks;

namespace Nethermind.Synchronization.ParallelSync
{
Expand Down
Loading