Skip to content

Commit be45584

Browse files
committed
feat: Allow recovery of passed LZ collector events
1 parent 98ef71b commit be45584

File tree

3 files changed

+159
-20
lines changed

3 files changed

+159
-20
lines changed

src/collector/layer-zero/layer-zero.utils.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -52,5 +52,5 @@ export function decodeHeader(encodedHeader: string): LayerZeroHeader {
5252

5353
export function calculatePayloadHash(guid: string, message: string): string {
5454
const payload = `${guid}${message.slice(2)}`; // 'slice(2)' used to remove the '0x' from the 'message'
55-
return keccak256(payload);
55+
return keccak256(payload).toLowerCase();
5656
}

src/collector/layer-zero/layer-zero.worker.ts

+146-15
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,21 @@ import { LayerZeroEnpointV2Interface, PacketSentEvent } from 'src/contracts/Laye
4242
import { STATUS_LOG_INTERVAL } from 'src/logger/logger.service';
4343
import { calculatePayloadHash, decodeHeader, decodePacket } from './layer-zero.utils';
4444

45+
const ON_PACKET_SENT_PROCESSED_CHANNEL = 'packet_sent_processed';
46+
const ON_PACKET_SENT_PROCESSED_DELAY = 30 * 1000;
47+
const MAX_PENDING_PAYLOAD_VERIFIED_EVENT_DURATION = 6 * 60 * 60 * 1000;
4548

4649
interface LayerZeroPayloadData {
4750
messageIdentifier: string,
4851
payload: string,
4952
}
5053

54+
interface PayloadVerifiedEvent {
55+
timestamp: number,
56+
payloadHash: string,
57+
log: Log,
58+
}
59+
5160
class LayerZeroWorker {
5261
private readonly config: LayerZeroWorkerData;
5362

@@ -71,6 +80,11 @@ class LayerZeroWorker {
7180
private readonly layerZeroChainIdMap: Record<string, string>;
7281
private readonly incentivesAddresses: Record<string, string>;
7382

83+
// Keep track of unprocessed PayloadVerified events caused by not having processed yet the
84+
// corresponding PacketSent event on the source chain. This is most relevant for when
85+
// recovering past relays.
86+
private readonly pendingPayloadVerifiedEvents: PayloadVerifiedEvent[] = [];
87+
7488
private currentStatus: MonitorStatus | null = null;
7589
private monitor: MonitorInterface;
7690

@@ -201,6 +215,8 @@ class LayerZeroWorker {
201215
`LayerZero collector worker started.`,
202216
);
203217

218+
await this.listenForProcessedPackets();
219+
204220
this.fromBlock = await this.getStartingBlock();
205221
const stopBlock = this.config.stoppingBlock ?? Infinity;
206222

@@ -330,6 +346,65 @@ class LayerZeroWorker {
330346
return logs;
331347
}
332348

349+
private async listenForProcessedPackets(): Promise<void> {
350+
// Listen for whenever a packet is registered.
351+
await this.store.on(
352+
this.getOnPacketSentChannel(),
353+
(payloadHash: string) => {
354+
// Add a delay to prevent this handler from being executed at the exact same time
355+
// as the `handlePayloadVerifiedEvent()` handler, which can cause this handler to
356+
// search for a pending PayloadVerified event before it's registered.
357+
setTimeout(
358+
() => void this.onProcessedPacketHandler(payloadHash),
359+
ON_PACKET_SENT_PROCESSED_DELAY
360+
);
361+
}
362+
)
363+
}
364+
365+
private async onProcessedPacketHandler(
366+
payloadHash: string
367+
): Promise<void> {
368+
369+
this.logger.debug(
370+
{ payloadHash },
371+
`On PacketSent event recovery handler triggered.`
372+
);
373+
374+
const pendingPayloadVerifiedEventIndex = this.pendingPayloadVerifiedEvents.findIndex(
375+
(event) => event.payloadHash === payloadHash,
376+
);
377+
378+
if (pendingPayloadVerifiedEventIndex == -1) {
379+
return;
380+
}
381+
382+
this.logger.info(
383+
{ payloadHash },
384+
`Recovering PayloadVerified event.`
385+
);
386+
387+
const [pendingEvent] = this.pendingPayloadVerifiedEvents.splice(pendingPayloadVerifiedEventIndex, 1);
388+
389+
const parsedLog = this.receiveULN302Interface.parseLog(pendingEvent!.log);
390+
391+
try {
392+
await this.handlePayloadVerifiedEvent(
393+
pendingEvent!.log,
394+
parsedLog! // The log has been previously parsed, `parsedLog` should never be null.
395+
);
396+
}
397+
catch (error) {
398+
this.logger.error(
399+
{
400+
payloadHash,
401+
error: tryErrorToString(error),
402+
},
403+
`Error on PayloadVerified event recovery.`
404+
);
405+
}
406+
}
407+
333408

334409

335410
// Event handlers
@@ -463,8 +538,8 @@ class LayerZeroWorker {
463538
messageIdentifier,
464539

465540
amb: 'layer-zero',
466-
fromChainId: fromChainId.toString(),
467-
toChainId: toChainId.toString(),
541+
fromChainId,
542+
toChainId,
468543
fromIncentivesAddress: packet.sender,
469544
toIncentivesAddress,
470545

@@ -484,17 +559,31 @@ class LayerZeroWorker {
484559

485560
await this.store.setAdditionalAMBData<LayerZeroPayloadData>(
486561
'layer-zero',
487-
payloadHash.toLowerCase(),
562+
payloadHash,
488563
{
489564
messageIdentifier,
490565
payload: encodedPayload
491566
},
492567
);
568+
569+
// Broadcast that the PacketSent event has been processed to recover any pending logic
570+
// resulting from PayloadVerified events.
571+
await this.store.postMessage(
572+
this.getOnPacketSentChannel(),
573+
payloadHash
574+
);
493575
}
494576

495577
/**
496578
* Handles PayloadVerified events.
497579
*
580+
* ! A PayloadVerified event is emitted every time a specific packet is verified, but a single
581+
* ! event may not be enough to indicate that the packet is valid. Thus, there may be multiple
582+
* ! events for a single packet, which, depending at the time at which they are processed, can
583+
* ! result in this function submitting the proof for the same packet multiple times. This
584+
* ! undesired side effect is mitigated by the Store's `setAMBProof()` function, which will not
585+
* ! register proofs for the same packet more than once.
586+
*
498587
* @param log - The log data.
499588
* @param parsedLog - The parsed log description.
500589
*/
@@ -544,24 +633,22 @@ class LayerZeroWorker {
544633
return;
545634
}
546635

547-
this.logger.info(
548-
{
549-
transactionHash: log.transactionHash,
550-
payloadHash,
551-
},
552-
'PayloadVerified event decoded.',
553-
);
554-
555636
// Recover the encoded payload data from storage (saved on an earlier PacketSent event).
556637
const payloadData = await this.store.getAdditionalAMBData<LayerZeroPayloadData>(
557638
'layer-zero',
558639
payloadHash.toLowerCase()
559640
);
560641
if (!payloadData) {
561-
this.logger.warn(
642+
this.logger.info(
562643
{ payloadHash },
563-
'No payload data found for the given payloadHash.',
644+
'No payload data found for the given payloadHash. Queueing for recovery for when the payload is available.',
564645
);
646+
647+
this.queuePendingPayloadVerifiedEvent(
648+
payloadHash,
649+
log,
650+
);
651+
565652
return;
566653
}
567654

@@ -581,8 +668,8 @@ class LayerZeroWorker {
581668
messageIdentifier: payloadData.messageIdentifier,
582669

583670
amb: 'layer-zero',
584-
fromChainId: fromChainId.toString(),
585-
toChainId: toChainId.toString(),
671+
fromChainId,
672+
toChainId,
586673

587674
message: payloadData.payload,
588675
messageCtx: '0x',
@@ -610,6 +697,47 @@ class LayerZeroWorker {
610697
);
611698
}
612699
}
700+
701+
private queuePendingPayloadVerifiedEvent(
702+
payloadHash: string,
703+
log: Log,
704+
): void {
705+
// Prune any old pending events (note that events are stored in chronological order).
706+
const pruneTimestamp = Date.now() - MAX_PENDING_PAYLOAD_VERIFIED_EVENT_DURATION;
707+
708+
let firstNonStaleIndex;
709+
for (
710+
firstNonStaleIndex = 0;
711+
firstNonStaleIndex < this.pendingPayloadVerifiedEvents.length;
712+
firstNonStaleIndex++
713+
) {
714+
if (this.pendingPayloadVerifiedEvents[firstNonStaleIndex]!.timestamp > pruneTimestamp) {
715+
break;
716+
}
717+
}
718+
719+
if (firstNonStaleIndex != 0) {
720+
this.pendingPayloadVerifiedEvents.splice(0, firstNonStaleIndex);
721+
}
722+
723+
724+
// Register the pending event if not already pending, otherwise update the pending's event
725+
// 'timestamp'.
726+
const alreadyPendingEvent = this.pendingPayloadVerifiedEvents.find((event) => {
727+
event.payloadHash === payloadHash
728+
});
729+
730+
if (alreadyPendingEvent != undefined) {
731+
alreadyPendingEvent.timestamp = Date.now();
732+
}
733+
else {
734+
this.pendingPayloadVerifiedEvents.push({
735+
timestamp: Date.now(),
736+
payloadHash,
737+
log,
738+
});
739+
}
740+
}
613741

614742

615743
async checkIfVerifiable(
@@ -684,6 +812,9 @@ class LayerZeroWorker {
684812
throw new Error(`Failed to query the ULN configuration. (dvn: ${dvn}, destination eid: ${dstEid}).`);
685813
}
686814

815+
private getOnPacketSentChannel(): string {
816+
return Store.getChannel('layer-zero', ON_PACKET_SENT_PROCESSED_CHANNEL);
817+
}
687818

688819

689820
// Misc Helpers

src/store/store.lib.ts

+12-4
Original file line numberDiff line numberDiff line change
@@ -172,9 +172,9 @@ export class Store {
172172
);
173173
}
174174

175-
async on(
175+
async on<T>(
176176
channel: string,
177-
callback: (payload: Record<string, any>) => void,
177+
callback: (payload: T) => void,
178178
) {
179179
await this.redisSubscriptions.subscribe(channel);
180180

@@ -185,9 +185,9 @@ export class Store {
185185
});
186186
}
187187

188-
async onPattern(
188+
async onPattern<T>(
189189
pattern: string,
190-
callback: (payload: Record<string, any>) => void,
190+
callback: (payload: T) => void,
191191
) {
192192
await this.redisSubscriptions.psubscribe(pattern);
193193

@@ -582,6 +582,14 @@ export class Store {
582582
chainId,
583583
ambProof.messageIdentifier
584584
);
585+
586+
const currentProof = await this.get(key);
587+
if (currentProof != undefined) {
588+
//TODO log
589+
// Do not allow proofs to be set multiple times (prevent submitting the same relay more than once).
590+
return;
591+
}
592+
585593
await this.set(key, JSON.stringify(ambProof));
586594

587595
const channel = Store.getOnAMBProofChannel(

0 commit comments

Comments
 (0)