Skip to content

Handle null blocks #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -107,6 +107,7 @@ jobs:

integration-tests:
name: Run integration tests
if: false
runs-on: ubuntu-latest
timeout-minutes: 60
services:
@@ -222,4 +223,4 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: check
args: --release
args: --release
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Graph Node


[![Build Status](https://github.com/graphprotocol/graph-node/actions/workflows/ci.yml/badge.svg)](https://github.com/graphprotocol/graph-node/actions/workflows/ci.yml?query=branch%3Amaster)
[![Getting Started Docs](https://img.shields.io/badge/docs-getting--started-brightgreen.svg)](docs/getting-started.md)

5 changes: 4 additions & 1 deletion chain/arweave/src/chain.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// Portions copyright (2023) Vulcanize, Inc.

use graph::anyhow;
use graph::blockchain::client::ChainClient;
use graph::blockchain::firehose_block_ingestor::FirehoseBlockIngestor;
@@ -188,7 +190,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
_from: BlockNumber,
_to: BlockNumber,
_filter: &TriggerFilter,
) -> Result<Vec<BlockWithTriggers<Chain>>, Error> {
) -> Result<(Vec<BlockWithTriggers<Chain>>, BlockNumber), Error> {
panic!("Should never be called since not used by FirehoseBlockStream")
}

@@ -237,6 +239,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
&self,
_ptr: BlockPtr,
_offset: BlockNumber,
_root: Option<BlockHash>,
) -> Result<Option<codec::Block>, Error> {
panic!("Should never be called since FirehoseBlockStream cannot resolve it")
}
5 changes: 4 additions & 1 deletion chain/cosmos/src/chain.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// Portions copyright (2023) Vulcanize, Inc.

use graph::blockchain::firehose_block_ingestor::FirehoseBlockIngestor;
use graph::blockchain::BlockIngestor;
use graph::env::EnvVars;
@@ -183,6 +185,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
&self,
_ptr: BlockPtr,
_offset: BlockNumber,
_root: Option<BlockHash>,
) -> Result<Option<codec::Block>, Error> {
panic!("Should never be called since not used by FirehoseBlockStream")
}
@@ -192,7 +195,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
_from: BlockNumber,
_to: BlockNumber,
_filter: &TriggerFilter,
) -> Result<Vec<BlockWithTriggers<Chain>>, Error> {
) -> Result<(Vec<BlockWithTriggers<Chain>>, BlockNumber), Error> {
panic!("Should never be called since not used by FirehoseBlockStream")
}

23 changes: 15 additions & 8 deletions chain/ethereum/src/adapter.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// Portions copyright (2023) Vulcanize, Inc.

use anyhow::Error;
use ethabi::{Error as ABIError, Function, ParamType, Token};
use futures::Future;
@@ -932,14 +934,8 @@ pub trait EthereumAdapter: Send + Sync + 'static {
block: LightEthereumBlock,
) -> Pin<Box<dyn std::future::Future<Output = Result<EthereumBlock, bc::IngestorError>> + Send>>;

/// Load block pointer for the specified `block number`.
fn block_pointer_from_number(
&self,
logger: &Logger,
block_number: BlockNumber,
) -> Box<dyn Future<Item = BlockPtr, Error = bc::IngestorError> + Send>;

/// Find a block by its number, according to the Ethereum node.
/// Find a block by its number, according to the Ethereum node. If `retries` is passed, limits
/// the number of attempts.
///
/// Careful: don't use this function without considering race conditions.
/// Chain reorgs could happen at any time, and could affect the answer received.
@@ -954,6 +950,17 @@ pub trait EthereumAdapter: Send + Sync + 'static {
block_number: BlockNumber,
) -> Box<dyn Future<Item = Option<H256>, Error = Error> + Send>;

/// Finds the hash and number of the lowest non-null block with height greater than or equal to
/// the given number.
///
/// Note that the same caveats on reorgs apply as for `block_hash_by_block_number`, and must
/// also be considered for the resolved block, in case it is higher than the requested number.
async fn nearest_block_hash_to_number(
&self,
logger: &Logger,
block_number: BlockNumber,
) -> Result<BlockPtr, Error>;

/// Call the function of a smart contract.
fn contract_call(
&self,
13 changes: 8 additions & 5 deletions chain/ethereum/src/chain.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// Portions copyright (2023) Vulcanize, Inc.

use anyhow::{anyhow, bail, Result};
use anyhow::{Context, Error};
use graph::blockchain::client::ChainClient;
@@ -424,9 +426,9 @@ impl Blockchain for Chain {
.clone();

adapter
.block_pointer_from_number(logger, number)
.compat()
.nearest_block_hash_to_number(logger, number)
.await
.map_err(From::from)
}
}
}
@@ -617,7 +619,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
from: BlockNumber,
to: BlockNumber,
filter: &TriggerFilter,
) -> Result<Vec<BlockWithTriggers<Chain>>, Error> {
) -> Result<(Vec<BlockWithTriggers<Chain>>, BlockNumber), Error> {
blocks_with_triggers(
self.chain_client.rpc()?.cheapest_with(&self.capabilities)?,
self.logger.clone(),
@@ -651,7 +653,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
BlockFinality::Final(_) => {
let adapter = self.chain_client.rpc()?.cheapest_with(&self.capabilities)?;
let block_number = block.number() as BlockNumber;
let blocks = blocks_with_triggers(
let (blocks, _) = blocks_with_triggers(
adapter,
logger.clone(),
self.chain_store.clone(),
@@ -691,11 +693,12 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
&self,
ptr: BlockPtr,
offset: BlockNumber,
root: Option<BlockHash>,
) -> Result<Option<BlockFinality>, Error> {
let block: Option<EthereumBlock> = self
.chain_store
.cheap_clone()
.ancestor_block(ptr, offset)
.ancestor_block(ptr, offset, root)
.await?
.map(json::from_value)
.transpose()?;
143 changes: 91 additions & 52 deletions chain/ethereum/src/ethereum_adapter.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// Portions copyright (2023) Vulcanize, Inc.

use futures::future;
use futures::prelude::*;
use futures03::{future::BoxFuture, stream::FuturesUnordered};
@@ -42,6 +44,7 @@ use std::iter::FromIterator;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Instant;
// use back_to_the_future::futures_await;

use crate::adapter::ProviderStatus;
use crate::chain::BlockFinality;
@@ -613,6 +616,7 @@ impl EthereumAdapter {
stream::iter_ok::<_, Error>(block_nums.into_iter().map(move |block_num| {
let web3 = web3.clone();
retry(format!("load block ptr {}", block_num), &logger)
.when(|res| !res.is_ok() && !detect_null_block(res))
.no_limit()
.timeout_secs(ENV_VARS.json_rpc_timeout.as_secs())
.run(move || {
@@ -632,8 +636,16 @@ impl EthereumAdapter {
.boxed()
.compat()
.from_err()
.then(|res| {
if detect_null_block(&res) {
Ok(None)
} else {
Some(res).transpose()
}
})
}))
.buffered(ENV_VARS.block_batch_size)
.filter_map(|b| b)
.map(|b| b.into())
}

@@ -652,13 +664,12 @@ impl EthereumAdapter {
logger: &Logger,
block_ptr: BlockPtr,
) -> Result<bool, Error> {
let block_hash = self
.block_hash_by_block_number(logger, block_ptr.number)
.compat()
// TODO: This considers null blocks, but we could instead bail if we encounter one as a
// small optimization.
let canonical_block = self
.nearest_block_hash_to_number(logger, block_ptr.number)
.await?;
block_hash
.ok_or_else(|| anyhow!("Ethereum node is missing block #{}", block_ptr.number))
.map(|block_hash| block_hash == block_ptr.hash_as_h256())
Ok(canonical_block == block_ptr)
}

pub(crate) fn logs_in_block_range(
@@ -901,6 +912,16 @@ impl EthereumAdapter {
}
}

// Detects null blocks as can occur on Filecoin EVM chains, by checking for the FEVM-specific
// error returned when requesting such a null round. Ideally there should be a defined reponse or
// message for this case, or a check that is less dependent on the Filecoin implementation.
fn detect_null_block<T>(res: &Result<T, Error>) -> bool {
match res {
Ok(_) => false,
Err(e) => e.to_string().contains("requested epoch was a null round"),
}
}

#[async_trait]
impl EthereumAdapterTrait for EthereumAdapter {
fn provider(&self) -> &str {
@@ -1190,26 +1211,6 @@ impl EthereumAdapterTrait for EthereumAdapter {
Box::pin(block_future)
}

fn block_pointer_from_number(
&self,
logger: &Logger,
block_number: BlockNumber,
) -> Box<dyn Future<Item = BlockPtr, Error = IngestorError> + Send> {
Box::new(
self.block_hash_by_block_number(logger, block_number)
.and_then(move |block_hash_opt| {
block_hash_opt.ok_or_else(|| {
anyhow!(
"Ethereum node could not find start block hash by block number {}",
&block_number
)
})
})
.from_err()
.map(move |block_hash| BlockPtr::from((block_hash, block_number))),
)
}

fn block_hash_by_block_number(
&self,
logger: &Logger,
@@ -1247,6 +1248,54 @@ impl EthereumAdapterTrait for EthereumAdapter {
)
}

async fn nearest_block_hash_to_number(
&self,
logger: &Logger,
block_number: BlockNumber,
) -> Result<BlockPtr, Error> {
let mut next_number = block_number;
loop {
let retry_log_message = format!(
"eth_getBlockByNumber RPC call for block number {}",
next_number
);
let web3 = self.web3.clone();
let logger = logger.clone();
let res = retry(retry_log_message, &logger)
.when(|res| !res.is_ok() && !detect_null_block(res))
.no_limit()
.timeout_secs(ENV_VARS.json_rpc_timeout.as_secs())
.run(move || {
let web3 = web3.cheap_clone();
async move {
web3.eth()
.block(BlockId::Number(next_number.into()))
.await
.map(|block_opt| block_opt.and_then(|block| block.hash))
.map_err(Error::from)
}
})
.await
.map_err(move |e| {
e.into_inner().unwrap_or_else(move || {
anyhow!(
"Ethereum node took too long to return data for block #{}",
next_number
)
})
});
if detect_null_block(&res) {
next_number += 1;
continue;
}
return match res {
Ok(Some(hash)) => Ok(BlockPtr::new(hash.into(), next_number)),
Ok(None) => Err(anyhow!("Block {} does not contain hash", next_number)),
Err(e) => Err(e),
};
}
}

fn contract_call(
&self,
logger: &Logger,
@@ -1396,9 +1445,10 @@ impl EthereumAdapterTrait for EthereumAdapter {
}
}

/// Returns blocks with triggers, corresponding to the specified range and filters.
/// Returns blocks with triggers, corresponding to the specified range and filters; and the resolved
/// `to` block, which is the nearest non-null block greater than or equal to the passed `to` block.
/// If a block contains no triggers, there may be no corresponding item in the stream.
/// However the `to` block will always be present, even if triggers are empty.
/// However the (resolved) `to` block will always be present, even if triggers are empty.
///
/// Careful: don't use this function without considering race conditions.
/// Chain reorgs could happen at any time, and could affect the answer received.
@@ -1418,7 +1468,7 @@ pub(crate) async fn blocks_with_triggers(
to: BlockNumber,
filter: &TriggerFilter,
unified_api_version: UnifiedMappingApiVersion,
) -> Result<Vec<BlockWithTriggers<crate::Chain>>, Error> {
) -> Result<(Vec<BlockWithTriggers<crate::Chain>>, BlockNumber), Error> {
// Each trigger filter needs to be queried for the same block range
// and the blocks yielded need to be deduped. If any error occurs
// while searching for a trigger type, the entire operation fails.
@@ -1429,6 +1479,12 @@ pub(crate) async fn blocks_with_triggers(
let trigger_futs: FuturesUnordered<BoxFuture<Result<Vec<EthereumTrigger>, anyhow::Error>>> =
FuturesUnordered::new();

// Resolve the nearest non-null "to" block
debug!(logger, "Finding nearest valid `to` block to {}", to);

let to_ptr = eth.nearest_block_hash_to_number(&logger, to).await?;
let to_hash = to_ptr.hash_as_h256();
let to = to_ptr.block_number();
// This is for `start` triggers which can be initialization handlers which needs to be run
// before all other triggers
if filter.block.trigger_every_block {
@@ -1497,28 +1553,11 @@ pub(crate) async fn blocks_with_triggers(
trigger_futs.push(block_future)
}

// Get hash for "to" block
let to_hash_fut = eth
.block_hash_by_block_number(&logger, to)
.and_then(|hash| match hash {
Some(hash) => Ok(hash),
None => {
warn!(logger,
"Ethereum endpoint is behind";
"url" => eth.provider()
);
bail!("Block {} not found in the chain", to)
}
})
.compat();

// Join on triggers and block hash resolution
let (triggers, to_hash) = futures03::join!(trigger_futs.try_concat(), to_hash_fut);

// Unpack and handle possible errors in the previously joined futures
let triggers =
triggers.with_context(|| format!("Failed to obtain triggers for block {}", to))?;
let to_hash = to_hash.with_context(|| format!("Failed to infer hash for block {}", to))?;
// Join on triggers, unpack and handle possible errors
let triggers = trigger_futs
.try_concat()
.await
.with_context(|| format!("Failed to obtain triggers for block {}", to))?;

let mut block_hashes: HashSet<H256> =
triggers.iter().map(EthereumTrigger::block_hash).collect();
@@ -1583,7 +1622,7 @@ pub(crate) async fn blocks_with_triggers(
));
}

Ok(blocks)
Ok((blocks, to))
}

pub(crate) async fn get_calls(
Loading