Skip to content

Commit a70bac5

Browse files
matthewkeilwemeetagaintwoethsgemini-code-assist[bot]nazarhussain
authored
feat: refactor block input (#8200)
**Motivation** Implement new IBlockInput on `unstable` version of peer das code. --------- Co-authored-by: Cayman <[email protected]> Co-authored-by: twoeths <[email protected]> Co-authored-by: Tuyen Nguyen <[email protected]> Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> Co-authored-by: Nazar Hussain <[email protected]> Co-authored-by: Nico Flaig <[email protected]>
1 parent 56313c7 commit a70bac5

File tree

97 files changed

+6675
-5235
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

97 files changed

+6675
-5235
lines changed

dashboards/lodestar_block_processor.json

Lines changed: 13 additions & 13 deletions
Large diffs are not rendered by default.

packages/beacon-node/src/api/impl/beacon/blocks/index.ts

Lines changed: 60 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -28,20 +28,11 @@ import {
2828
sszTypesFor,
2929
} from "@lodestar/types";
3030
import {fromAsync, fromHex, sleep, toHex, toRootHex} from "@lodestar/utils";
31-
import {
32-
BlobsSource,
33-
BlockInput,
34-
BlockInputAvailableData,
35-
BlockInputBlobs,
36-
BlockInputDataColumns,
37-
BlockInputType,
38-
BlockSource,
39-
DataColumnsSource,
40-
ImportBlockOpts,
41-
getBlockInput,
42-
} from "../../../../chain/blocks/types.js";
31+
import {BlockInputSource, isBlockInputBlobs, isBlockInputColumns} from "../../../../chain/blocks/blockInput/index.js";
32+
import {ImportBlockOpts} from "../../../../chain/blocks/types.js";
4333
import {verifyBlocksInEpoch} from "../../../../chain/blocks/verifyBlock.js";
4434
import {BeaconChain} from "../../../../chain/chain.js";
35+
import {ChainEvent} from "../../../../chain/emitter.js";
4536
import {BlockError, BlockErrorCode, BlockGossipError} from "../../../../chain/errors/index.js";
4637
import {
4738
BlockType,
@@ -51,7 +42,6 @@ import {
5142
} from "../../../../chain/produceBlock/index.js";
5243
import {validateGossipBlock} from "../../../../chain/validation/block.js";
5344
import {OpSource} from "../../../../chain/validatorMonitor.js";
54-
import {NetworkEvent} from "../../../../network/index.js";
5545
import {getBlobSidecars, kzgCommitmentToVersionedHash, reconstructBlobs} from "../../../../util/blobs.js";
5646
import {getDataColumnSidecarsFromBlock} from "../../../../util/dataColumns.js";
5747
import {isOptimisticBlock} from "../../../../util/forkChoice.js";
@@ -95,10 +85,15 @@ export function getBeaconBlockApi({
9585
const fork = config.getForkName(slot);
9686
const blockRoot = toRootHex(chain.config.getForkTypes(slot).BeaconBlock.hashTreeRoot(signedBlock.message));
9787

98-
let blockForImport: BlockInput, blobSidecars: deneb.BlobSidecars, dataColumnSidecars: fulu.DataColumnSidecars;
88+
const blockForImport = chain.seenBlockInputCache.getByBlock({
89+
block: signedBlock,
90+
source: BlockInputSource.api,
91+
seenTimestampSec,
92+
blockRootHex: blockRoot,
93+
});
94+
let blobSidecars: deneb.BlobSidecars, dataColumnSidecars: fulu.DataColumnSidecars;
9995

10096
if (isDenebBlockContents(signedBlockContents)) {
101-
let blockData: BlockInputAvailableData;
10297
if (isForkPostFulu(fork)) {
10398
const timer = metrics?.peerDas.dataColumnSidecarComputationTime.startTimer();
10499
// If the block was produced by this node, we will already have computed cells
@@ -116,30 +111,36 @@ export function getBeaconBlockApi({
116111
cellsAndProofs
117112
);
118113
timer?.();
119-
blockData = {
120-
fork,
121-
dataColumns: dataColumnSidecars,
122-
dataColumnsBytes: dataColumnSidecars.map(() => null),
123-
dataColumnsSource: DataColumnsSource.api,
124-
} as BlockInputDataColumns;
125114
blobSidecars = [];
126115
} else if (isForkPostDeneb(fork)) {
127116
blobSidecars = getBlobSidecars(config, signedBlock, signedBlockContents.blobs, signedBlockContents.kzgProofs);
128-
blockData = {
129-
fork,
130-
blobs: blobSidecars,
131-
blobsSource: BlobsSource.api,
132-
} as BlockInputBlobs;
133117
dataColumnSidecars = [];
134118
} else {
135119
throw Error(`Invalid data fork=${fork} for publish`);
136120
}
137-
138-
blockForImport = getBlockInput.availableData(config, signedBlock, BlockSource.api, blockData);
139121
} else {
140122
blobSidecars = [];
141123
dataColumnSidecars = [];
142-
blockForImport = getBlockInput.preData(config, signedBlock, BlockSource.api);
124+
}
125+
126+
if (isBlockInputColumns(blockForImport)) {
127+
for (const dataColumnSidecar of dataColumnSidecars) {
128+
blockForImport.addColumn({
129+
blockRootHex: blockRoot,
130+
columnSidecar: dataColumnSidecar,
131+
source: BlockInputSource.api,
132+
seenTimestampSec,
133+
});
134+
}
135+
} else if (isBlockInputBlobs(blockForImport)) {
136+
for (const blobSidecar of blobSidecars) {
137+
blockForImport.addBlob({
138+
blockRootHex: blockRoot,
139+
blobSidecar,
140+
source: BlockInputSource.api,
141+
seenTimestampSec,
142+
});
143+
}
143144
}
144145

145146
// check what validations have been requested before broadcasting and publishing the block
@@ -184,9 +185,10 @@ export function getBeaconBlockApi({
184185
if (!blockLocallyProduced) {
185186
const parentBlock = chain.forkChoice.getBlock(signedBlock.message.parentRoot);
186187
if (parentBlock === null) {
187-
network.events.emit(NetworkEvent.unknownBlockParent, {
188+
chain.emitter.emit(ChainEvent.unknownParent, {
188189
blockInput: blockForImport,
189190
peer: IDENTITY_PEER_ID,
191+
source: BlockInputSource.api,
190192
});
191193
chain.persistInvalidSszValue(
192194
chain.config.getForkTypes(slot).SignedBeaconBlock,
@@ -247,18 +249,16 @@ export function getBeaconBlockApi({
247249

248250
// Simple implementation of a pending block queue. Keeping the block here recycles the API logic, and keeps the
249251
// REST request promise without any extra infrastructure.
250-
const msToBlockSlot =
251-
computeTimeAtSlot(config, blockForImport.block.message.slot, chain.genesisTime) * 1000 - Date.now();
252+
const msToBlockSlot = computeTimeAtSlot(config, slot, chain.genesisTime) * 1000 - Date.now();
252253
if (msToBlockSlot <= MAX_API_CLOCK_DISPARITY_MS && msToBlockSlot > 0) {
253254
// If block is a bit early, hold it in a promise. Equivalent to a pending queue.
254255
await sleep(msToBlockSlot);
255256
}
256257

257258
// TODO: Validate block
258-
const delaySec =
259-
seenTimestampSec - (chain.genesisTime + blockForImport.block.message.slot * config.SECONDS_PER_SLOT);
259+
const delaySec = seenTimestampSec - (chain.genesisTime + slot * config.SECONDS_PER_SLOT);
260260
metrics?.gossipBlock.elapsedTimeTillReceived.observe({source: OpSource.api}, delaySec);
261-
chain.validatorMonitor?.registerBeaconBlock(OpSource.api, delaySec, blockForImport.block.message);
261+
chain.validatorMonitor?.registerBeaconBlock(OpSource.api, delaySec, signedBlock.message);
262262

263263
chain.logger.info("Publishing block", valLogMeta);
264264
const publishPromises = [
@@ -280,9 +280,10 @@ export function getBeaconBlockApi({
280280
.processBlock(blockForImport, {...opts, eagerPersistBlock: false})
281281
.catch((e) => {
282282
if (e instanceof BlockError && e.type.code === BlockErrorCode.PARENT_UNKNOWN) {
283-
network.events.emit(NetworkEvent.unknownBlockParent, {
283+
chain.emitter.emit(ChainEvent.unknownParent, {
284284
blockInput: blockForImport,
285285
peer: IDENTITY_PEER_ID,
286+
source: BlockInputSource.api,
286287
});
287288
}
288289
throw e;
@@ -315,38 +316,34 @@ export function getBeaconBlockApi({
315316

316317
chain.emitter.emit(routes.events.EventType.blockGossip, {slot, block: blockRoot});
317318

318-
if (blockForImport.type === BlockInputType.availableData) {
319-
if (isForkPostFulu(blockForImport.blockData.fork)) {
320-
const {dataColumns} = blockForImport.blockData as BlockInputDataColumns;
321-
metrics?.dataColumns.bySource.inc({source: DataColumnsSource.api}, dataColumns.length);
322-
323-
if (chain.emitter.listenerCount(routes.events.EventType.dataColumnSidecar)) {
324-
for (const dataColumnSidecar of dataColumns) {
325-
chain.emitter.emit(routes.events.EventType.dataColumnSidecar, {
326-
blockRoot,
327-
slot,
328-
index: dataColumnSidecar.index,
329-
kzgCommitments: dataColumnSidecar.kzgCommitments.map(toHex),
330-
});
331-
}
332-
}
333-
} else if (
334-
isForkPostDeneb(blockForImport.blockData.fork) &&
335-
chain.emitter.listenerCount(routes.events.EventType.blobSidecar)
336-
) {
337-
const {blobs} = blockForImport.blockData as BlockInputBlobs;
338-
339-
for (const blobSidecar of blobs) {
340-
const {index, kzgCommitment} = blobSidecar;
341-
chain.emitter.emit(routes.events.EventType.blobSidecar, {
319+
if (isBlockInputColumns(blockForImport)) {
320+
const dataColumns = blockForImport.getAllColumns();
321+
metrics?.dataColumns.bySource.inc({source: BlockInputSource.api}, dataColumns.length);
322+
323+
if (chain.emitter.listenerCount(routes.events.EventType.dataColumnSidecar)) {
324+
for (const dataColumnSidecar of dataColumns) {
325+
chain.emitter.emit(routes.events.EventType.dataColumnSidecar, {
342326
blockRoot,
343327
slot,
344-
index,
345-
kzgCommitment: toHex(kzgCommitment),
346-
versionedHash: toHex(kzgCommitmentToVersionedHash(kzgCommitment)),
328+
index: dataColumnSidecar.index,
329+
kzgCommitments: dataColumnSidecar.kzgCommitments.map(toHex),
347330
});
348331
}
349332
}
333+
} else if (isBlockInputBlobs(blockForImport) && chain.emitter.listenerCount(routes.events.EventType.blobSidecar)) {
334+
const blobSidecars = blockForImport.getBlobs();
335+
const versionedHashes = blockForImport.getVersionedHashes();
336+
337+
for (const blobSidecar of blobSidecars) {
338+
const {index, kzgCommitment} = blobSidecar;
339+
chain.emitter.emit(routes.events.EventType.blobSidecar, {
340+
blockRoot,
341+
slot,
342+
index,
343+
kzgCommitment: toHex(kzgCommitment),
344+
versionedHash: toHex(versionedHashes[index]),
345+
});
346+
}
350347
}
351348
};
352349

packages/beacon-node/src/api/impl/lodestar/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ export function getLodestarApi({
115115
data: (chain as BeaconChain)["blockProcessor"].jobQueue.getItems().map((item) => {
116116
const [blockInputs, opts] = item.args;
117117
return {
118-
blockSlots: blockInputs.map((blockInput) => blockInput.block.message.slot),
118+
blockSlots: blockInputs.map((blockInput) => blockInput.slot),
119119
jobOpts: opts,
120120
addedTimeMs: item.addedTimeMs,
121121
};

packages/beacon-node/src/api/impl/validator/index.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ import {
5757
toRootHex,
5858
} from "@lodestar/utils";
5959
import {MAX_BUILDER_BOOST_FACTOR} from "@lodestar/validator";
60+
import {BlockInputSource} from "../../../chain/blocks/blockInput/types.js";
6061
import {
6162
AttestationError,
6263
AttestationErrorCode,
@@ -978,7 +979,7 @@ export function getValidatorApi(
978979
// see https://github.com/ChainSafe/lodestar/issues/5063
979980
if (!chain.forkChoice.hasBlock(beaconBlockRoot)) {
980981
const rootHex = toRootHex(beaconBlockRoot);
981-
network.searchUnknownSlotRoot({slot, root: rootHex});
982+
network.searchUnknownSlotRoot({slot, root: rootHex}, BlockInputSource.api);
982983
// if result of this call is false, i.e. block hasn't seen after 1 slot then the below notOnOptimisticBlockRoot call will throw error
983984
await chain.waitForBlock(slot, rootHex);
984985
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
import {Logger, sleep} from "@lodestar/utils";
2+
import {ChainEventEmitter} from "./emitter.js";
3+
import {Metrics} from "../metrics/metrics.js";
4+
import {ChainForkConfig} from "@lodestar/config";
5+
import {BlockInputColumns} from "./blocks/blockInput/index.js";
6+
import {recoverDataColumnSidecars} from "../util/dataColumns.js";
7+
8+
/**
9+
* Minimum time to wait before attempting reconstruction
10+
*/
11+
const RECONSTRUCTION_DELAY_MIN_MS = 800;
12+
13+
/**
14+
* Maximum time to wait before attempting reconstruction
15+
*/
16+
const RECONSTRUCTION_DELAY_MAX_MS = 1200;
17+
18+
export type ColumnReconstructionTrackerInit = {
19+
logger: Logger;
20+
emitter: ChainEventEmitter;
21+
metrics: Metrics | null;
22+
config: ChainForkConfig;
23+
};
24+
25+
/**
26+
* Tracks column reconstruction attempts to avoid duplicate and multiple in-flight calls
27+
*/
28+
export class ColumnReconstructionTracker {
29+
logger: Logger;
30+
emitter: ChainEventEmitter;
31+
metrics: Metrics | null;
32+
config: ChainForkConfig;
33+
34+
/**
35+
* Track last attempted block root
36+
*
37+
* This is sufficient to avoid duplicate calls since we only call this
38+
* function when we see a new data column sidecar from gossip.
39+
*/
40+
lastBlockRootHex: string | null = null;
41+
/** Track if a reconstruction attempt is in-flight */
42+
running = false;
43+
44+
constructor(init: ColumnReconstructionTrackerInit) {
45+
this.logger = init.logger;
46+
this.emitter = init.emitter;
47+
this.metrics = init.metrics;
48+
this.config = init.config;
49+
}
50+
51+
triggerColumnReconstruction(blockInput: BlockInputColumns): void {
52+
if (this.running) {
53+
return;
54+
}
55+
56+
if (this.lastBlockRootHex === blockInput.blockRootHex) {
57+
return;
58+
}
59+
60+
// We don't care about the outcome of this call,
61+
// just that it has been triggered for this block root.
62+
this.running = true;
63+
this.lastBlockRootHex = blockInput.blockRootHex;
64+
const delay =
65+
RECONSTRUCTION_DELAY_MIN_MS + Math.random() * (RECONSTRUCTION_DELAY_MAX_MS - RECONSTRUCTION_DELAY_MIN_MS);
66+
sleep(delay).then(() => {
67+
recoverDataColumnSidecars(blockInput, this.emitter, this.metrics).finally(() => {
68+
this.running = false;
69+
});
70+
});
71+
}
72+
}

0 commit comments

Comments
 (0)