Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions packages/api/src/beacon/routes/validator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,10 @@ export type Endpoints = {
* a validator client to correctly determine if one of its validators has been selected to
* perform an aggregation duty in this slot.
*
* Validator clients running in a distributed validator cluster must query this endpoint
* at the start of an epoch for the current and lookahead (next) epochs for all validators
* that have attester duties in the current and lookahead epochs.
*
* Note that this endpoint is not implemented by the beacon node and will return a 501 error
*
* Returns an array of threshold aggregated beacon committee selection proofs
Expand All @@ -521,6 +525,9 @@ export type Endpoints = {
* a validator client to correctly determine if one of its validators has been selected to
* perform a sync committee contribution (sync aggregation) duty in this slot.
*
* Validator clients running in a distributed validator cluster must query this endpoint
* at the start of each slot for all validators that are included in the current sync committee.
*
* Note that this endpoint is not implemented by the beacon node and will return a 501 error
*
* Returns an array of threshold aggregated sync committee selection proofs
Expand Down
103 changes: 3 additions & 100 deletions packages/validator/src/services/attestation.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import {ApiClient, routes} from "@lodestar/api";
import {ApiClient} from "@lodestar/api";
import {ChainForkConfig} from "@lodestar/config";
import {ForkName, isForkPostElectra} from "@lodestar/params";
import {computeEpochAtSlot, isAggregatorFromCommitteeLength} from "@lodestar/state-transition";
import {BLSSignature, SignedAggregateAndProof, SingleAttestation, Slot, phase0, ssz} from "@lodestar/types";
import {computeEpochAtSlot} from "@lodestar/state-transition";
import {SignedAggregateAndProof, SingleAttestation, Slot, phase0, ssz} from "@lodestar/types";
import {prettyBytes, sleep, toRootHex} from "@lodestar/utils";
import {Metrics} from "../metrics.js";
import {PubkeyHex} from "../types.js";
Expand Down Expand Up @@ -75,18 +75,6 @@ export class AttestationService {
}
const fork = this.config.getForkName(slot);

if (this.opts?.distributedAggregationSelection) {
// Validator in distributed cluster only has a key share, not the full private key.
// The partial selection proofs must be exchanged for combined selection proofs by
// calling submitBeaconCommitteeSelections on the distributed validator middleware client.
// This will run in parallel to other attestation tasks but must be finished before starting
// attestation aggregation as it is required to correctly determine if validator is aggregator
// and to produce a AggregateAndProof that can be threshold aggregated by the middleware client.
this.runDistributedAggregationSelectionTasks(fork, duties, slot, signal).catch((e) =>
this.logger.error("Error on attestation aggregation selection", {slot}, e)
);
}

// A validator should create and broadcast the attestation to the associated attestation subnet when either
// (a) the validator has received a valid block from the expected block proposer for the assigned slot or
// (b) ATTESTATION_DUE_BPS of the slot has transpired -- whichever comes first.
Expand Down Expand Up @@ -274,89 +262,4 @@ export class AttestationService {
}
}
}

/**
* Performs additional attestation aggregation tasks required if validator is part of distributed cluster
*
* 1. Exchange partial for combined selection proofs
* 2. Determine validators that should aggregate attestations
* 3. Mutate duty objects to set selection proofs for aggregators
* 4. Resubscribe validators as aggregators on beacon committee subnets
*
* See https://docs.google.com/document/d/1q9jOTPcYQa-3L8luRvQJ-M0eegtba4Nmon3dpO79TMk/mobilebasic
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the google doc is no longer accessible

*/
private async runDistributedAggregationSelectionTasks(
fork: ForkName,
duties: AttDutyAndProof[],
slot: number,
signal: AbortSignal
): Promise<void> {
const partialSelections: routes.validator.BeaconCommitteeSelection[] = duties.map(
({duty, partialSelectionProof}) => ({
validatorIndex: duty.validatorIndex,
slot,
selectionProof: partialSelectionProof as BLSSignature,
})
);

this.logger.debug("Submitting partial beacon committee selection proofs", {slot, count: partialSelections.length});

const res = await Promise.race([
this.api.validator.submitBeaconCommitteeSelections({selections: partialSelections}),
// Exit attestation aggregation flow if there is no response after ATTESTATION_DUE_BPS of the slot as
// beacon node would likely not have enough time to prepare an aggregate attestation.
// Note that the aggregations flow is not explicitly exited but rather will be skipped
// due to the fact that calculation of `is_aggregator` in AttestationDutiesService is not done
// and selectionProof is set to null, meaning no validator will be considered an aggregator.
sleep(this.config.getAttestationDueMs(fork) - this.clock.msFromSlot(slot), signal),
]);

if (!res) {
throw new Error("Failed to receive combined selection proofs before ATTESTATION_DUE_BPS of the slot");
}

const combinedSelections = res.value();
this.logger.debug("Received combined beacon committee selection proofs", {slot, count: combinedSelections.length});

const beaconCommitteeSubscriptions: routes.validator.BeaconCommitteeSubscription[] = [];

for (const dutyAndProof of duties) {
const {validatorIndex, committeeIndex, committeeLength, committeesAtSlot} = dutyAndProof.duty;
const logCtxValidator = {slot, index: committeeIndex, validatorIndex};

const combinedSelection = combinedSelections.find((s) => s.validatorIndex === validatorIndex && s.slot === slot);

if (!combinedSelection) {
this.logger.warn("Did not receive combined beacon committee selection proof", logCtxValidator);
continue;
}

const isAggregator = isAggregatorFromCommitteeLength(committeeLength, combinedSelection.selectionProof);

if (isAggregator) {
// Update selection proof by mutating duty object
dutyAndProof.selectionProof = combinedSelection.selectionProof;

// Only push subnet subscriptions with `isAggregator=true` as all validators
// with duties for slot are already subscribed to subnets with `isAggregator=false`.
beaconCommitteeSubscriptions.push({
validatorIndex,
committeesAtSlot,
committeeIndex,
slot,
isAggregator,
});
this.logger.debug("Resubscribing validator as aggregator on beacon committee subnet", logCtxValidator);
}
}

// If there are any subscriptions with aggregators, push them out to the beacon node.
if (beaconCommitteeSubscriptions.length > 0) {
(await this.api.validator.prepareBeaconCommitteeSubnet({subscriptions: beaconCommitteeSubscriptions})).assertOk();
this.logger.debug("Resubscribed validators as aggregators on beacon committee subnets", {
slot,
count: beaconCommitteeSubscriptions.length,
});
}
}
}
64 changes: 62 additions & 2 deletions packages/validator/src/services/attestationDuties.ts
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,17 @@ export class AttestationDutiesService {
for (const epoch of [currentEpoch, nextEpoch]) {
const epochDuties = this.dutiesByIndexByEpoch.get(epoch)?.dutiesByIndex;
if (epochDuties) {
if (this.opts?.distributedAggregationSelection) {
// Validator in distributed cluster only has a key share, not the full private key.
// The partial selection proofs must be exchanged for combined selection proofs by
// calling submitBeaconCommitteeSelections on the distributed validator middleware client.
// This is required to correctly determine if validator is aggregator and to produce
// a AggregateAndProof that can be threshold aggregated by the middleware client.
await this.runDistributedAggregationSelectionTasks(Array.from(epochDuties.values()), epoch).catch((e) =>
this.logger.error("Error on attestation aggregation selection", {epoch}, e)
);
}

for (const {duty, selectionProof} of epochDuties.values()) {
if (indexSet.has(duty.validatorIndex)) {
beaconCommitteeSubscriptions.push({
Expand Down Expand Up @@ -367,6 +378,12 @@ export class AttestationDutiesService {
const epochDuties = this.dutiesByIndexByEpoch.get(dutyEpoch)?.dutiesByIndex;

if (epochDuties) {
if (this.opts?.distributedAggregationSelection) {
await this.runDistributedAggregationSelectionTasks(Array.from(epochDuties.values()), dutyEpoch).catch((e) =>
this.logger.error("Error on attestation aggregation selection after duties reorg", logContext, e)
);
}

for (const {duty, selectionProof} of epochDuties.values()) {
beaconCommitteeSubscriptions.push({
validatorIndex: duty.validatorIndex,
Expand Down Expand Up @@ -403,8 +420,8 @@ export class AttestationDutiesService {
if (this.opts?.distributedAggregationSelection) {
// Validator in distributed cluster only has a key share, not the full private key.
// Passing a partial selection proof to `is_aggregator` would produce incorrect result.
// AttestationService will exchange partial for combined selection proofs retrieved from
// distributed validator middleware client and determine aggregators at beginning of every slot.
// Before subscribing to beacon committee subnets, aggregators are determined by exchanging
// partial for combined selection proofs retrieved from distributed validator middleware client.
return {duty, selectionProof: null, partialSelectionProof: selectionProof};
}

Expand All @@ -427,4 +444,47 @@ export class AttestationDutiesService {
}
}
}

/**
* Performs additional attestation aggregation tasks required if validator is part of distributed cluster
*
* 1. Exchange partial for combined selection proofs
* 2. Determine validators that should aggregate attestations
* 3. Mutate duty objects to set selection proofs for aggregators
*/
private async runDistributedAggregationSelectionTasks(duties: AttDutyAndProof[], epoch: Epoch): Promise<void> {
const partialSelections: routes.validator.BeaconCommitteeSelection[] = duties.map(
({duty, partialSelectionProof}) => ({
validatorIndex: duty.validatorIndex,
slot: duty.slot,
selectionProof: partialSelectionProof as BLSSignature,
})
);

this.logger.debug("Submitting partial beacon committee selection proofs", {epoch, count: partialSelections.length});

const res = await this.api.validator.submitBeaconCommitteeSelections({selections: partialSelections});
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we call this with duties for a whole epoch, could consider batching the request but generally assume vc and charon run on the same host without any proxy in-between that could cause issues due to large request body size


const combinedSelections = res.value();
this.logger.debug("Received combined beacon committee selection proofs", {epoch, count: combinedSelections.length});

for (const dutyAndProof of duties) {
const {slot, validatorIndex, committeeIndex, committeeLength} = dutyAndProof.duty;
const logCtxValidator = {slot, index: committeeIndex, validatorIndex};

const combinedSelection = combinedSelections.find((s) => s.validatorIndex === validatorIndex && s.slot === slot);

if (!combinedSelection) {
this.logger.warn("Did not receive combined beacon committee selection proof", logCtxValidator);
continue;
}

const isAggregator = isAggregatorFromCommitteeLength(committeeLength, combinedSelection.selectionProof);

if (isAggregator) {
// Update selection proof by mutating duty object
dutyAndProof.selectionProof = combinedSelection.selectionProof;
}
}
}
}
95 changes: 2 additions & 93 deletions packages/validator/src/services/syncCommittee.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import {ApiClient, routes} from "@lodestar/api";
import {ApiClient} from "@lodestar/api";
import {ChainForkConfig} from "@lodestar/config";
import {ForkName, isForkPostAltair} from "@lodestar/params";
import {isSyncCommitteeAggregator} from "@lodestar/state-transition";
import {BLSSignature, CommitteeIndex, Root, Slot, altair} from "@lodestar/types";
import {CommitteeIndex, Root, Slot, altair} from "@lodestar/types";
import {sleep} from "@lodestar/utils";
import {Metrics} from "../metrics.js";
import {PubkeyHex} from "../types.js";
Expand Down Expand Up @@ -73,18 +72,6 @@ export class SyncCommitteeService {
return;
}

if (this.opts?.distributedAggregationSelection) {
// Validator in distributed cluster only has a key share, not the full private key.
// The partial selection proofs must be exchanged for combined selection proofs by
// calling submitSyncCommitteeSelections on the distributed validator middleware client.
// This will run in parallel to other sync committee tasks but must be finished before starting
// sync committee contributions as it is required to correctly determine if validator is aggregator
// and to produce a ContributionAndProof that can be threshold aggregated by the middleware client.
this.runDistributedAggregationSelectionTasks(fork, dutiesAtSlot, slot, signal).catch((e) =>
this.logger.error("Error on sync committee aggregation selection", {slot}, e)
);
}

// unlike Attestation, SyncCommitteeSignature could be published asap
// especially with lodestar, it's very busy at ATTESTATION_DUE_BPS of the slot
// see https://github.com/ChainSafe/lodestar/issues/4608
Expand Down Expand Up @@ -257,82 +244,4 @@ export class SyncCommitteeService {
}
}
}

/**
* Performs additional sync committee contribution tasks required if validator is part of distributed cluster
*
* 1. Exchange partial for combined selection proofs
* 2. Determine validators that should produce sync committee contribution
* 3. Mutate duty objects to set selection proofs for aggregators
*
* See https://docs.google.com/document/d/1q9jOTPcYQa-3L8luRvQJ-M0eegtba4Nmon3dpO79TMk/mobilebasic
*/
private async runDistributedAggregationSelectionTasks(
fork: ForkName,
duties: SyncDutyAndProofs[],
slot: number,
signal: AbortSignal
): Promise<void> {
const partialSelections: routes.validator.SyncCommitteeSelection[] = [];

for (const {duty, selectionProofs} of duties) {
const validatorSelections: routes.validator.SyncCommitteeSelection[] = selectionProofs.map(
({subcommitteeIndex, partialSelectionProof}) => ({
validatorIndex: duty.validatorIndex,
slot,
subcommitteeIndex,
selectionProof: partialSelectionProof as BLSSignature,
})
);
partialSelections.push(...validatorSelections);
}

this.logger.debug("Submitting partial sync committee selection proofs", {slot, count: partialSelections.length});

const res = await Promise.race([
this.api.validator.submitSyncCommitteeSelections({selections: partialSelections}),
// Exit sync committee contributions flow if there is no response after CONTRIBUTION_DUE_BPS of the slot.
// This is in contrast to attestations aggregations flow which is already exited at ATTESTATION_DUE_BPS of the slot
// because for sync committee is not required to resubscribe to subnets as beacon node will assume
// validator always aggregates. This allows us to wait until we have to produce sync committee contributions.
// Note that the sync committee contributions flow is not explicitly exited but rather will be skipped
// due to the fact that calculation of `is_sync_committee_aggregator` in SyncCommitteeDutiesService is not done
// and selectionProof is set to null, meaning no validator will be considered an aggregator.
sleep(this.config.getSyncContributionDueMs(fork) - this.clock.msFromSlot(slot), signal),
]);

if (!res) {
throw new Error("Failed to receive combined selection proofs before CONTRIBUTION_DUE_BPS of the slot");
}

const combinedSelections = res.value();
this.logger.debug("Received combined sync committee selection proofs", {slot, count: combinedSelections.length});

for (const dutyAndProofs of duties) {
const {validatorIndex, subnets} = dutyAndProofs.duty;

for (const subnet of subnets) {
const logCtxValidator = {slot, index: subnet, validatorIndex};

const combinedSelection = combinedSelections.find(
(s) => s.validatorIndex === validatorIndex && s.slot === slot && s.subcommitteeIndex === subnet
);

if (!combinedSelection) {
this.logger.warn("Did not receive combined sync committee selection proof", logCtxValidator);
continue;
}

const isAggregator = isSyncCommitteeAggregator(combinedSelection.selectionProof);

if (isAggregator) {
const selectionProofObject = dutyAndProofs.selectionProofs.find((p) => p.subcommitteeIndex === subnet);
if (selectionProofObject) {
// Update selection proof by mutating proof objects in duty object
selectionProofObject.selectionProof = combinedSelection.selectionProof;
}
}
}
}
}
}
Loading
Loading