diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1b400f1fbd7..15078095e3a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -107,6 +107,7 @@ jobs: integration-tests: name: Run integration tests + if: false runs-on: ubuntu-latest timeout-minutes: 60 services: @@ -222,4 +223,4 @@ jobs: uses: actions-rs/cargo@v1 with: command: check - args: --release \ No newline at end of file + args: --release diff --git a/README.md b/README.md index ff31fdad758..e4cfb0ce425 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,6 @@ # Graph Node + [![Build Status](https://github.com/graphprotocol/graph-node/actions/workflows/ci.yml/badge.svg)](https://github.com/graphprotocol/graph-node/actions/workflows/ci.yml?query=branch%3Amaster) [![Getting Started Docs](https://img.shields.io/badge/docs-getting--started-brightgreen.svg)](docs/getting-started.md) diff --git a/chain/arweave/src/chain.rs b/chain/arweave/src/chain.rs index b827c78948a..ab1767252f4 100644 --- a/chain/arweave/src/chain.rs +++ b/chain/arweave/src/chain.rs @@ -1,3 +1,5 @@ +// Portions copyright (2023) Vulcanize, Inc. + use graph::anyhow; use graph::blockchain::client::ChainClient; use graph::blockchain::firehose_block_ingestor::FirehoseBlockIngestor; @@ -188,7 +190,7 @@ impl TriggersAdapterTrait for TriggersAdapter { _from: BlockNumber, _to: BlockNumber, _filter: &TriggerFilter, - ) -> Result>, Error> { + ) -> Result<(Vec>, BlockNumber), Error> { panic!("Should never be called since not used by FirehoseBlockStream") } @@ -237,6 +239,7 @@ impl TriggersAdapterTrait for TriggersAdapter { &self, _ptr: BlockPtr, _offset: BlockNumber, + _root: Option, ) -> Result, Error> { panic!("Should never be called since FirehoseBlockStream cannot resolve it") } diff --git a/chain/cosmos/src/chain.rs b/chain/cosmos/src/chain.rs index 5cc1a6f9b1b..ab009c8002c 100644 --- a/chain/cosmos/src/chain.rs +++ b/chain/cosmos/src/chain.rs @@ -1,3 +1,5 @@ +// Portions copyright (2023) Vulcanize, Inc. + use graph::blockchain::firehose_block_ingestor::FirehoseBlockIngestor; use graph::blockchain::BlockIngestor; use graph::env::EnvVars; @@ -183,6 +185,7 @@ impl TriggersAdapterTrait for TriggersAdapter { &self, _ptr: BlockPtr, _offset: BlockNumber, + _root: Option, ) -> Result, Error> { panic!("Should never be called since not used by FirehoseBlockStream") } @@ -192,7 +195,7 @@ impl TriggersAdapterTrait for TriggersAdapter { _from: BlockNumber, _to: BlockNumber, _filter: &TriggerFilter, - ) -> Result>, Error> { + ) -> Result<(Vec>, BlockNumber), Error> { panic!("Should never be called since not used by FirehoseBlockStream") } diff --git a/chain/ethereum/src/adapter.rs b/chain/ethereum/src/adapter.rs index db607a40233..754f90cfbcd 100644 --- a/chain/ethereum/src/adapter.rs +++ b/chain/ethereum/src/adapter.rs @@ -1,3 +1,5 @@ +// Portions copyright (2023) Vulcanize, Inc. + use anyhow::Error; use ethabi::{Error as ABIError, Function, ParamType, Token}; use futures::Future; @@ -932,14 +934,8 @@ pub trait EthereumAdapter: Send + Sync + 'static { block: LightEthereumBlock, ) -> Pin> + Send>>; - /// Load block pointer for the specified `block number`. - fn block_pointer_from_number( - &self, - logger: &Logger, - block_number: BlockNumber, - ) -> Box + Send>; - - /// Find a block by its number, according to the Ethereum node. + /// Find a block by its number, according to the Ethereum node. If `retries` is passed, limits + /// the number of attempts. /// /// Careful: don't use this function without considering race conditions. 
/// Chain reorgs could happen at any time, and could affect the answer received. @@ -954,6 +950,17 @@ pub trait EthereumAdapter: Send + Sync + 'static { block_number: BlockNumber, ) -> Box, Error = Error> + Send>; + /// Finds the hash and number of the lowest non-null block with height greater than or equal to + /// the given number. + /// + /// Note that the same caveats on reorgs apply as for `block_hash_by_block_number`, and must + /// also be considered for the resolved block, in case it is higher than the requested number. + async fn nearest_block_hash_to_number( + &self, + logger: &Logger, + block_number: BlockNumber, + ) -> Result; + /// Call the function of a smart contract. fn contract_call( &self, diff --git a/chain/ethereum/src/chain.rs b/chain/ethereum/src/chain.rs index 32697c15dd2..d9ef278d052 100644 --- a/chain/ethereum/src/chain.rs +++ b/chain/ethereum/src/chain.rs @@ -1,3 +1,5 @@ +// Portions copyright (2023) Vulcanize, Inc. + use anyhow::{anyhow, bail, Result}; use anyhow::{Context, Error}; use graph::blockchain::client::ChainClient; @@ -424,9 +426,9 @@ impl Blockchain for Chain { .clone(); adapter - .block_pointer_from_number(logger, number) - .compat() + .nearest_block_hash_to_number(logger, number) .await + .map_err(From::from) } } } @@ -617,7 +619,7 @@ impl TriggersAdapterTrait for TriggersAdapter { from: BlockNumber, to: BlockNumber, filter: &TriggerFilter, - ) -> Result>, Error> { + ) -> Result<(Vec>, BlockNumber), Error> { blocks_with_triggers( self.chain_client.rpc()?.cheapest_with(&self.capabilities)?, self.logger.clone(), @@ -651,7 +653,7 @@ impl TriggersAdapterTrait for TriggersAdapter { BlockFinality::Final(_) => { let adapter = self.chain_client.rpc()?.cheapest_with(&self.capabilities)?; let block_number = block.number() as BlockNumber; - let blocks = blocks_with_triggers( + let (blocks, _) = blocks_with_triggers( adapter, logger.clone(), self.chain_store.clone(), @@ -691,11 +693,12 @@ impl TriggersAdapterTrait for TriggersAdapter { &self, ptr: BlockPtr, offset: BlockNumber, + root: Option, ) -> Result, Error> { let block: Option = self .chain_store .cheap_clone() - .ancestor_block(ptr, offset) + .ancestor_block(ptr, offset, root) .await? .map(json::from_value) .transpose()?; diff --git a/chain/ethereum/src/ethereum_adapter.rs b/chain/ethereum/src/ethereum_adapter.rs index b699b7370db..e801983211f 100644 --- a/chain/ethereum/src/ethereum_adapter.rs +++ b/chain/ethereum/src/ethereum_adapter.rs @@ -1,3 +1,5 @@ +// Portions copyright (2023) Vulcanize, Inc. 
+ use futures::future; use futures::prelude::*; use futures03::{future::BoxFuture, stream::FuturesUnordered}; @@ -42,6 +44,7 @@ use std::iter::FromIterator; use std::pin::Pin; use std::sync::Arc; use std::time::Instant; +// use back_to_the_future::futures_await; use crate::adapter::ProviderStatus; use crate::chain::BlockFinality; @@ -613,6 +616,7 @@ impl EthereumAdapter { stream::iter_ok::<_, Error>(block_nums.into_iter().map(move |block_num| { let web3 = web3.clone(); retry(format!("load block ptr {}", block_num), &logger) + .when(|res| !res.is_ok() && !detect_null_block(res)) .no_limit() .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) .run(move || { @@ -632,8 +636,16 @@ impl EthereumAdapter { .boxed() .compat() .from_err() + .then(|res| { + if detect_null_block(&res) { + Ok(None) + } else { + Some(res).transpose() + } + }) })) .buffered(ENV_VARS.block_batch_size) + .filter_map(|b| b) .map(|b| b.into()) } @@ -652,13 +664,12 @@ impl EthereumAdapter { logger: &Logger, block_ptr: BlockPtr, ) -> Result<bool, Error> { - let block_hash = self - .block_hash_by_block_number(logger, block_ptr.number) - .compat() + // TODO: This considers null blocks, but we could instead bail if we encounter one as a + // small optimization. + let canonical_block = self + .nearest_block_hash_to_number(logger, block_ptr.number) .await?; - block_hash - .ok_or_else(|| anyhow!("Ethereum node is missing block #{}", block_ptr.number)) - .map(|block_hash| block_hash == block_ptr.hash_as_h256()) + Ok(canonical_block == block_ptr) } pub(crate) fn logs_in_block_range( @@ -901,6 +912,16 @@ impl EthereumAdapter { } } +// Detects null blocks, which can occur on Filecoin EVM chains, by checking for the FEVM-specific +// error returned when requesting such a null round. Ideally there should be a defined response or +// message for this case, or a check that is less dependent on the Filecoin implementation.
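+//
+// A rough sketch of the intended behavior (hypothetical values; this assumes the provider
+// surfaces the node's null-round message verbatim in the error's Display output):
+//
+//     let null_round: Result<Option<H256>, Error> =
+//         Err(anyhow!("requested epoch was a null round"));
+//     assert!(detect_null_block(&null_round));
+//
+//     let not_found: Result<Option<H256>, Error> = Ok(None);
+//     assert!(!detect_null_block(&not_found));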
+fn detect_null_block(res: &Result) -> bool { + match res { + Ok(_) => false, + Err(e) => e.to_string().contains("requested epoch was a null round"), + } +} + #[async_trait] impl EthereumAdapterTrait for EthereumAdapter { fn provider(&self) -> &str { @@ -1190,26 +1211,6 @@ impl EthereumAdapterTrait for EthereumAdapter { Box::pin(block_future) } - fn block_pointer_from_number( - &self, - logger: &Logger, - block_number: BlockNumber, - ) -> Box + Send> { - Box::new( - self.block_hash_by_block_number(logger, block_number) - .and_then(move |block_hash_opt| { - block_hash_opt.ok_or_else(|| { - anyhow!( - "Ethereum node could not find start block hash by block number {}", - &block_number - ) - }) - }) - .from_err() - .map(move |block_hash| BlockPtr::from((block_hash, block_number))), - ) - } - fn block_hash_by_block_number( &self, logger: &Logger, @@ -1247,6 +1248,54 @@ impl EthereumAdapterTrait for EthereumAdapter { ) } + async fn nearest_block_hash_to_number( + &self, + logger: &Logger, + block_number: BlockNumber, + ) -> Result { + let mut next_number = block_number; + loop { + let retry_log_message = format!( + "eth_getBlockByNumber RPC call for block number {}", + next_number + ); + let web3 = self.web3.clone(); + let logger = logger.clone(); + let res = retry(retry_log_message, &logger) + .when(|res| !res.is_ok() && !detect_null_block(res)) + .no_limit() + .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) + .run(move || { + let web3 = web3.cheap_clone(); + async move { + web3.eth() + .block(BlockId::Number(next_number.into())) + .await + .map(|block_opt| block_opt.and_then(|block| block.hash)) + .map_err(Error::from) + } + }) + .await + .map_err(move |e| { + e.into_inner().unwrap_or_else(move || { + anyhow!( + "Ethereum node took too long to return data for block #{}", + next_number + ) + }) + }); + if detect_null_block(&res) { + next_number += 1; + continue; + } + return match res { + Ok(Some(hash)) => Ok(BlockPtr::new(hash.into(), next_number)), + Ok(None) => Err(anyhow!("Block {} does not contain hash", next_number)), + Err(e) => Err(e), + }; + } + } + fn contract_call( &self, logger: &Logger, @@ -1396,9 +1445,10 @@ impl EthereumAdapterTrait for EthereumAdapter { } } -/// Returns blocks with triggers, corresponding to the specified range and filters. +/// Returns blocks with triggers, corresponding to the specified range and filters; and the resolved +/// `to` block, which is the nearest non-null block greater than or equal to the passed `to` block. /// If a block contains no triggers, there may be no corresponding item in the stream. -/// However the `to` block will always be present, even if triggers are empty. +/// However the (resolved) `to` block will always be present, even if triggers are empty. /// /// Careful: don't use this function without considering race conditions. /// Chain reorgs could happen at any time, and could affect the answer received. @@ -1418,7 +1468,7 @@ pub(crate) async fn blocks_with_triggers( to: BlockNumber, filter: &TriggerFilter, unified_api_version: UnifiedMappingApiVersion, -) -> Result>, Error> { +) -> Result<(Vec>, BlockNumber), Error> { // Each trigger filter needs to be queried for the same block range // and the blocks yielded need to be deduped. If any error occurs // while searching for a trigger type, the entire operation fails. 
@@ -1429,6 +1479,12 @@ pub(crate) async fn blocks_with_triggers( let trigger_futs: FuturesUnordered, anyhow::Error>>> = FuturesUnordered::new(); + // Resolve the nearest non-null "to" block + debug!(logger, "Finding nearest valid `to` block to {}", to); + + let to_ptr = eth.nearest_block_hash_to_number(&logger, to).await?; + let to_hash = to_ptr.hash_as_h256(); + let to = to_ptr.block_number(); // This is for `start` triggers which can be initialization handlers which needs to be run // before all other triggers if filter.block.trigger_every_block { @@ -1497,28 +1553,11 @@ pub(crate) async fn blocks_with_triggers( trigger_futs.push(block_future) } - // Get hash for "to" block - let to_hash_fut = eth - .block_hash_by_block_number(&logger, to) - .and_then(|hash| match hash { - Some(hash) => Ok(hash), - None => { - warn!(logger, - "Ethereum endpoint is behind"; - "url" => eth.provider() - ); - bail!("Block {} not found in the chain", to) - } - }) - .compat(); - - // Join on triggers and block hash resolution - let (triggers, to_hash) = futures03::join!(trigger_futs.try_concat(), to_hash_fut); - - // Unpack and handle possible errors in the previously joined futures - let triggers = - triggers.with_context(|| format!("Failed to obtain triggers for block {}", to))?; - let to_hash = to_hash.with_context(|| format!("Failed to infer hash for block {}", to))?; + // Join on triggers, unpack and handle possible errors + let triggers = trigger_futs + .try_concat() + .await + .with_context(|| format!("Failed to obtain triggers for block {}", to))?; let mut block_hashes: HashSet = triggers.iter().map(EthereumTrigger::block_hash).collect(); @@ -1583,7 +1622,7 @@ pub(crate) async fn blocks_with_triggers( )); } - Ok(blocks) + Ok((blocks, to)) } pub(crate) async fn get_calls( diff --git a/chain/near/src/chain.rs b/chain/near/src/chain.rs index e8925c89366..c76e09f1052 100644 --- a/chain/near/src/chain.rs +++ b/chain/near/src/chain.rs @@ -1,3 +1,5 @@ +// Portions copyright (2023) Vulcanize, Inc. 
+ use anyhow::anyhow; use graph::blockchain::client::ChainClient; use graph::blockchain::firehose_block_ingestor::FirehoseBlockIngestor; @@ -312,7 +314,7 @@ impl TriggersAdapterTrait for TriggersAdapter { _from: BlockNumber, _to: BlockNumber, _filter: &TriggerFilter, - ) -> Result>, Error> { + ) -> Result<(Vec>, BlockNumber), Error> { panic!("Should never be called since not used by FirehoseBlockStream") } @@ -386,6 +388,7 @@ impl TriggersAdapterTrait for TriggersAdapter { &self, _ptr: BlockPtr, _offset: BlockNumber, + _root: Option, ) -> Result, Error> { panic!("Should never be called since FirehoseBlockStream cannot resolve it") } diff --git a/chain/starknet/src/chain.rs b/chain/starknet/src/chain.rs index 060a502d80d..98e64fd1e24 100644 --- a/chain/starknet/src/chain.rs +++ b/chain/starknet/src/chain.rs @@ -355,6 +355,7 @@ impl TriggersAdapterTrait for TriggersAdapter { &self, _ptr: BlockPtr, _offset: BlockNumber, + _root: Option, ) -> Result, Error> { panic!("Should never be called since FirehoseBlockStream cannot resolve it") } @@ -370,7 +371,7 @@ impl TriggersAdapterTrait for TriggersAdapter { _from: BlockNumber, _to: BlockNumber, _filter: &crate::adapter::TriggerFilter, - ) -> Result>, Error> { + ) -> Result<(Vec>, BlockNumber), Error> { panic!("Should never be called since not used by FirehoseBlockStream") } diff --git a/chain/substreams/src/trigger.rs b/chain/substreams/src/trigger.rs index 0e42eb7734a..bc89b81efd6 100644 --- a/chain/substreams/src/trigger.rs +++ b/chain/substreams/src/trigger.rs @@ -1,3 +1,5 @@ +// Portions copyright (2023) Vulcanize, Inc. + use std::sync::Arc; use anyhow::Error; @@ -132,6 +134,7 @@ impl blockchain::TriggersAdapter for TriggersAdapter { &self, _ptr: BlockPtr, _offset: BlockNumber, + _root: Option, ) -> Result, Error> { unimplemented!() } @@ -141,7 +144,7 @@ impl blockchain::TriggersAdapter for TriggersAdapter { _from: BlockNumber, _to: BlockNumber, _filter: &TriggerFilter, - ) -> Result>, Error> { + ) -> Result<(Vec>, BlockNumber), Error> { unimplemented!() } diff --git a/graph/src/blockchain/block_stream.rs b/graph/src/blockchain/block_stream.rs index 7a9b61f012e..0011b929630 100644 --- a/graph/src/blockchain/block_stream.rs +++ b/graph/src/blockchain/block_stream.rs @@ -1,3 +1,5 @@ +// Portions copyright (2023) Vulcanize, Inc. + use crate::substreams::Clock; use crate::substreams_rpc::response::Message as SubstreamsMessage; use crate::substreams_rpc::BlockScopedData; @@ -258,14 +260,15 @@ impl BlockWithTriggers { #[async_trait] pub trait TriggersAdapter: Send + Sync { - // Return the block that is `offset` blocks before the block pointed to - // by `ptr` from the local cache. An offset of 0 means the block itself, - // an offset of 1 means the block's parent etc. If the block is not in - // the local cache, return `None` + // Return the block that is `offset` blocks before the block pointed to by `ptr` from the local + // cache. An offset of 0 means the block itself, an offset of 1 means the block's parent etc. If + // `root` is passed, short-circuit upon finding a child of `root`. If the block is not in the + // local cache, return `None`. async fn ancestor_block( &self, ptr: BlockPtr, offset: BlockNumber, + root: Option, ) -> Result, Error>; // Returns a sequence of blocks in increasing order of block number. 
@@ -279,7 +282,7 @@ pub trait TriggersAdapter: Send + Sync { from: BlockNumber, to: BlockNumber, filter: &C::TriggerFilter, - ) -> Result>, Error>; + ) -> Result<(Vec>, BlockNumber), Error>; // Used for reprocessing blocks when creating a data source. async fn triggers_in_block( diff --git a/graph/src/blockchain/mock.rs b/graph/src/blockchain/mock.rs index cc8fa9caaa7..ee88bd31827 100644 --- a/graph/src/blockchain/mock.rs +++ b/graph/src/blockchain/mock.rs @@ -1,10 +1,12 @@ +// Portions copyright (2023) Vulcanize, Inc. + use crate::{ components::{ link_resolver::LinkResolver, store::{BlockNumber, DeploymentCursorTracker, DeploymentLocator}, }, data::subgraph::UnifiedMappingApiVersion, - prelude::DataSourceTemplateInfo, + prelude::{BlockHash, DataSourceTemplateInfo}, }; use anyhow::Error; use async_trait::async_trait; @@ -204,6 +206,7 @@ impl TriggersAdapter for MockTriggersAdapter { &self, _ptr: BlockPtr, _offset: BlockNumber, + _root: Option, ) -> Result, Error> { todo!() } @@ -213,7 +216,7 @@ impl TriggersAdapter for MockTriggersAdapter { _from: crate::components::store::BlockNumber, _to: crate::components::store::BlockNumber, _filter: &C::TriggerFilter, - ) -> Result>, Error> { + ) -> Result<(Vec>, BlockNumber), Error> { todo!() } diff --git a/graph/src/blockchain/polling_block_stream.rs b/graph/src/blockchain/polling_block_stream.rs index a9c52294c18..dfdc0061965 100644 --- a/graph/src/blockchain/polling_block_stream.rs +++ b/graph/src/blockchain/polling_block_stream.rs @@ -1,3 +1,5 @@ +// Portions copyright (2023) Vulcanize, Inc. + use anyhow::Error; use futures03::{stream::Stream, Future, FutureExt}; use std::cmp; @@ -363,22 +365,36 @@ where // 1000 triggers found, 2 per block, range_size = 1000 / 2 = 500 let range_size_upper_limit = max_block_range_size.min(ctx.previous_block_range_size * 10); - let range_size = if ctx.previous_triggers_per_block == 0.0 { + let target_range_size = if ctx.previous_triggers_per_block == 0.0 { range_size_upper_limit } else { (self.target_triggers_per_block_range as f64 / ctx.previous_triggers_per_block) .max(1.0) .min(range_size_upper_limit as f64) as BlockNumber }; - let to = cmp::min(from + range_size - 1, to_limit); + let to = cmp::min(from + target_range_size - 1, to_limit); info!( ctx.logger, "Scanning blocks [{}, {}]", from, to; - "range_size" => range_size + "target_range_size" => target_range_size ); - let blocks = self.adapter.scan_triggers(from, to, &self.filter).await?; + // Update `to` to the range actually scanned, to account for any skipped null blocks. + let (blocks, to) = self.adapter.scan_triggers(from, to, &self.filter).await?; + let range_size = to - from + 1; + + // If the resolved `to` is past the reorg threshold, all blocks up to the threshold were null, + // so there is nothing final in range to process yet. Retry until a non-null block is final. + if to > head_ptr.number - reorg_threshold { + return Ok(ReconciliationStep::Retry); + } + + info!( + ctx.logger, + "Scanned blocks [{}, {}]", from, to; + "range_size" => range_size ); Ok(ReconciliationStep::ProcessDescendantBlocks( blocks, range_size, @@ -415,7 +431,10 @@ where // In principle this block should be in the store, but we have seen this error for deep // reorgs in ropsten. - let head_ancestor_opt = self.adapter.ancestor_block(head_ptr, offset).await?; + let head_ancestor_opt = self .adapter .ancestor_block(head_ptr, offset, Some(subgraph_ptr.hash.clone())) .await?; match head_ancestor_opt { None => { @@ -427,6 +446,15 @@ where Ok(ReconciliationStep::Retry) } Some(head_ancestor) => { + // Check if there was an interceding skipped (null) block.
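+ // For example (hypothetical numbers): with the subgraph pointer at block 10 and the store
+ // returning block 12 as the nearest child, block 11 was a null round that was never stored;
+ // the parent-hash comparison below still links block 12 directly back to block 10.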
+ if head_ancestor.number() != subgraph_ptr.number + 1 { + warn!( + ctx.logger, + "skipped block detected: {}", + subgraph_ptr.number + 1 + ); + } + // We stopped one block short, so we'll compare the parent hash to the // subgraph ptr. if head_ancestor.parent_hash().as_ref() == Some(&subgraph_ptr.hash) { diff --git a/graph/src/blockchain/types.rs b/graph/src/blockchain/types.rs index cf23ae86ad5..91d8358d2b8 100644 --- a/graph/src/blockchain/types.rs +++ b/graph/src/blockchain/types.rs @@ -221,6 +221,16 @@ impl From<(Vec, u64)> for BlockPtr { } } +impl From<(Vec, i64)> for BlockPtr { + fn from((bytes, number): (Vec, i64)) -> Self { + let number = i32::try_from(number).unwrap(); + BlockPtr { + hash: BlockHash::from(bytes), + number, + } + } +} + impl From<(H256, u64)> for BlockPtr { fn from((hash, number): (H256, u64)) -> BlockPtr { let number = i32::try_from(number).unwrap(); diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index 95463f9801d..826b7accdc5 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -1,3 +1,5 @@ +// Portions copyright (2023) Vulcanize, Inc. + use anyhow::Error; use async_trait::async_trait; use web3::types::{Address, H256}; @@ -463,14 +465,16 @@ pub trait ChainStore: Send + Sync + 'static { ) -> Result, Error>; /// Get the `offset`th ancestor of `block_hash`, where offset=0 means the block matching - /// `block_hash` and offset=1 means its parent. Returns None if unable to complete due to - /// missing blocks in the chain store. + /// `block_hash` and offset=1 means its parent. If `root` is passed, short-circuit upon finding + /// a child of `root`. Returns None if unable to complete due to missing blocks in the chain + /// store. /// /// Returns an error if the offset would reach past the genesis block. async fn ancestor_block( self: Arc, block_ptr: BlockPtr, offset: BlockNumber, + root: Option, ) -> Result, Error>; /// Remove old blocks from the cache we maintain in the database and diff --git a/node/src/manager/commands/chain.rs b/node/src/manager/commands/chain.rs index 31978ebb7d5..b62e0dcca87 100644 --- a/node/src/manager/commands/chain.rs +++ b/node/src/manager/commands/chain.rs @@ -1,3 +1,5 @@ +// Portions copyright (2023) Vulcanize, Inc. + use std::sync::Arc; use graph::blockchain::BlockPtr; @@ -100,7 +102,7 @@ pub async fn info( let ancestor = match &head_block { None => None, Some(head_block) => chain_store - .ancestor_block(head_block.clone(), offset) + .ancestor_block(head_block.clone(), offset, None) .await? .map(json::from_value::) .transpose()? diff --git a/store/postgres/src/chain_store.rs b/store/postgres/src/chain_store.rs index ee0e1193024..cfff9e2a6ab 100644 --- a/store/postgres/src/chain_store.rs +++ b/store/postgres/src/chain_store.rs @@ -1,3 +1,5 @@ +// Portions copyright (2023) Vulcanize, Inc. 
+ use diesel::pg::PgConnection; use diesel::prelude::*; use diesel::r2d2::{ConnectionManager, PooledConnection}; @@ -376,10 +378,10 @@ mod data { create index blocks_number ON {nsp}.blocks using btree(number); create table {nsp}.call_cache ( - id bytea not null primary key, - return_value bytea not null, - contract_address bytea not null, - block_number int4 not null + id bytea not null primary key, + return_value bytea not null, + contract_address bytea not null, + block_number int4 not null ); create index call_cache_block_number_idx ON {nsp}.call_cache(block_number); @@ -886,10 +888,18 @@ mod data { conn: &PgConnection, block_ptr: BlockPtr, offset: BlockNumber, + root: Option, ) -> Result, Error> { - let data_and_hash = match self { + let short_circuit_predicate = if root.is_some() { + "and b.parent_hash <> $3" + } else { + "" + }; + + let data_and_ptr = match self { Storage::Shared => { - const ANCESTOR_SQL: &str = " + let query = format!( + " with recursive ancestors(block_hash, block_offset) as ( values ($1, 0) union all @@ -897,27 +907,49 @@ mod data { from ancestors a, ethereum_blocks b where a.block_hash = b.hash and a.block_offset < $2 + {} ) - select a.block_hash as hash + select a.block_hash as hash, b.number as number from ancestors a - where a.block_offset = $2;"; + inner join ethereum_blocks b on a.block_hash = b.hash + order by a.block_offset desc limit 1", + short_circuit_predicate + ); + // type Result = (Text, i64); + #[derive(QueryableByName)] + struct BlockHashAndNumber { + #[sql_type = "Text"] + hash: String, + #[sql_type = "BigInt"] + number: i64, + } - let hash = sql_query(ANCESTOR_SQL) - .bind::(block_ptr.hash_hex()) - .bind::(offset as i64) - .get_result::(conn) - .optional()?; + let block = match root { + Some(root) => sql_query(query) + .bind::(block_ptr.hash_hex()) + .bind::(offset as i64) + .bind::(root.hash_hex()) + .get_result::(conn), + None => sql_query(query) + .bind::(block_ptr.hash_hex()) + .bind::(offset as i64) + .get_result::(conn), + } + .optional()?; use public::ethereum_blocks as b; - match hash { + match block { None => None, - Some(hash) => Some(( + Some(block) => Some(( b::table - .filter(b::hash.eq(&hash.hash)) + .filter(b::hash.eq(&block.hash)) .select(b::data) .first::(conn)?, - BlockHash::from_str(&hash.hash)?, + BlockPtr::new( + BlockHash::from_str(&block.hash)?, + i32::try_from(block.number).unwrap(), + ), )), } } @@ -932,27 +964,45 @@ mod data { from ancestors a, {} b where a.block_hash = b.hash and a.block_offset < $2 + {} ) - select a.block_hash as hash + select a.block_hash as hash, b.number as number from ancestors a - where a.block_offset = $2;", - blocks.qname + inner join ethereum_blocks b on encode(a.block_hash, 'hex') = b.hash + order by a.block_offset desc limit 1", + blocks.qname, short_circuit_predicate ); - let hash = sql_query(query) - .bind::(block_ptr.hash_slice()) - .bind::(offset as i64) - .get_result::(conn) - .optional()?; - match hash { + #[derive(QueryableByName)] + struct BlockHashAndNumber { + #[sql_type = "Bytea"] + hash: Vec, + #[sql_type = "BigInt"] + number: i64, + } + + let block = match root { + Some(root) => sql_query(query) + .bind::(block_ptr.hash_slice()) + .bind::(offset as i64) + .bind::(root.as_slice()) + .get_result::(conn), + None => sql_query(query) + .bind::(block_ptr.hash_slice()) + .bind::(offset as i64) + .get_result::(conn), + } + .optional()?; + + match block { None => None, - Some(hash) => Some(( + Some(block) => Some(( blocks .table() - .filter(blocks.hash().eq(&hash.hash)) + 
.filter(blocks.hash().eq(&block.hash)) .select(blocks.data()) .first::(conn)?, - BlockHash::from(hash.hash), + BlockPtr::from((block.hash, block.number)), )), } } @@ -967,13 +1017,13 @@ mod data { let data_and_ptr = { use graph::prelude::serde_json::json; - data_and_hash.map(|(data, hash)| { + data_and_ptr.map(|(data, ptr)| { ( match data.get("block") { Some(_) => data, None => json!({ "block": data, "transaction_receipts": [] }), }, - BlockPtr::new(hash, block_ptr.number - offset), + ptr, ) }) }; @@ -1951,6 +2001,7 @@ impl ChainStoreTrait for ChainStore { self: Arc, block_ptr: BlockPtr, offset: BlockNumber, + root: Option, ) -> Result, Error> { ensure!( block_ptr.number >= offset, @@ -1971,7 +2022,7 @@ impl ChainStoreTrait for ChainStore { .with_conn(move |conn, _| { chain_store .storage - .ancestor_block(conn, block_ptr_clone, offset) + .ancestor_block(conn, block_ptr_clone, offset, root) .map_err(StoreError::from) .map_err(CancelableError::from) }) diff --git a/store/test-store/tests/postgres/chain_head.rs b/store/test-store/tests/postgres/chain_head.rs index ef9e00b3f59..5cd52453d00 100644 --- a/store/test-store/tests/postgres/chain_head.rs +++ b/store/test-store/tests/postgres/chain_head.rs @@ -1,3 +1,5 @@ +// Portions copyright (2023) Vulcanize, Inc. + //! Test ChainStore implementation of Store, in particular, how //! the chain head pointer gets updated in various situations @@ -40,8 +42,12 @@ where let chain_store = store.block_store().chain_store(name).expect("chain store"); // Run test - test(chain_store.cheap_clone(), store.cheap_clone()) - .unwrap_or_else(|_| panic!("test finishes successfully on network {}", name)); + test(chain_store.cheap_clone(), store.cheap_clone()).unwrap_or_else(|err| { + panic!( + "test finishes successfully on network {} with error {}", + name, err + ) + }); } }); } @@ -293,11 +299,11 @@ fn check_ancestor( offset: BlockNumber, exp: &FakeBlock, ) -> Result<(), Error> { - let act = executor::block_on( - store - .cheap_clone() - .ancestor_block(child.block_ptr(), offset), - )? + let act = executor::block_on(store.cheap_clone().ancestor_block( + child.block_ptr(), + offset, + None, + ))? .map(json::from_value::) .transpose()? .ok_or_else(|| anyhow!("block {} has no ancestor at offset {}", child.hash, offset))?; @@ -336,15 +342,16 @@ fn ancestor_block_simple() { for offset in [6, 7, 8, 50].iter() { let offset = *offset; - let res = executor::block_on( - store - .cheap_clone() - .ancestor_block(BLOCK_FIVE.block_ptr(), offset), - ); + let res = executor::block_on(store.cheap_clone().ancestor_block( + BLOCK_FIVE.block_ptr(), + offset, + None, + )); assert!(res.is_err()); } - let block = executor::block_on(store.ancestor_block(BLOCK_TWO_NO_PARENT.block_ptr(), 1))?; + let block = + executor::block_on(store.ancestor_block(BLOCK_TWO_NO_PARENT.block_ptr(), 1, None))?; assert!(block.is_none()); Ok(()) }); diff --git a/tests/src/config.rs b/tests/src/config.rs index 331ecfba485..fae398a8721 100644 --- a/tests/src/config.rs +++ b/tests/src/config.rs @@ -261,7 +261,7 @@ impl Default for Config { graph_node: GraphNodeConfig::default(), graph_cli, num_parallel_tests, - timeout: Duration::from_secs(120), + timeout: Duration::from_secs(600), } } } diff --git a/tests/src/fixture/mod.rs b/tests/src/fixture/mod.rs index 1aaecee7605..050456da482 100644 --- a/tests/src/fixture/mod.rs +++ b/tests/src/fixture/mod.rs @@ -1,3 +1,5 @@ +// Portions copyright (2023) Vulcanize, Inc. 
+ pub mod ethereum; pub mod substreams; @@ -926,6 +928,7 @@ impl TriggersAdapter for MockTriggersAdapter { &self, _ptr: BlockPtr, _offset: BlockNumber, + _root: Option, ) -> Result::Block>, Error> { todo!() } @@ -935,7 +938,7 @@ impl TriggersAdapter for MockTriggersAdapter { _from: BlockNumber, _to: BlockNumber, _filter: &::TriggerFilter, - ) -> Result>, Error> { + ) -> Result<(Vec>, BlockNumber), Error> { todo!() }