Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion packages/beacon-node/src/api/impl/beacon/pool/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,18 @@ export function getBeaconPoolApi({
},

async submitPoolVoluntaryExit({signedVoluntaryExit}) {
// Validate the voluntary exit
await validateApiVoluntaryExit(chain, signedVoluntaryExit);
// Insert into the operation pool
// The opPool will handle validation and delayed broadcasting
chain.opPool.insertVoluntaryExit(signedVoluntaryExit);
// Emit event immediately so monitoring/logging can track it
chain.emitter.emit(routes.events.EventType.voluntaryExit, signedVoluntaryExit);
await network.publishVoluntaryExit(signedVoluntaryExit);
// Note: Network publishing is now handled by the opPool when conditions are met
// instead of publishing immediately here
Comment on lines +202 to +203
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see those changes implemented in the PR

logger.info("Voluntary exit accepted and added to pool", {
validatorIndex: signedVoluntaryExit.message.validatorIndex,
});
},

async submitPoolBLSToExecutionChange({blsToExecutionChanges}) {
Expand Down
78 changes: 74 additions & 4 deletions packages/beacon-node/src/chain/opPools/opPool.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {Id, Repository} from "@lodestar/db";
import {Logger} from "@lodestar/logger";
import {
BLS_WITHDRAWAL_PREFIX,
ForkName,
Expand Down Expand Up @@ -28,10 +29,14 @@ import {
import {fromHex, toHex, toRootHex} from "@lodestar/utils";
import {IBeaconDb} from "../../db/index.js";
import {Metrics} from "../../metrics/metrics.js";
import {INetwork} from "../../network/interface.js";
import {SignedBLSToExecutionChangeVersioned} from "../../util/types.js";
import {IBeaconChain} from "../index.js";
import {BlockType} from "../interface.js";
import {BlockProductionStep} from "../produceBlock/produceBlockBody.js";
import {validateGossipVoluntaryExit} from "../validation/voluntaryExit.js";
import {isValidBlsToExecutionChangeForBlockInclusion} from "./utils.js";
import {VoluntaryExitDelayedBroadcaster} from "./voluntaryExitBroadcaster.js";

type HexRoot = string;
type AttesterSlashingCached = {
Expand All @@ -50,6 +55,20 @@ export class OpPool {
private readonly attesterSlashingIndexes = new Set<ValidatorIndex>();
/** Map of validator index -> SignedBLSToExecutionChange */
private readonly blsToExecutionChanges = new Map<ValidatorIndex, SignedBLSToExecutionChangeVersioned>();
/** Handles delayed broadcasting of voluntary exits */
private readonly voluntaryExitBroadcaster: VoluntaryExitDelayedBroadcaster;

constructor(
private readonly chain: IBeaconChain,
private readonly network: INetwork,
private readonly logger: Logger
) {
// Initialize the voluntary exit broadcaster
this.voluntaryExitBroadcaster = new VoluntaryExitDelayedBroadcaster(this.chain, this.network, this.logger);

// Start periodic check for cached voluntary exits
this.startPeriodicBroadcastCheck();
}

// Getters for metrics

Expand Down Expand Up @@ -81,7 +100,7 @@ export class OpPool {
this.insertProposerSlashing(proposerSlashing);
}
for (const voluntaryExit of voluntaryExits) {
this.insertVoluntaryExit(voluntaryExit);
await this.insertVoluntaryExit(voluntaryExit);
}
for (const item of blsToExecutionChanges) {
this.insertBlsToExecutionChange(item.data, item.preCapella);
Expand Down Expand Up @@ -162,9 +181,37 @@ export class OpPool {
this.proposerSlashings.set(proposerSlashing.signedHeader1.message.proposerIndex, proposerSlashing);
}

/** Must be validated beforehand */
insertVoluntaryExit(voluntaryExit: phase0.SignedVoluntaryExit): void {
this.voluntaryExits.set(voluntaryExit.message.validatorIndex, voluntaryExit);
/**
* Must be validated beforehand.
* MODIFIED: Now async and tries to broadcast immediately if conditions met,
* otherwise caches for delayed broadcast.
*/
async insertVoluntaryExit(voluntaryExit: phase0.SignedVoluntaryExit): Promise<void> {
const validatorIndex = voluntaryExit.message.validatorIndex;

// Don't add if already seen
if (this.hasSeenVoluntaryExit(validatorIndex)) {
return;
}

try {
// Try full validation (including transient conditions)
await validateGossipVoluntaryExit(this.chain, voluntaryExit);

// If validation passes, add to main pool (ready to include in blocks and broadcast)
this.voluntaryExits.set(validatorIndex, voluntaryExit);

this.logger.debug("Voluntary exit added to pool (conditions met)", {validatorIndex});
} catch (e) {
// If validation fails due to transient conditions, cache for delayed broadcast
// The signature was already validated in validateApiVoluntaryExit
this.voluntaryExitBroadcaster.addToCacheForDelayedBroadcast(voluntaryExit);

this.logger.debug("Voluntary exit cached for delayed broadcast", {
validatorIndex,
error: (e as Error).message,
});
}
}

/** Must be validated beforehand */
Expand All @@ -175,6 +222,29 @@ export class OpPool {
});
}

/**
* Start periodic check of cached voluntary exits.
* Checks every slot (12 seconds) for exits ready to broadcast.
*/
private startPeriodicBroadcastCheck(): void {
const checkIntervalMs = 12_000; // Every slot (12 seconds)

setInterval(async () => {
try {
await this.voluntaryExitBroadcaster.checkAndBroadcastCachedExits();
} catch (e) {
this.logger.error("Error in periodic voluntary exit broadcast check", {error: (e as Error).message});
}
}, checkIntervalMs);
}

/**
* Get cache size for metrics/monitoring
*/
getVoluntaryExitCacheSize(): number {
return this.voluntaryExitBroadcaster.getCacheSize();
}

/**
* Get proposer and attester slashings and voluntary exits and bls to execution change for inclusion in a block.
*
Expand Down
211 changes: 211 additions & 0 deletions packages/beacon-node/src/chain/opPools/voluntaryExitBroadcaster.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
import {Logger} from "@lodestar/logger";
import {phase0} from "@lodestar/types";
import {INetwork} from "../../network/interface.js";
import {VoluntaryExitError} from "../errors/index.js";
import {IBeaconChain} from "../index.js";
import {validateGossipVoluntaryExit} from "../validation/voluntaryExit.js";

/**
* Cached voluntary exit with metadata
*/
interface CachedVoluntaryExit {
voluntaryExit: phase0.SignedVoluntaryExit;
receivedAt: number; // timestamp when received via API
}

/**
* Manages delayed broadcasting of voluntary exits.
*
* When a voluntary exit is submitted via API but doesn't yet meet transient conditions
* (e.g., validator not active, exit epoch not reached, pending withdrawals), it's cached
* here and periodically checked. Once conditions are met, it's broadcast to the network.
*
* This improves UX by accepting exits early and is more forgiving for DVT/multi-node setups.
*/
export class VoluntaryExitDelayedBroadcaster {
private readonly cachedExits = new Map<number, CachedVoluntaryExit>(); // validatorIndex -> exit
private readonly MAX_CACHE_AGE_MS = 7 * 24 * 60 * 60 * 1000; // 7 days

constructor(
private readonly chain: IBeaconChain,
private readonly network: INetwork,
private readonly logger: Logger
) {}

/**
* Add a voluntary exit to the cache for delayed broadcasting.
* Called when a voluntary exit passes signature validation but doesn't yet meet
* transient conditions (validator active status, exit epoch timing, etc.)
*/
addToCacheForDelayedBroadcast(voluntaryExit: phase0.SignedVoluntaryExit): void {
const validatorIndex = voluntaryExit.message.validatorIndex;

// Don't cache if already exists
if (this.cachedExits.has(validatorIndex)) {
this.logger.debug("Voluntary exit already cached, skipping", {validatorIndex});
return;
}

this.cachedExits.set(validatorIndex, {
voluntaryExit,
receivedAt: Date.now(),
});

this.logger.info("Voluntary exit cached for delayed broadcasting", {
validatorIndex,
epoch: voluntaryExit.message.epoch,
cacheSize: this.cachedExits.size,
});
}

/**
* Check cached voluntary exits and broadcast those that now meet transient conditions.
* Should be called periodically (e.g., every slot or every few seconds).
*/
async checkAndBroadcastCachedExits(): Promise<void> {
if (this.cachedExits.size === 0) {
return;
}

const currentTime = Date.now();
const exitsToRemove: number[] = [];

for (const [validatorIndex, cached] of this.cachedExits.entries()) {
try {
// Check if exit has been in cache too long
const ageMs = currentTime - cached.receivedAt;
if (ageMs > this.MAX_CACHE_AGE_MS) {
this.logger.warn("Removing stale voluntary exit from cache", {
validatorIndex,
ageMs,
ageDays: Math.floor(ageMs / (24 * 60 * 60 * 1000)),
});
exitsToRemove.push(validatorIndex);
continue;
}

// Use full gossip validation to check if all conditions (including transient) are now met
await validateGossipVoluntaryExit(this.chain, cached.voluntaryExit);

// If validation passes, broadcast to network
await this.network.publishVoluntaryExit(cached.voluntaryExit);

this.logger.info("Successfully broadcasted delayed voluntary exit", {
validatorIndex,
epoch: cached.voluntaryExit.message.epoch,
delayMs: ageMs,
delaySeconds: Math.floor(ageMs / 1000),
});

// Remove from cache after successful broadcast
exitsToRemove.push(validatorIndex);
} catch (e) {
if (e instanceof VoluntaryExitError) {
// Check if this is a permanent failure or transient
if (this.isPermanentFailure(e)) {
this.logger.warn("Removing voluntary exit due to permanent validation failure", {
validatorIndex,
error: e.message,
errorCode: e.type.code,
});
exitsToRemove.push(validatorIndex);
} else {
// Transient conditions not yet met, keep in cache
this.logger.debug("Voluntary exit not yet ready for broadcasting", {
validatorIndex,
error: e.message,
cacheSize: this.cachedExits.size,
});
}
} else {
// Unexpected error, log and remove from cache
this.logger.error("Unexpected error checking voluntary exit, removing from cache", {
validatorIndex,
error: (e as Error).message,
});
exitsToRemove.push(validatorIndex);
}
}
}

// Clean up processed exits
for (const validatorIndex of exitsToRemove) {
this.cachedExits.delete(validatorIndex);
}

if (exitsToRemove.length > 0) {
this.logger.debug("Cleaned up voluntary exit cache", {
removed: exitsToRemove.length,
remaining: this.cachedExits.size,
});
}
}

/**
* Determine if a validation error is permanent (will never become valid)
* or transient (may become valid later).
*
* Transient errors: validator not active yet, exit epoch not reached, pending withdrawals
* Permanent errors: invalid signature, validator already exited, invalid index
*/
private isPermanentFailure(error: VoluntaryExitError): boolean {
const errorMessage = error.message.toLowerCase();

// These are transient conditions that may resolve over time
const transientIndicators = [
"not active",
"not_active_validator",
"validator_not_active",
"not withdrawable",
"withdrawable_epoch",
"exit epoch",
"epoch not current",
"pending withdrawal", // post-Electra
"pending_withdrawal",
"too early",
"future epoch",
];

// If any transient indicator is found, it's not a permanent failure
const isTransient = transientIndicators.some((indicator) => errorMessage.includes(indicator));

return !isTransient;
}

/**
* Get the current size of the cache (for metrics/monitoring)
*/
getCacheSize(): number {
return this.cachedExits.size;
}

/**
* Get all cached voluntary exits (for debugging/inspection)
*/
getCachedExits(): phase0.SignedVoluntaryExit[] {
return Array.from(this.cachedExits.values()).map((cached) => cached.voluntaryExit);
}

/**
* Clear all cached exits (for testing or shutdown)
*/
clearCache(): void {
const size = this.cachedExits.size;
this.cachedExits.clear();
if (size > 0) {
this.logger.info("Cleared voluntary exit cache", {clearedCount: size});
}
}

/**
* Remove a specific exit from cache (for testing or manual intervention)
*/
removeFromCache(validatorIndex: number): boolean {
const existed = this.cachedExits.has(validatorIndex);
this.cachedExits.delete(validatorIndex);
if (existed) {
this.logger.debug("Manually removed voluntary exit from cache", {validatorIndex});
}
return existed;
}
}
Loading