Skip to content

Commit 7cb8d1d

Browse files
committed
wip: add peer management
1 parent 0baa7fa commit 7cb8d1d

File tree

1 file changed

+302
-9
lines changed
  • packages/beacon-node/src/sync/backfill/v2

1 file changed

+302
-9
lines changed

packages/beacon-node/src/sync/backfill/v2/index.ts

Lines changed: 302 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,19 @@
11
import {EventEmitter} from "node:events";
2+
import {StrictEventEmitter} from "strict-event-emitter-types";
23
import {BeaconConfig} from "@lodestar/config";
34
import {SLOTS_PER_EPOCH} from "@lodestar/params";
45
import {BeaconStateAllForks, computeAnchorCheckpoint} from "@lodestar/state-transition";
5-
import {Root, SignedBeaconBlock, Slot, phase0} from "@lodestar/types";
6-
import {ErrorAborted, Logger, toRootHex} from "@lodestar/utils";
7-
import {StrictEventEmitter} from "strict-event-emitter-types";
6+
import {Root, SignedBeaconBlock, Slot, WithBytes, fulu, phase0} from "@lodestar/types";
7+
import {ErrorAborted, Logger, prettyPrintIndices, toRootHex} from "@lodestar/utils";
88
import {IBeaconChain} from "../../../chain/index.js";
99
import {IBeaconDb} from "../../../db/index.js";
1010
import {Metrics} from "../../../metrics/metrics.js";
11-
import {INetwork, NetworkEvent, NetworkEventData} from "../../../network/index.js";
11+
import {INetwork, NetworkEvent, NetworkEventData, PeerAction} from "../../../network/index.js";
12+
import {PeerSyncMeta} from "../../../network/peers/peersData.js";
1213
import {ItTrigger} from "../../../util/itTrigger.js";
1314
import {PeerIdStr} from "../../../util/peerId.js";
14-
import {BackfillBlock, BackfillBlockHeader} from "../verify.js";
15+
import {BackfillSyncError, BackfillSyncErrorCode} from "../errors.ts";
16+
import {BackfillBlock, BackfillBlockHeader, verifyBlockSequence} from "../verify.js";
1517

1618
export type BackfillSyncModules = {
1719
chain: IBeaconChain;
@@ -67,6 +69,16 @@ type BackFillSyncAnchor =
6769
| {anchorBlock: null; anchorBlockRoot: Root; anchorSlot: null; lastBackSyncedBlock: BackfillBlock}
6870
| {anchorBlock: null; anchorBlockRoot: Root; anchorSlot: Slot; lastBackSyncedBlock: null};
6971

72+
// Updating peer score:
73+
// We can update it on certain events, such as request fulfilled, batch successfully imported, response times.
74+
/**
 * Backfill-specific bookkeeping kept per peer, layered on top of the
 * network-level `PeerSyncMeta`.
 */
type PeerBackfillSyncMeta = PeerSyncMeta & {
  /** Count of failed requests; peer selection skips peers once this reaches 3. */
  failedRequests: number;
  /**
   * Slot of the last request sent to this peer, used for round-robin distribution.
   * `0` is the initial value and is treated as "no request sent yet".
   */
  lastSlotRequested: number;
  /** Selection score, starts at 0; peer selection prefers higher values. */
  score: number;
  /** Average response time, starts at 0. NOTE(review): unit is not established yet - confirm when wiring it up. */
  avgResTime: number;
};
81+
7082
export class BackfillSync extends (EventEmitter as {new (): BackfillSyncEmitter}) {
7183
syncAnchor: BackFillSyncAnchor;
7284

@@ -81,7 +93,18 @@ export class BackfillSync extends (EventEmitter as {new (): BackfillSyncEmitter}
8193
private backfillStartFromSlot: Slot;
8294

8395
private processor = new ItTrigger();
96+
97+
// TODO: Consider implementing more efficient data structures and explore using util fns from network.ts (getConnectedPeerSyncMeta, getConnectedPeers, etc.)
98+
// - Adding selectivity to peers: we already have earliestAvailableSlot via PeerSyncMeta,
99+
// For delegating batch requests to different peers, we have following considerations:
100+
// - distribute requests evenly to avoid overwhelming a peer (use round-robin, etc.)
101+
// - keep track of valid responses, upscore peer
102+
// - keep track of failed responses, downscore peer, disconnect over threshold
103+
// - grouping by earliestAvailableSlot value, a peer irrelevant now can be relevant in later stage of backfill
104+
// - explore if any other pruning is required
84105
private peers = new Set<PeerIdStr>();
106+
private peersMeta: Map<PeerIdStr, PeerBackfillSyncMeta>;
107+
85108
private status: BackfillSyncStatus = BackfillSyncStatus.pending;
86109
private signal: AbortSignal;
87110

@@ -98,6 +121,7 @@ export class BackfillSync extends (EventEmitter as {new (): BackfillSyncEmitter}
98121
this.config = modules.config;
99122
this.logger = modules.logger;
100123
this.metrics = modules.metrics;
124+
this.peersMeta = new Map();
101125

102126
this.opts = opts;
103127
this.network.events.on(NetworkEvent.peerConnected, this.addPeer);
@@ -150,7 +174,110 @@ export class BackfillSync extends (EventEmitter as {new (): BackfillSyncEmitter}
150174
}) as T;
151175
}
152176

153-
private async sync(): Promise<void> {}
177+
/**
 * Main backfill loop (work in progress).
 *
 * Runs one pass per value yielded by `this.processor`: select a peer via
 * `getGoodSyncPeer()`, look up its bookkeeping entry and log it. The actual
 * request / validate / persist steps are not implemented yet and are listed
 * as TODO comments inside the `try` block.
 *
 * Anything thrown inside a pass is logged and swallowed so the loop continues.
 * Every pass that enters the `try` block ends with a fixed 5s pause; the
 * "no peers connected" early exit skips the `try` and therefore the pause.
 *
 * NOTE(review): `this.signal` is only logged here, never acted upon, so this
 * method only returns when the `this.processor` iterator itself completes.
 */
private async sync(): Promise<void> {
  this.processor.trigger();

  this.logger.info("Starting BackfillSync.");
  let iterationCount = 0;

  for await (const _ of this.processor) {
    this.status = BackfillSyncStatus.syncing;
    // Mark: A
    iterationCount++;

    // Read once per pass; nothing in this loop body modifies the anchor.
    const currentRequiredSlot = this.syncAnchor.lastBackSyncedBlock?.slot ?? this.backfillStartFromSlot;

    // DEBUG_CODE
    this.logger.info("Trying to do backfill sync", {
      iteration: iterationCount,
      totalPeers: this.peers.size,
      peersInMeta: this.peersMeta.size,
      currentRequiredSlot,
      signal: this.signal.aborted ? "aborted" : "active",
    });
    if (this.peers.size === 0) {
      this.logger.warn("No peers connected, waiting for peers...", {
        iteration: iterationCount,
      });
      // Go straight back to waiting on the processor; no pause on this path.
      continue;
    }
    // DEBUG_CODE

    try {
      // Select best peer; an empty string means no peer is currently eligible.
      const goodPeer = this.getGoodSyncPeer();

      if (!goodPeer) {
        this.logger.info("No eligible peer found for backfill", {
          iteration: iterationCount,
          totalPeers: this.peers.size,
          peersInMeta: this.peersMeta.size,
          currentRequiredSlot,
        });
        // DEBUG_CODE
        for (const [peerId, meta] of this.peersMeta.entries()) {
          this.logger.debug("Peer status", {
            peer: peerId,
            client: meta.client,
            connected: this.peers.has(peerId),
            score: meta.score,
            failedRequests: meta.failedRequests,
            lastSlotRequested: meta.lastSlotRequested,
            earliestAvailableSlot: meta.earliestAvailableSlot,
            avgResTime: meta.avgResTime,
          });
        }
        // DEBUG_CODE
        // Still runs the `finally` pause below before waiting for the next trigger.
        continue;
      }

      const goodPeerMetaData = this.peersMeta.get(goodPeer);
      if (goodPeerMetaData === undefined) {
        this.logger.error("Selected peer has no metadata (should not happen)", {
          peer: goodPeer,
        });
        throw Error("Selected peer has no metadata (should not happen)");
      }
      this.logger.info("Got a good peer to sync", {
        iteration: iterationCount,
        totalPeers: this.peers.size,
        peer: goodPeer,
        client: goodPeerMetaData.client,
        earliestAvailableSlot: goodPeerMetaData.earliestAvailableSlot,
        score: goodPeerMetaData.score,
        lastSlotRequested: goodPeerMetaData.lastSlotRequested,
        failedRequests: goodPeerMetaData.failedRequests,
        avgResTime: goodPeerMetaData.avgResTime,
        custodyColumns: prettyPrintIndices(goodPeerMetaData.custodyColumns),
      });

      // send beacon_blocks_by_range request
      // validate blocks
      // store blocks in db blockarchive
      // update lastBackSyncedBlock
      // update BackfillRange and BackfillState
      // update earliestAvailableSlot
    } catch (error) {
      // Thrown values are not guaranteed to be Error instances; narrow before reading fields.
      const isError = error instanceof Error;
      this.logger.error("Caught Error: ", {
        error: isError ? error.message : String(error),
        errorStack: isError ? error.stack : undefined,
      });
    } finally {
      // if (this.status !== BackfillSyncStatus.aborted) this.processor.trigger(); ?
      // sleep for sometime
      await new Promise((resolve) => setTimeout(resolve, 5000));
    }
  }

  // DEBUG_CODE
  this.logger.info("BackfillSync loop ended", {
    status: this.status,
    // finalSlot: this.backfillStartFromSlot, ?
  });
  // DEBUG_CODE
  // throw new ErrorAborted("BackfillSync");
}
154281

155282
close(): void {
156283
this.network.events.off(NetworkEvent.peerConnected, this.addPeer);
@@ -159,15 +286,181 @@ export class BackfillSync extends (EventEmitter as {new (): BackfillSyncEmitter}
159286
}
160287

161288
/**
 * Handler registered for `NetworkEvent.peerConnected`.
 *
 * Looks up the peer's sync metadata from the network and then:
 *  - ignores the peer if its head slot is below the slot backfill currently needs;
 *  - ignores the peer if it reports an `earliestAvailableSlot` above that slot;
 *  - otherwise registers it with zeroed backfill counters, or, if it is already
 *    known, refreshes the network-provided fields while keeping the counters.
 *
 * The processor is triggered whenever the peer was not rejected, including on a
 * refresh of an already-known peer.
 *
 * NOTE(review): a known peer that fails one of the checks on a later event is
 * not removed here and keeps its previous metadata - confirm this is intended.
 *
 * @param data payload of the `peerConnected` event (`peer` id and `status`)
 */
private addPeer = (data: NetworkEventData[NetworkEvent.peerConnected]): void => {
  // TODO: use db singleton object: BackfillRange to get requiredSlot
  const requiredSlot = this.syncAnchor.lastBackSyncedBlock?.slot ?? this.backfillStartFromSlot;

  const peerMetaData = this.network.getConnectedPeerSyncMeta(data.peer);

  // Reconsider logic for earliestAvailableSlot value, a peer irrelevant now can be relevant in later stage of backfill.
  // Assuming short lived connections for now, and hence ignoring above comment.
  if (data.status.headSlot < requiredSlot) {
    // DEBUG_CODE
    this.logger.warn("Peer head too far behind", {
      // we cant trust this peer
      peer: data.peer,
      peerHead: data.status.headSlot,
      requiredSlot,
    });
    // DEBUG_CODE
    return;
  }

  // ignore irrelevant peers
  if (peerMetaData.earliestAvailableSlot !== undefined && peerMetaData.earliestAvailableSlot > requiredSlot) {
    // DEBUG_CODE
    this.logger.warn("Peer doesn't have required historical data", {
      peer: data.peer,
      // Log the same value the check above compared, so the message explains the rejection.
      earliestAvailableSlot: peerMetaData.earliestAvailableSlot,
      requiredSlot,
    });
    // DEBUG_CODE
    return;
  }

  if (!this.peers.has(data.peer)) {
    // Add peer
    this.peers.add(data.peer);
    this.peersMeta.set(data.peer, {
      ...peerMetaData,
      score: 0,
      lastSlotRequested: 0, // 0 default value
      failedRequests: 0,
      avgResTime: 0,
    });
  } else {
    const existingMetaData = this.peersMeta.get(data.peer);
    if (existingMetaData) {
      // update metadata if already present; spread order keeps the backfill
      // counters and lets the fresh network fields overwrite the old ones
      this.peersMeta.set(data.peer, {
        ...existingMetaData,
        ...peerMetaData,
      });
    }
  }

  this.processor.trigger();
};
169367

170368
/**
 * Forgets a peer: takes the payload of `NetworkEvent.peerDisconnected` and
 * drops both the peer id and its backfill bookkeeping.
 *
 * @param data payload carrying the id of the peer to forget
 */
private removePeer = (data: NetworkEventData[NetworkEvent.peerDisconnected]): void => {
  const {peer} = data;

  // The bookkeeping goes away with the peer on purpose: keeping scores across
  // reconnects would let up/down-scoring accumulate and make selection overly
  // selective, so a returning peer starts again with a fair chance.
  this.peersMeta.delete(peer);
  this.peers.delete(peer);
};
383+
384+
// TODO: fix this inefficient impl in future
/**
 * Picks the peer the next backfill batch should be requested from.
 *
 * A peer is eligible when it is in `this.peers`, has fewer than 3 failed
 * requests, does not report an `earliestAvailableSlot` above the currently
 * required slot, and was not last asked for a slot closer than
 * `opts.backfillBatchSize` to the required slot (`lastSlotRequested === 0`
 * means it was never asked).
 *
 * Selection among eligible peers is deterministic (not random):
 *  1. find the best score;
 *  2. keep the peers whose score is within 10 points of it;
 *  3. of those, return the one with the greatest `lastSlotRequested`; on a tie
 *     the peer seen first while iterating `this.peersMeta` wins.
 *
 * This replaces a pairwise sort whose "within 10 points" rule was not
 * transitive, which makes the resulting order implementation-defined.
 *
 * NOTE(review): because `0` doubles as "never requested", never-used peers
 * rank last in step 3 - confirm whether they should be preferred instead.
 *
 * @returns the selected peer id, or `""` when no peer is eligible (callers test for falsiness)
 */
private getGoodSyncPeer = (): PeerIdStr => {
  // TODO: use db singleton object: BackfillRange to get requiredSlot
  const currRequiredSlot = this.syncAnchor.lastBackSyncedBlock?.slot ?? this.backfillStartFromSlot;

  const eligiblePeers = [...this.peersMeta.entries()].filter(([peerId, meta]) => {
    // if metadata present but currently not connected
    if (!this.peers.has(peerId)) {
      return false;
    }
    // too many failures
    if (meta.failedRequests >= 3) {
      return false;
    }
    // peer does not hold the required historical data
    if (meta.earliestAvailableSlot !== undefined && meta.earliestAvailableSlot > currRequiredSlot) {
      return false;
    }
    // if lastSlotRequested is very recent
    const recentlyUsed =
      meta.lastSlotRequested !== 0 &&
      Math.abs(meta.lastSlotRequested - currRequiredSlot) < this.opts.backfillBatchSize;
    return !recentlyUsed;
  });

  if (eligiblePeers.length === 0) {
    this.logger.warn("No eligible peers for backfill", {
      totalPeers: this.peers.size,
      currRequiredSlot,
    });
    // throw to catch in sync loop
    // throw Error("No eligible peers for backfill");
    return "";
  }

  const bestScore = eligiblePeers.reduce((max, [, meta]) => Math.max(max, meta.score), Number.NEGATIVE_INFINITY);

  // Written as a negated ">" so the band can never end up empty, even if a score is NaN.
  const topBand = eligiblePeers.filter(([, meta]) => !(bestScore - meta.score > 10));

  // Strict ">" keeps the earliest entry on ties.
  const [selectedPeerId] = topBand.reduce((best, candidate) =>
    candidate[1].lastSlotRequested > best[1].lastSlotRequested ? candidate : best
  );
  return selectedPeerId;
};
173466
}

0 commit comments

Comments
 (0)