Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions packages/api/src/beacon/routes/validator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,10 @@ export type Endpoints = {
* a validator client to correctly determine if one of its validators has been selected to
* perform an aggregation duty in this slot.
*
* Validator clients running in a distributed validator cluster must query this endpoint
* at the start of an epoch for the current and lookahead (next) epochs for all validators
* that have attester duties in the current and lookahead epochs.
*
* Note that this endpoint is not implemented by the beacon node and will return a 501 error
*
* Returns an array of threshold aggregated beacon committee selection proofs
Expand All @@ -521,6 +525,9 @@ export type Endpoints = {
* a validator client to correctly determine if one of its validators has been selected to
* perform a sync committee contribution (sync aggregation) duty in this slot.
*
* Validator clients running in a distributed validator cluster must query this endpoint
* at the start of each slot for all validators that are included in the current sync committee.
*
* Note that this endpoint is not implemented by the beacon node and will return a 501 error
*
* Returns an array of threshold aggregated sync committee selection proofs
Expand Down
103 changes: 3 additions & 100 deletions packages/validator/src/services/attestation.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import {ApiClient, routes} from "@lodestar/api";
import {ApiClient} from "@lodestar/api";
import {ChainForkConfig} from "@lodestar/config";
import {ForkName, isForkPostElectra} from "@lodestar/params";
import {computeEpochAtSlot, isAggregatorFromCommitteeLength} from "@lodestar/state-transition";
import {BLSSignature, SignedAggregateAndProof, SingleAttestation, Slot, phase0, ssz} from "@lodestar/types";
import {computeEpochAtSlot} from "@lodestar/state-transition";
import {SignedAggregateAndProof, SingleAttestation, Slot, phase0, ssz} from "@lodestar/types";
import {prettyBytes, sleep, toRootHex} from "@lodestar/utils";
import {Metrics} from "../metrics.js";
import {PubkeyHex} from "../types.js";
Expand Down Expand Up @@ -75,18 +75,6 @@ export class AttestationService {
}
const fork = this.config.getForkName(slot);

if (this.opts?.distributedAggregationSelection) {
// Validator in distributed cluster only has a key share, not the full private key.
// The partial selection proofs must be exchanged for combined selection proofs by
// calling submitBeaconCommitteeSelections on the distributed validator middleware client.
// This will run in parallel to other attestation tasks but must be finished before starting
// attestation aggregation as it is required to correctly determine if validator is aggregator
// and to produce a AggregateAndProof that can be threshold aggregated by the middleware client.
this.runDistributedAggregationSelectionTasks(fork, duties, slot, signal).catch((e) =>
this.logger.error("Error on attestation aggregation selection", {slot}, e)
);
}

// A validator should create and broadcast the attestation to the associated attestation subnet when either
// (a) the validator has received a valid block from the expected block proposer for the assigned slot or
// (b) ATTESTATION_DUE_BPS of the slot has transpired -- whichever comes first.
Expand Down Expand Up @@ -274,89 +262,4 @@ export class AttestationService {
}
}
}

/**
* Performs additional attestation aggregation tasks required if validator is part of distributed cluster
*
* 1. Exchange partial for combined selection proofs
* 2. Determine validators that should aggregate attestations
* 3. Mutate duty objects to set selection proofs for aggregators
* 4. Resubscribe validators as aggregators on beacon committee subnets
*
* See https://docs.google.com/document/d/1q9jOTPcYQa-3L8luRvQJ-M0eegtba4Nmon3dpO79TMk/mobilebasic
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the google doc is no longer accessible

*/
private async runDistributedAggregationSelectionTasks(
fork: ForkName,
duties: AttDutyAndProof[],
slot: number,
signal: AbortSignal
): Promise<void> {
const partialSelections: routes.validator.BeaconCommitteeSelection[] = duties.map(
({duty, partialSelectionProof}) => ({
validatorIndex: duty.validatorIndex,
slot,
selectionProof: partialSelectionProof as BLSSignature,
})
);

this.logger.debug("Submitting partial beacon committee selection proofs", {slot, count: partialSelections.length});

const res = await Promise.race([
this.api.validator.submitBeaconCommitteeSelections({selections: partialSelections}),
// Exit attestation aggregation flow if there is no response after ATTESTATION_DUE_BPS of the slot as
// beacon node would likely not have enough time to prepare an aggregate attestation.
// Note that the aggregations flow is not explicitly exited but rather will be skipped
// due to the fact that calculation of `is_aggregator` in AttestationDutiesService is not done
// and selectionProof is set to null, meaning no validator will be considered an aggregator.
sleep(this.config.getAttestationDueMs(fork) - this.clock.msFromSlot(slot), signal),
]);

if (!res) {
throw new Error("Failed to receive combined selection proofs before ATTESTATION_DUE_BPS of the slot");
}

const combinedSelections = res.value();
this.logger.debug("Received combined beacon committee selection proofs", {slot, count: combinedSelections.length});

const beaconCommitteeSubscriptions: routes.validator.BeaconCommitteeSubscription[] = [];

for (const dutyAndProof of duties) {
const {validatorIndex, committeeIndex, committeeLength, committeesAtSlot} = dutyAndProof.duty;
const logCtxValidator = {slot, index: committeeIndex, validatorIndex};

const combinedSelection = combinedSelections.find((s) => s.validatorIndex === validatorIndex && s.slot === slot);

if (!combinedSelection) {
this.logger.warn("Did not receive combined beacon committee selection proof", logCtxValidator);
continue;
}

const isAggregator = isAggregatorFromCommitteeLength(committeeLength, combinedSelection.selectionProof);

if (isAggregator) {
// Update selection proof by mutating duty object
dutyAndProof.selectionProof = combinedSelection.selectionProof;

// Only push subnet subscriptions with `isAggregator=true` as all validators
// with duties for slot are already subscribed to subnets with `isAggregator=false`.
beaconCommitteeSubscriptions.push({
validatorIndex,
committeesAtSlot,
committeeIndex,
slot,
isAggregator,
});
this.logger.debug("Resubscribing validator as aggregator on beacon committee subnet", logCtxValidator);
}
}

// If there are any subscriptions with aggregators, push them out to the beacon node.
if (beaconCommitteeSubscriptions.length > 0) {
(await this.api.validator.prepareBeaconCommitteeSubnet({subscriptions: beaconCommitteeSubscriptions})).assertOk();
this.logger.debug("Resubscribed validators as aggregators on beacon committee subnets", {
slot,
count: beaconCommitteeSubscriptions.length,
});
}
}
}
64 changes: 62 additions & 2 deletions packages/validator/src/services/attestationDuties.ts
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,17 @@ export class AttestationDutiesService {
for (const epoch of [currentEpoch, nextEpoch]) {
const epochDuties = this.dutiesByIndexByEpoch.get(epoch)?.dutiesByIndex;
if (epochDuties) {
if (this.opts?.distributedAggregationSelection) {
// Validator in distributed cluster only has a key share, not the full private key.
// The partial selection proofs must be exchanged for combined selection proofs by
// calling submitBeaconCommitteeSelections on the distributed validator middleware client.
// This is required to correctly determine if validator is aggregator and to produce
// a AggregateAndProof that can be threshold aggregated by the middleware client.
await this.runDistributedAggregationSelectionTasks(Array.from(epochDuties.values()), epoch).catch((e) =>
this.logger.error("Error on attestation aggregation selection", {epoch}, e)
);
}

for (const {duty, selectionProof} of epochDuties.values()) {
if (indexSet.has(duty.validatorIndex)) {
beaconCommitteeSubscriptions.push({
Expand Down Expand Up @@ -367,6 +378,12 @@ export class AttestationDutiesService {
const epochDuties = this.dutiesByIndexByEpoch.get(dutyEpoch)?.dutiesByIndex;

if (epochDuties) {
if (this.opts?.distributedAggregationSelection) {
await this.runDistributedAggregationSelectionTasks(Array.from(epochDuties.values()), dutyEpoch).catch((e) =>
this.logger.error("Error on attestation aggregation selection after duties reorg", logContext, e)
);
}

for (const {duty, selectionProof} of epochDuties.values()) {
beaconCommitteeSubscriptions.push({
validatorIndex: duty.validatorIndex,
Expand Down Expand Up @@ -403,8 +420,8 @@ export class AttestationDutiesService {
if (this.opts?.distributedAggregationSelection) {
// Validator in distributed cluster only has a key share, not the full private key.
// Passing a partial selection proof to `is_aggregator` would produce incorrect result.
// AttestationService will exchange partial for combined selection proofs retrieved from
// distributed validator middleware client and determine aggregators at beginning of every slot.
// Before subscribing to beacon committee subnets, aggregators are determined by exchanging
// partial for combined selection proofs retrieved from distributed validator middleware client.
return {duty, selectionProof: null, partialSelectionProof: selectionProof};
}

Expand All @@ -427,4 +444,47 @@ export class AttestationDutiesService {
}
}
}

/**
* Performs additional attestation aggregation tasks required if validator is part of distributed cluster
*
* 1. Exchange partial for combined selection proofs
* 2. Determine validators that should aggregate attestations
* 3. Mutate duty objects to set selection proofs for aggregators
*/
private async runDistributedAggregationSelectionTasks(duties: AttDutyAndProof[], epoch: Epoch): Promise<void> {
const partialSelections: routes.validator.BeaconCommitteeSelection[] = duties.map(
({duty, partialSelectionProof}) => ({
validatorIndex: duty.validatorIndex,
slot: duty.slot,
selectionProof: partialSelectionProof as BLSSignature,
})
);

this.logger.debug("Submitting partial beacon committee selection proofs", {epoch, count: partialSelections.length});

const res = await this.api.validator.submitBeaconCommitteeSelections({selections: partialSelections});
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we call this with duties for a whole epoch, could consider batching the request but generally assume vc and charon run on the same host without any proxy in-between that could cause issues due to large request body size


const combinedSelections = res.value();
this.logger.debug("Received combined beacon committee selection proofs", {epoch, count: combinedSelections.length});

for (const dutyAndProof of duties) {
const {slot, validatorIndex, committeeIndex, committeeLength} = dutyAndProof.duty;
const logCtxValidator = {slot, index: committeeIndex, validatorIndex};

const combinedSelection = combinedSelections.find((s) => s.validatorIndex === validatorIndex && s.slot === slot);

if (!combinedSelection) {
this.logger.warn("Did not receive combined beacon committee selection proof", logCtxValidator);
continue;
}

const isAggregator = isAggregatorFromCommitteeLength(committeeLength, combinedSelection.selectionProof);

if (isAggregator) {
// Update selection proof by mutating duty object
dutyAndProof.selectionProof = combinedSelection.selectionProof;
}
}
}
}
95 changes: 2 additions & 93 deletions packages/validator/src/services/syncCommittee.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import {ApiClient, routes} from "@lodestar/api";
import {ApiClient} from "@lodestar/api";
import {ChainForkConfig} from "@lodestar/config";
import {ForkName, isForkPostAltair} from "@lodestar/params";
import {isSyncCommitteeAggregator} from "@lodestar/state-transition";
import {BLSSignature, CommitteeIndex, Root, Slot, altair} from "@lodestar/types";
import {CommitteeIndex, Root, Slot, altair} from "@lodestar/types";
import {sleep} from "@lodestar/utils";
import {Metrics} from "../metrics.js";
import {PubkeyHex} from "../types.js";
Expand Down Expand Up @@ -73,18 +72,6 @@ export class SyncCommitteeService {
return;
}

if (this.opts?.distributedAggregationSelection) {
// Validator in distributed cluster only has a key share, not the full private key.
// The partial selection proofs must be exchanged for combined selection proofs by
// calling submitSyncCommitteeSelections on the distributed validator middleware client.
// This will run in parallel to other sync committee tasks but must be finished before starting
// sync committee contributions as it is required to correctly determine if validator is aggregator
// and to produce a ContributionAndProof that can be threshold aggregated by the middleware client.
this.runDistributedAggregationSelectionTasks(fork, dutiesAtSlot, slot, signal).catch((e) =>
this.logger.error("Error on sync committee aggregation selection", {slot}, e)
);
}

// unlike Attestation, SyncCommitteeSignature could be published asap
// especially with lodestar, it's very busy at ATTESTATION_DUE_BPS of the slot
// see https://github.com/ChainSafe/lodestar/issues/4608
Expand Down Expand Up @@ -257,82 +244,4 @@ export class SyncCommitteeService {
}
}
}

/**
* Performs additional sync committee contribution tasks required if validator is part of distributed cluster
*
* 1. Exchange partial for combined selection proofs
* 2. Determine validators that should produce sync committee contribution
* 3. Mutate duty objects to set selection proofs for aggregators
*
* See https://docs.google.com/document/d/1q9jOTPcYQa-3L8luRvQJ-M0eegtba4Nmon3dpO79TMk/mobilebasic
*/
private async runDistributedAggregationSelectionTasks(
fork: ForkName,
duties: SyncDutyAndProofs[],
slot: number,
signal: AbortSignal
): Promise<void> {
const partialSelections: routes.validator.SyncCommitteeSelection[] = [];

for (const {duty, selectionProofs} of duties) {
const validatorSelections: routes.validator.SyncCommitteeSelection[] = selectionProofs.map(
({subcommitteeIndex, partialSelectionProof}) => ({
validatorIndex: duty.validatorIndex,
slot,
subcommitteeIndex,
selectionProof: partialSelectionProof as BLSSignature,
})
);
partialSelections.push(...validatorSelections);
}

this.logger.debug("Submitting partial sync committee selection proofs", {slot, count: partialSelections.length});

const res = await Promise.race([
this.api.validator.submitSyncCommitteeSelections({selections: partialSelections}),
// Exit sync committee contributions flow if there is no response after CONTRIBUTION_DUE_BPS of the slot.
// This is in contrast to attestations aggregations flow which is already exited at ATTESTATION_DUE_BPS of the slot
// because for sync committee is not required to resubscribe to subnets as beacon node will assume
// validator always aggregates. This allows us to wait until we have to produce sync committee contributions.
// Note that the sync committee contributions flow is not explicitly exited but rather will be skipped
// due to the fact that calculation of `is_sync_committee_aggregator` in SyncCommitteeDutiesService is not done
// and selectionProof is set to null, meaning no validator will be considered an aggregator.
sleep(this.config.getSyncContributionDueMs(fork) - this.clock.msFromSlot(slot), signal),
]);

if (!res) {
throw new Error("Failed to receive combined selection proofs before CONTRIBUTION_DUE_BPS of the slot");
}

const combinedSelections = res.value();
this.logger.debug("Received combined sync committee selection proofs", {slot, count: combinedSelections.length});

for (const dutyAndProofs of duties) {
const {validatorIndex, subnets} = dutyAndProofs.duty;

for (const subnet of subnets) {
const logCtxValidator = {slot, index: subnet, validatorIndex};

const combinedSelection = combinedSelections.find(
(s) => s.validatorIndex === validatorIndex && s.slot === slot && s.subcommitteeIndex === subnet
);

if (!combinedSelection) {
this.logger.warn("Did not receive combined sync committee selection proof", logCtxValidator);
continue;
}

const isAggregator = isSyncCommitteeAggregator(combinedSelection.selectionProof);

if (isAggregator) {
const selectionProofObject = dutyAndProofs.selectionProofs.find((p) => p.subcommitteeIndex === subnet);
if (selectionProofObject) {
// Update selection proof by mutating proof objects in duty object
selectionProofObject.selectionProof = combinedSelection.selectionProof;
}
}
}
}
}
}
Loading
Loading