|
| 1 | +import {EventEmitter} from "node:events"; |
| 2 | +import {BeaconConfig} from "@lodestar/config"; |
| 3 | +import {SLOTS_PER_EPOCH} from "@lodestar/params"; |
| 4 | +import {BeaconStateAllForks, computeAnchorCheckpoint} from "@lodestar/state-transition"; |
| 5 | +import {Root, SignedBeaconBlock, Slot, phase0} from "@lodestar/types"; |
| 6 | +import {ErrorAborted, Logger, toRootHex} from "@lodestar/utils"; |
| 7 | +import {StrictEventEmitter} from "strict-event-emitter-types"; |
| 8 | +import {IBeaconChain} from "../../../chain/index.js"; |
| 9 | +import {IBeaconDb} from "../../../db/index.js"; |
| 10 | +import {Metrics} from "../../../metrics/metrics.js"; |
| 11 | +import {INetwork, NetworkEvent, NetworkEventData} from "../../../network/index.js"; |
| 12 | +import {ItTrigger} from "../../../util/itTrigger.js"; |
| 13 | +import {PeerIdStr} from "../../../util/peerId.js"; |
| 14 | +import {BackfillBlock, BackfillBlockHeader} from "../verify.js"; |
| 15 | + |
| 16 | +export type BackfillSyncModules = { |
| 17 | + chain: IBeaconChain; |
| 18 | + db: IBeaconDb; |
| 19 | + network: INetwork; |
| 20 | + config: BeaconConfig; |
| 21 | + logger: Logger; |
| 22 | + metrics: Metrics | null; |
| 23 | + anchorState: BeaconStateAllForks; |
| 24 | + wsCheckpoint?: phase0.Checkpoint; |
| 25 | + signal: AbortSignal; |
| 26 | +}; |
| 27 | + |
| 28 | +type BackfillModules = BackfillSyncModules & { |
| 29 | + syncAnchor: BackFillSyncAnchor; |
| 30 | + backfillStartFromSlot: Slot; |
| 31 | + wsCheckpointHeader: BackfillBlockHeader | null; |
| 32 | +}; |
| 33 | + |
| 34 | +export type BackfillSyncOpts = { |
| 35 | + backfillBatchSize: number; |
| 36 | +}; |
| 37 | + |
| 38 | +export enum BackfillSyncEvent { |
| 39 | + completed = "BackfillSync-completed", |
| 40 | +} |
| 41 | + |
| 42 | +export enum BackfillSyncMethod { |
| 43 | + rangesync = "rangesync", |
| 44 | + blockbyroot = "blockbyroot", |
| 45 | +} |
| 46 | + |
| 47 | +export enum BackfillSyncStatus { |
| 48 | + pending = "pending", |
| 49 | + syncing = "syncing", |
| 50 | + completed = "completed", |
| 51 | + aborted = "aborted", |
| 52 | +} |
| 53 | + |
| 54 | +type BackfillSyncEvents = { |
| 55 | + [BackfillSyncEvent.completed]: (oldestSlotSynced: Slot) => void; |
| 56 | +}; |
| 57 | + |
| 58 | +type BackfillSyncEmitter = StrictEventEmitter<EventEmitter, BackfillSyncEvents>; |
| 59 | + |
| 60 | +type BackFillSyncAnchor = |
| 61 | + | { |
| 62 | + anchorBlock: SignedBeaconBlock; |
| 63 | + anchorBlockRoot: Root; |
| 64 | + anchorSlot: Slot; |
| 65 | + lastBackSyncedBlock: BackfillBlock; |
| 66 | + } |
| 67 | + | {anchorBlock: null; anchorBlockRoot: Root; anchorSlot: null; lastBackSyncedBlock: BackfillBlock} |
| 68 | + | {anchorBlock: null; anchorBlockRoot: Root; anchorSlot: Slot; lastBackSyncedBlock: null}; |
| 69 | + |
| 70 | +export class BackfillSync extends (EventEmitter as {new (): BackfillSyncEmitter}) { |
| 71 | + syncAnchor: BackFillSyncAnchor; |
| 72 | + |
| 73 | + private readonly chain: IBeaconChain; |
| 74 | + private readonly network: INetwork; |
| 75 | + private readonly db: IBeaconDb; |
| 76 | + private readonly config: BeaconConfig; |
| 77 | + private readonly logger: Logger; |
| 78 | + private readonly metrics: Metrics | null; |
| 79 | + private opts: BackfillSyncOpts; |
| 80 | + private wsCheckpointHeader: BackfillBlockHeader | null; |
| 81 | + private backfillStartFromSlot: Slot; |
| 82 | + |
| 83 | + private processor = new ItTrigger(); |
| 84 | + private peers = new Set<PeerIdStr>(); |
| 85 | + private status: BackfillSyncStatus = BackfillSyncStatus.pending; |
| 86 | + private signal: AbortSignal; |
| 87 | + |
| 88 | + constructor(opts: BackfillSyncOpts, modules: BackfillModules) { |
| 89 | + super(); |
| 90 | + |
| 91 | + this.syncAnchor = modules.syncAnchor; |
| 92 | + this.backfillStartFromSlot = modules.backfillStartFromSlot; |
| 93 | + this.wsCheckpointHeader = modules.wsCheckpointHeader; |
| 94 | + |
| 95 | + this.chain = modules.chain; |
| 96 | + this.network = modules.network; |
| 97 | + this.db = modules.db; |
| 98 | + this.config = modules.config; |
| 99 | + this.logger = modules.logger; |
| 100 | + this.metrics = modules.metrics; |
| 101 | + |
| 102 | + this.opts = opts; |
| 103 | + this.network.events.on(NetworkEvent.peerConnected, this.addPeer); |
| 104 | + this.network.events.on(NetworkEvent.peerDisconnected, this.removePeer); |
| 105 | + this.signal = modules.signal; |
| 106 | + |
| 107 | + this.sync() |
| 108 | + .then(() => { |
| 109 | + this.logger.info("BackfillSync completed"); |
| 110 | + this.close(); |
| 111 | + }) |
| 112 | + .catch((e) => { |
| 113 | + this.logger.error("BackfillSync processor error", e); |
| 114 | + this.status = BackfillSyncStatus.aborted; |
| 115 | + this.close(); |
| 116 | + }); |
| 117 | + } |
| 118 | + |
| 119 | + static async init<T extends BackfillSync = BackfillSync>( |
| 120 | + opts: BackfillSyncOpts, |
| 121 | + modules: BackfillSyncModules |
| 122 | + ): Promise<T> { |
| 123 | + const {config, anchorState, wsCheckpoint, logger} = modules; |
| 124 | + |
| 125 | + const {checkpoint: anchorCp} = computeAnchorCheckpoint(config, anchorState); |
| 126 | + const anchorSlot = anchorState.latestBlockHeader.slot; |
| 127 | + const syncAnchor = { |
| 128 | + anchorBlock: null, |
| 129 | + anchorBlockRoot: anchorCp.root, |
| 130 | + anchorSlot, |
| 131 | + lastBackSyncedBlock: null, |
| 132 | + }; |
| 133 | + |
| 134 | + const backfillStartFromSlot = anchorSlot; |
| 135 | + logger.info("Initializing from Checkpoint", { |
| 136 | + root: toRootHex(anchorCp.root), |
| 137 | + epoch: anchorCp.epoch, |
| 138 | + backfillStartFromSlot, |
| 139 | + }); |
| 140 | + |
| 141 | + const wsCheckpointHeader: BackfillBlockHeader | null = wsCheckpoint |
| 142 | + ? {root: wsCheckpoint.root, slot: wsCheckpoint.epoch * SLOTS_PER_EPOCH} |
| 143 | + : null; |
| 144 | + |
| 145 | + return new BackfillSync(opts, { |
| 146 | + syncAnchor, |
| 147 | + backfillStartFromSlot, // syncAnchor.anchorSlot |
| 148 | + wsCheckpointHeader, // from checkpoint sync |
| 149 | + ...modules, |
| 150 | + }) as T; |
| 151 | + } |
| 152 | + |
| 153 | + private async sync(): Promise<void> {} |
| 154 | + |
| 155 | + close(): void { |
| 156 | + this.network.events.off(NetworkEvent.peerConnected, this.addPeer); |
| 157 | + this.network.events.off(NetworkEvent.peerDisconnected, this.removePeer); |
| 158 | + this.processor.end(new ErrorAborted("BackfillSync")); |
| 159 | + } |
| 160 | + |
| 161 | + private addPeer = (data: NetworkEventData[NetworkEvent.peerConnected]): void => { |
| 162 | + const requiredSlot = this.syncAnchor.lastBackSyncedBlock?.slot ?? this.backfillStartFromSlot; |
| 163 | + this.logger.debug("Add peer", {peerhead: data.status.headSlot, requiredSlot}); |
| 164 | + if (data.status.headSlot >= requiredSlot) { |
| 165 | + this.peers.add(data.peer); |
| 166 | + this.processor.trigger(); |
| 167 | + } |
| 168 | + }; |
| 169 | + |
| 170 | + private removePeer = (data: NetworkEventData[NetworkEvent.peerDisconnected]): void => { |
| 171 | + this.peers.delete(data.peer); |
| 172 | + }; |
| 173 | +} |
0 commit comments