Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 124 additions & 31 deletions src/tq_oracle/adapters/asset_adapters/stakewise.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from web3.types import EventData

from ...abi import fetch_subvault_addresses, load_stakewise_vault_abi
from ...clients.etherscan_logs import EtherscanLogsClient
from ...constants import (
STAKEWISE_ADDRESSES,
STAKEWISE_EXIT_LOG_CHUNK,
Expand Down Expand Up @@ -128,6 +129,16 @@ def __init__(
self._rpc_jitter = getattr(config, "rpc_jitter", 0.10)
self._block_timestamp_cache: dict[int, int] = {}

# Optional Etherscan client for faster log queries
self._etherscan_client: EtherscanLogsClient | None = None
if adapter_config.etherscan_api_key:
self._etherscan_client = EtherscanLogsClient(
api_key=adapter_config.etherscan_api_key,
chain_id=config.chain_id,
page_size=adapter_config.etherscan_page_size,
)
logger.debug("StakeWise Etherscan client enabled for log queries")

extra_address_candidates = [
self.w3.to_checksum_address(addr)
for addr in adapter_config.extra_addresses
Expand Down Expand Up @@ -352,57 +363,139 @@ async def _scan_exit_queue_tickets(
return []

tickets: dict[int, ExitQueueTicket] = {}

# Try Etherscan first (one-shot from block 0, no chunking needed)
if self._etherscan_client:
logger.debug(
"StakeWise exit queue scan via Etherscan — vault=%s user=%s",
context.address,
user,
)
for event in context.exit_events:
logs = await self._get_exit_logs_etherscan(
event, user, 0, self.block_identifier, vault_address=context.address
)
if logs is not None:
await self._process_exit_logs(logs, tickets)
else:
# Etherscan failed, fall back to RPC for everything
tickets.clear()
return await self._scan_exit_queue_tickets_rpc(context, user)
else:
return await self._scan_exit_queue_tickets_rpc(context, user)

ordered = sorted(tickets.values(), key=lambda t: (t.block_number, t.log_index))
logger.info(
"StakeWise exit queue scan completed (Etherscan) — vault=%s user=%s tickets=%d",
context.address,
user,
len(ordered),
)
return ordered

async def _scan_exit_queue_tickets_rpc(
self, context: StakewiseVaultContext, user: str
) -> list[ExitQueueTicket]:
"""Chunked RPC fallback for exit queue scanning."""
tickets: dict[int, ExitQueueTicket] = {}
min_block = self._resolve_min_block()

iterations = 0
logger.warning(
f"StakeWise exit queue scan start for address:{user} StakewiseVault: {context.address} from block {min_block}, this might take some time..."
"StakeWise exit queue scan via RPC — vault=%s user=%s from_block=%d (this may take time)",
context.address,
user,
min_block,
)
for from_block, to_block in self._block_ranges(
self.block_identifier, min_block
):
iterations += 1
for event in context.exit_events:
logs = await self._get_exit_logs(event, user, from_block, to_block)
for log in logs:
args = log["args"]
ticket_id = int(args["positionTicket"])
block_number = int(log["blockNumber"])
log_index = int(log["logIndex"])

existing = tickets.get(ticket_id)
if existing and not (
block_number > existing.block_number
or (
block_number == existing.block_number
and log_index > existing.log_index
)
):
continue

timestamp = await self._resolve_block_timestamp(block_number)
assets_value = args.get("assets")
tickets[ticket_id] = ExitQueueTicket(
ticket=ticket_id,
shares=int(args["shares"]),
receiver=self.w3.to_checksum_address(args["receiver"]),
block_number=block_number,
log_index=log_index,
timestamp=timestamp,
assets_hint=None if assets_value is None else int(assets_value),
)
logs = await self._get_exit_logs_rpc(event, user, from_block, to_block)
await self._process_exit_logs(logs, tickets)

ordered = sorted(tickets.values(), key=lambda t: (t.block_number, t.log_index))
logger.info(
"StakeWise exit queue scan completed — vault=%s user=%s tickets=%d iterations=%d",
"StakeWise exit queue scan completed (RPC) — vault=%s user=%s tickets=%d iterations=%d",
context.address,
user,
len(ordered),
iterations,
)
return ordered

async def _get_exit_logs(
async def _process_exit_logs(
self, logs: list[EventData], tickets: dict[int, ExitQueueTicket]
) -> None:
"""Process exit logs and update tickets dict."""
for log in logs:
args = log["args"]
ticket_id = int(args["positionTicket"])
block_number = int(log["blockNumber"])
log_index = int(log["logIndex"])

existing = tickets.get(ticket_id)
if existing and not (
block_number > existing.block_number
or (
block_number == existing.block_number
and log_index > existing.log_index
)
):
continue

timestamp = await self._resolve_block_timestamp(block_number)
assets_value = args.get("assets")
tickets[ticket_id] = ExitQueueTicket(
ticket=ticket_id,
shares=int(args["shares"]),
receiver=self.w3.to_checksum_address(args["receiver"]),
block_number=block_number,
log_index=log_index,
timestamp=timestamp,
assets_hint=None if assets_value is None else int(assets_value),
)

async def _get_exit_logs_etherscan(
self,
event: ContractEvent,
user: str,
from_block: int,
to_block: int,
*,
vault_address: str | None = None,
) -> list[EventData] | None:
# Try Etherscan first if available
if self._etherscan_client is None:
raise ValueError("Etherscan client not configured")

if vault_address is None:
raise ValueError("Vault address required for Etherscan")

try:
return await asyncio.to_thread(
self._etherscan_client.fetch_logs,
event,
vault_address,
{"owner": user},
from_block,
to_block,
)

except ValueError as exc: # pragma: no cover - provider variance
event_name = getattr(event, "abi", {}).get("name", "unknown")
logger.warning(
"StakeWise exit log query failed — event=%s user=%s chunk=[%d,%d] err=%s",
event_name,
user,
from_block,
to_block,
exc,
)
return None

async def _get_exit_logs_rpc(
self,
event: ContractEvent,
user: str,
Expand Down
3 changes: 3 additions & 0 deletions src/tq_oracle/clients/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .etherscan_logs import EtherscanLogsClient

__all__ = ["EtherscanLogsClient"]
Loading