Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,953 changes: 954 additions & 999 deletions poetry.lock

Large diffs are not rendered by default.

7 changes: 3 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@ package-mode = false
python = ">=3.12,<3.13"
python-decouple = "==3.8"
sentry-sdk = "==1.45.1"
py-ecc = "==6.0.0"
py-ecc = "==8.0.0"
gql = {extras = ["aiohttp"], version = "==3.5.0"}
Copy link

Copilot AI Dec 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The sw-utils version is being downgraded from v0.11.0 to v0.10.6. This is a backwards change that could break functionality or lose features. If this is intentional due to compatibility issues with Web3.py v7, it should be documented in the PR description or comments.

Suggested change
gql = {extras = ["aiohttp"], version = "==3.5.0"}
gql = {extras = ["aiohttp"], version = "==3.5.0"}
# sw-utils is intentionally downgraded to v0.10.6 for compatibility with Web3.py v7. See PR description for details.

Copilot uses AI. Check for mistakes.
multiproof = { git = "https://github.com/stakewise/multiproof.git", rev = "v0.1.8" }
sw-utils = {git = "https://github.com/stakewise/sw-utils.git", rev = "v0.9.18"}
sw-utils = {git = "https://github.com/stakewise/sw-utils.git", rev = "v0.10.4"}
staking-deposit = { git = "https://github.com/ethereum/staking-deposit-cli.git", rev = "v2.8.0" }
pycryptodomex = "3.19.1"
click = "==8.2.1"
Expand All @@ -22,7 +21,7 @@ prometheus-client = "==0.17.1"
psycopg2 = "==2.9.9"
pyyaml = "==6.0.1"
python-json-logger = "==2.0.7"
aiohttp = "==3.12.14"
aiohttp = "==3.13.0"

[tool.poetry.group.dev.dependencies]
pylint = "==3.3.3"
Expand Down
110 changes: 55 additions & 55 deletions src/commands/start/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import src
from src.common.checks import wait_execution_catch_up_consensus
from src.common.clients import setup_clients
from src.common.clients import close_clients, setup_clients
from src.common.consensus import get_chain_finalized_head
from src.common.execution import WalletTask, update_oracles_cache
from src.common.logging import setup_logging
Expand Down Expand Up @@ -38,68 +38,68 @@ async def start_base() -> None:
setup_logging()
setup_sentry()
await setup_clients()
try:
log_start()

log_start()
if not settings.skip_startup_checks:
await startup_checks()

if not settings.skip_startup_checks:
await startup_checks()
if settings.enable_metrics:
await metrics_server()

if settings.enable_metrics:
await metrics_server()
NetworkValidatorCrud().setup()
VaultValidatorCrud().setup()
CheckpointCrud().setup()

NetworkValidatorCrud().setup()
VaultValidatorCrud().setup()
CheckpointCrud().setup()
# load network validators from ipfs dump
await load_genesis_validators()

# load network validators from ipfs dump
await load_genesis_validators()
keystore: BaseKeystore | None = None
relayer: RelayerClient | None = None

keystore: BaseKeystore | None = None
relayer: RelayerClient | None = None
if settings.validators_registration_mode == ValidatorsRegistrationMode.AUTO:
if settings.disable_validators_registration:
keystore = None
else:
keystore = await load_keystore()

if settings.validators_registration_mode == ValidatorsRegistrationMode.AUTO:
if settings.disable_validators_registration:
keystore = None
else:
keystore = await load_keystore()

else:
relayer = RelayerClient()

# start operator tasks
chain_state = await get_chain_finalized_head()
await wait_execution_catch_up_consensus(chain_state)

CheckpointCrud().save_checkpoints()
logger.info('Syncing validator events...')
await scan_validators_events(chain_state.block_number, is_startup=True)

logger.info('Updating oracles cache...')
await update_oracles_cache()

if settings.validators_registration_mode == ValidatorsRegistrationMode.API:
logger.info('Starting api mode')

logger.info('Started operator service')
metrics.service_started.labels(network=settings.network).set(1)
with InterruptHandler() as interrupt_handler:
tasks = [
ValidatorTask(
keystore=keystore,
relayer=relayer,
).run(interrupt_handler),
ExitSignatureTask(
keystore=keystore,
).run(interrupt_handler),
MetricsTask().run(interrupt_handler),
WalletTask().run(interrupt_handler),
]
if settings.harvest_vault:
tasks.append(HarvestTask().run(interrupt_handler))
if settings.claim_fee_splitter:
tasks.append(SplitRewardTask().run(interrupt_handler))

await asyncio.gather(*tasks)
relayer = RelayerClient()

# start operator tasks
chain_state = await get_chain_finalized_head()
await wait_execution_catch_up_consensus(chain_state)
CheckpointCrud().save_checkpoints()
logger.info('Syncing validator events...')
await scan_validators_events(chain_state.block_number, is_startup=True)
logger.info('Updating oracles cache...')
await update_oracles_cache()

if settings.validators_registration_mode == ValidatorsRegistrationMode.API:
logger.info('Starting api mode')

logger.info('Started operator service')
metrics.service_started.labels(network=settings.network).set(1)
with InterruptHandler() as interrupt_handler:
tasks = [
ValidatorTask(
keystore=keystore,
relayer=relayer,
).run(interrupt_handler),
ExitSignatureTask(
keystore=keystore,
).run(interrupt_handler),
MetricsTask().run(interrupt_handler),
WalletTask().run(interrupt_handler),
]
if settings.harvest_vault:
tasks.append(HarvestTask().run(interrupt_handler))
if settings.claim_fee_splitter:
tasks.append(SplitRewardTask().run(interrupt_handler))

await asyncio.gather(*tasks)
finally:
await close_clients()


class ValidatorTask(BaseTask):
Expand Down
14 changes: 11 additions & 3 deletions src/common/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
)
from sw_utils.graph.client import GraphClient as SWGraphClient
from web3 import AsyncWeb3
from web3.middleware.signing import async_construct_sign_and_send_raw_middleware
from web3.middleware import SignAndSendRawMiddlewareBuilder

import src
from src.common.wallet import wallet
Expand Down Expand Up @@ -56,8 +56,10 @@ async def setup(self) -> None:
# Account is required when emitting transactions.
# For read-only queries account may be omitted.
if wallet.can_load():
w3.middleware_onion.add(
await async_construct_sign_and_send_raw_middleware(wallet.account)
w3.middleware_onion.inject(
# pylint: disable-next=no-value-for-parameter
SignAndSendRawMiddlewareBuilder.build(wallet.account),
layer=0,
)
w3.eth.default_account = wallet.address

Expand Down Expand Up @@ -126,3 +128,9 @@ async def fetch_json(self, ipfs_hash: str) -> dict | list:
async def setup_clients() -> None:
await execution_client.setup() # type: ignore
await execution_non_retry_client.setup() # type: ignore


async def close_clients() -> None:
await execution_client.provider.disconnect()
await execution_non_retry_client.provider.disconnect()
await consensus_client.disconnect()
10 changes: 5 additions & 5 deletions src/common/contracts.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def events(self) -> AsyncContractEvents:
return self.contract.events

def encode_abi(self, fn_name: str, args: list | None = None) -> HexStr:
return self.contract.encodeABI(fn_name=fn_name, args=args)
return self.contract.encode_abi(fn_name, args=args)

async def _get_last_event(
self,
Expand All @@ -69,8 +69,8 @@ async def _get_last_event(
blocks_range = settings.events_blocks_range_interval
while to_block >= from_block:
events = await event.get_logs(
fromBlock=BlockNumber(max(to_block - blocks_range, from_block)),
toBlock=to_block,
from_block=BlockNumber(max(to_block - blocks_range, from_block)),
to_block=to_block,
argument_filters=argument_filters,
)
if events:
Expand All @@ -88,8 +88,8 @@ async def _get_events(
blocks_range = settings.events_blocks_range_interval
while to_block >= from_block:
range_events = await event.get_logs(
fromBlock=from_block,
toBlock=BlockNumber(min(from_block + blocks_range, to_block)),
from_block=from_block,
to_block=BlockNumber(min(from_block + blocks_range, to_block)),
)
if range_events:
events.extend(range_events)
Expand Down
3 changes: 0 additions & 3 deletions src/common/credentials.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import time
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from functools import cached_property
from multiprocessing import Pool
Expand Down Expand Up @@ -29,7 +28,6 @@
)
from sw_utils.typings import Bytes32
from web3 import Web3
from web3._utils import request

from src.common.typings import ValidatorType
from src.config.networks import NETWORKS
Expand Down Expand Up @@ -174,7 +172,6 @@ def _generate_credentials_chunk(
) -> list[Credential]:
# Hack to run web3 sessions in multiprocessing mode
# pylint: disable-next=protected-access
request._async_session_pool = ThreadPoolExecutor(max_workers=1)

credentials: list[Credential] = []
for index in indexes:
Expand Down
37 changes: 21 additions & 16 deletions src/common/startup_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,11 @@ async def wait_for_consensus_node() -> None:
done = False
while True:
for consensus_endpoint in settings.consensus_endpoints:
consensus_client = get_consensus_client(
[consensus_endpoint],
user_agent=OPERATOR_USER_AGENT,
)
try:
consensus_client = get_consensus_client(
[consensus_endpoint],
user_agent=OPERATOR_USER_AGENT,
)
syncing = await consensus_client.get_syncing()
if syncing['data']['is_syncing'] is True:
logger.warning(
Expand All @@ -155,6 +155,8 @@ async def wait_for_consensus_node() -> None:
consensus_endpoint,
e,
)
finally:
await consensus_client.disconnect()
if done:
return
logger.warning('Consensus nodes are not ready. Retrying in 10 seconds...')
Expand All @@ -168,13 +170,12 @@ async def wait_for_execution_node() -> None:
done = False
while True:
for execution_endpoint in settings.execution_endpoints:
execution_client = get_execution_client(
[execution_endpoint],
jwt_secret=settings.execution_jwt_secret,
user_agent=OPERATOR_USER_AGENT,
)
try:
execution_client = get_execution_client(
[execution_endpoint],
jwt_secret=settings.execution_jwt_secret,
user_agent=OPERATOR_USER_AGENT,
)

syncing = await execution_client.eth.syncing
if syncing is True:
logger.warning(
Expand Down Expand Up @@ -203,6 +204,8 @@ async def wait_for_execution_node() -> None:
execution_endpoint,
e,
)
finally:
await execution_client.provider.disconnect()
if done:
return
logger.warning('Execution nodes are not ready. Retrying in 10 seconds...')
Expand Down Expand Up @@ -306,6 +309,7 @@ async def _check_consensus_nodes_network() -> None:
[consensus_endpoint], user_agent=OPERATOR_USER_AGENT
)
deposit_contract_data = (await consensus_client.get_deposit_contract())['data']
await consensus_client.disconnect()
consensus_chain_id = int(deposit_contract_data['chain_id'])
consensus_network = chain_id_to_network.get(consensus_chain_id)
if settings.network_config.CHAIN_ID != consensus_chain_id:
Expand All @@ -327,6 +331,7 @@ async def _check_execution_nodes_network() -> None:
user_agent=OPERATOR_USER_AGENT,
)
execution_chain_id = await execution_client.eth.chain_id
await execution_client.provider.disconnect()
execution_network = chain_id_to_network.get(execution_chain_id)
if settings.network_config.CHAIN_ID != execution_chain_id:
raise ValueError(
Expand Down Expand Up @@ -383,18 +388,18 @@ async def check_vault_version() -> None:

async def _check_events_logs() -> None:
"""Check that EL client didn't prune logs"""
events = await keeper_contract.events.ConfigUpdated.get_logs( # type: ignore
fromBlock=settings.network_config.CONFIG_UPDATE_EVENT_BLOCK,
toBlock=settings.network_config.CONFIG_UPDATE_EVENT_BLOCK,
events = await keeper_contract.events.ConfigUpdated.get_logs(
from_block=settings.network_config.CONFIG_UPDATE_EVENT_BLOCK,
to_block=settings.network_config.CONFIG_UPDATE_EVENT_BLOCK,
)
if not events:
raise ValueError(
"Can't find oracle config. Please, ensure that EL client didn't prune event logs."
)

events = await validators_registry_contract.events.DepositEvent.get_logs( # type: ignore
fromBlock=settings.network_config.GENESIS_VALIDATORS_LAST_BLOCK,
toBlock=settings.network_config.GENESIS_VALIDATORS_LAST_BLOCK,
events = await validators_registry_contract.events.DepositEvent.get_logs(
from_block=settings.network_config.GENESIS_VALIDATORS_LAST_BLOCK,
to_block=settings.network_config.GENESIS_VALIDATORS_LAST_BLOCK,
)
if not events:
raise ValueError(
Expand Down
8 changes: 3 additions & 5 deletions src/validators/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,8 @@ async def get_latest_network_validator_public_keys() -> Set[HexStr]:
else:
from_block = settings.network_config.VALIDATORS_REGISTRY_GENESIS_BLOCK

new_events = await validators_registry_contract.events.DepositEvent.get_logs( # type: ignore
fromBlock=from_block
new_events = await validators_registry_contract.events.DepositEvent.get_logs(
from_block=from_block
)
new_public_keys: Set[HexStr] = set()
for event in new_events:
Expand All @@ -182,9 +182,7 @@ async def get_latest_vault_v2_validator_public_keys(vault_address: ChecksumAddre
else:
from_block = settings.network_config.KEEPER_GENESIS_BLOCK
vault_contract = VaultContract(vault_address)
events = await vault_contract.events.V2ValidatorRegistered.get_logs( # type: ignore
fromBlock=from_block
)
events = await vault_contract.events.V2ValidatorRegistered.get_logs(from_block=from_block)
return {Web3.to_hex(event['args']['publicKey']) for event in events}


Expand Down
4 changes: 2 additions & 2 deletions src/validators/register_validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ async def register_validators(
calls.append(
vault_contract.encode_abi(
fn_name='registerValidators',
args=[keeper_approval_params, validators_manager_signature],
args=[keeper_approval_params, Web3.to_bytes(hexstr=validators_manager_signature)],
)
)

Expand Down Expand Up @@ -110,7 +110,7 @@ async def fund_validators(
calls.append(vault_contract.get_update_state_call(harvest_params))
fund_validators_call = vault_contract.encode_abi(
fn_name='fundValidators',
args=[b''.join(tx_validators), validators_manager_signature],
args=[b''.join(tx_validators), Web3.to_bytes(hexstr=validators_manager_signature)],
)
calls.append(fund_validators_call)

Expand Down
Loading