diff --git a/Cargo.lock b/Cargo.lock index 1c515550c1f..286168b4d07 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2192,6 +2192,8 @@ dependencies = [ "anyhow", "assert-json-diff", "async-stream", + "async-trait", + "futures-core", "graph", "graph-chain-ethereum", "graph-chain-substreams", @@ -2201,6 +2203,7 @@ dependencies = [ "graph-runtime-wasm", "graph-server-index-node", "graph-store-postgres", + "prost-types 0.12.6", "secp256k1", "serde", "serde_yaml", diff --git a/chain/ethereum/src/chain.rs b/chain/ethereum/src/chain.rs index cf46a675212..229a8ae2382 100644 --- a/chain/ethereum/src/chain.rs +++ b/chain/ethereum/src/chain.rs @@ -298,7 +298,7 @@ pub struct Chain { reorg_threshold: BlockNumber, polling_ingestor_interval: Duration, pub is_ingestible: bool, - block_stream_builder: Arc>, + pub block_stream_builder: Arc>, block_refetcher: Arc>, adapter_selector: Arc>, runtime_adapter_builder: Arc, diff --git a/chain/ethereum/src/lib.rs b/chain/ethereum/src/lib.rs index b83415146ac..e7cab9ddb66 100644 --- a/chain/ethereum/src/lib.rs +++ b/chain/ethereum/src/lib.rs @@ -19,7 +19,8 @@ pub use buffered_call_cache::BufferedCallCache; // ETHDEP: These concrete types should probably not be exposed. pub use data_source::{ - BlockHandlerFilter, DataSource, DataSourceTemplate, Mapping, MappingABI, TemplateSource, + BlockHandlerFilter, DataSource, DataSourceTemplate, DecoderHook, Mapping, MappingABI, + TemplateSource, }; pub mod chain; diff --git a/core/src/lib.rs b/core/src/lib.rs index 448bb1041fd..1800bb9988d 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -3,6 +3,6 @@ pub mod polling_monitor; mod subgraph; pub use crate::subgraph::{ - SubgraphAssignmentProvider, SubgraphInstanceManager, SubgraphRegistrar, SubgraphRunner, - SubgraphTriggerProcessor, + create_subgraph_version, SubgraphAssignmentProvider, SubgraphInstanceManager, + SubgraphRegistrar, SubgraphRunner, SubgraphTriggerProcessor, }; diff --git a/core/src/subgraph/instance_manager.rs b/core/src/subgraph/instance_manager.rs index c98641539d9..0cca75d0500 100644 --- a/core/src/subgraph/instance_manager.rs +++ b/core/src/subgraph/instance_manager.rs @@ -63,7 +63,7 @@ impl SubgraphInstanceManagerTrait for SubgraphInstanceManager< ) .await?; - self.start_subgraph_inner(logger, loc, runner).await + self.start_subgraph_thread(logger, loc, runner).await } BlockchainKind::Ethereum => { let runner = instance_manager @@ -77,7 +77,7 @@ impl SubgraphInstanceManagerTrait for SubgraphInstanceManager< ) .await?; - self.start_subgraph_inner(logger, loc, runner).await + self.start_subgraph_thread(logger, loc, runner).await } BlockchainKind::Near => { let runner = instance_manager @@ -91,7 +91,7 @@ impl SubgraphInstanceManagerTrait for SubgraphInstanceManager< ) .await?; - self.start_subgraph_inner(logger, loc, runner).await + self.start_subgraph_thread(logger, loc, runner).await } BlockchainKind::Cosmos => { let runner = instance_manager @@ -105,7 +105,7 @@ impl SubgraphInstanceManagerTrait for SubgraphInstanceManager< ) .await?; - self.start_subgraph_inner(logger, loc, runner).await + self.start_subgraph_thread(logger, loc, runner).await } BlockchainKind::Substreams => { let runner = instance_manager @@ -119,7 +119,7 @@ impl SubgraphInstanceManagerTrait for SubgraphInstanceManager< ) .await?; - self.start_subgraph_inner(logger, loc, runner).await + self.start_subgraph_thread(logger, loc, runner).await } BlockchainKind::Starknet => { let runner = instance_manager @@ -133,7 +133,7 @@ impl SubgraphInstanceManagerTrait for SubgraphInstanceManager< ) 
.await?; - self.start_subgraph_inner(logger, loc, runner).await + self.start_subgraph_thread(logger, loc, runner).await } } }; @@ -466,7 +466,7 @@ impl SubgraphInstanceManager { )) } - async fn start_subgraph_inner( + pub async fn start_subgraph_thread( &self, logger: Logger, deployment: DeploymentLocator, @@ -504,4 +504,12 @@ impl SubgraphInstanceManager { Ok(()) } + + pub fn subgraph_logger(&self, loc: &DeploymentLocator) -> Logger { + self.logger_factory.subgraph_logger(loc) + } + + pub fn get_env_vars(&self) -> Arc { + self.env_vars.cheap_clone() + } } diff --git a/core/src/subgraph/mod.rs b/core/src/subgraph/mod.rs index 45f8d5b98ef..2531b9feebf 100644 --- a/core/src/subgraph/mod.rs +++ b/core/src/subgraph/mod.rs @@ -12,6 +12,6 @@ mod trigger_processor; pub use self::instance_manager::SubgraphInstanceManager; pub use self::provider::SubgraphAssignmentProvider; -pub use self::registrar::SubgraphRegistrar; +pub use self::registrar::{create_subgraph_version, SubgraphRegistrar}; pub use self::runner::SubgraphRunner; pub use self::trigger_processor::*; diff --git a/core/src/subgraph/registrar.rs b/core/src/subgraph/registrar.rs index fe80d118457..b8bac081fdb 100644 --- a/core/src/subgraph/registrar.rs +++ b/core/src/subgraph/registrar.rs @@ -28,16 +28,19 @@ use graph::tokio_retry::Retry; use graph::util::futures::retry_strategy; use graph::util::futures::RETRY_DEFAULT_LIMIT; +// Auto graft dependencies are synced up to this depth. +const MAX_AUTO_GRAFT_SYNC_DEPTH: u32 = 42; + pub struct SubgraphRegistrar { logger: Logger, logger_factory: LoggerFactory, - resolver: Arc, - provider: Arc
<P>, - store: Arc, + pub resolver: Arc, + pub provider: Arc
<P>
, + pub store: Arc, subscription_manager: Arc, - chains: Arc, + pub chains: Arc, node_id: NodeId, - version_switching_mode: SubgraphVersionSwitchingMode, + pub version_switching_mode: SubgraphVersionSwitchingMode, assignment_event_stream_cancel_guard: CancelGuard, // cancels on drop settings: Arc, } @@ -266,6 +269,24 @@ where }) }) } + + pub fn subgraph_logger(&self, deployment: &DeploymentLocator) -> Logger { + self.logger.new(o!( + "subgraph_id" => deployment.hash.to_string(), + "sgd" => deployment.id.to_string() + )) + } + + pub fn get_settings(&self) -> Arc { + self.settings.clone() + } + + pub async fn resolve_raw_manifest( + &self, + hash: &DeploymentHash, + ) -> Result { + resolve_raw_manifest(&self.resolver, &self.logger, hash).await + } } #[async_trait] @@ -303,20 +324,8 @@ where .logger_factory .subgraph_logger(&DeploymentLocator::new(DeploymentId(0), hash.clone())); - let raw: serde_yaml::Mapping = { - let file_bytes = self - .resolver - .cat(&logger, &hash.to_ipfs_link()) - .await - .map_err(|e| { - SubgraphRegistrarError::ResolveError( - SubgraphManifestResolveError::ResolveError(e), - ) - })?; - - serde_yaml::from_slice(&file_bytes) - .map_err(|e| SubgraphRegistrarError::ResolveError(e.into()))? - }; + let raw: serde_yaml::Mapping = + resolve_raw_manifest(&self.resolver, &self.logger, &hash).await?; let kind = BlockchainKind::from_manifest(&raw).map_err(|e| { SubgraphRegistrarError::ResolveError(SubgraphManifestResolveError::ResolveError(e)) @@ -326,9 +335,15 @@ where let history_blocks = history_blocks.or(self.settings.for_name(&name).map(|c| c.history_blocks)); + let auto_graft_sync_depth = if self.store.auto_graft_sync() { + Some(0) + } else { + None + }; + let deployment_locator = match kind { BlockchainKind::Arweave => { - create_subgraph_version::( + create_subgraph_version::( &logger, self.store.clone(), self.chains.cheap_clone(), @@ -342,11 +357,13 @@ where self.version_switching_mode, &self.resolver, history_blocks, + auto_graft_sync_depth, + self.provider.clone(), ) .await? } BlockchainKind::Ethereum => { - create_subgraph_version::( + create_subgraph_version::( &logger, self.store.clone(), self.chains.cheap_clone(), @@ -360,11 +377,13 @@ where self.version_switching_mode, &self.resolver, history_blocks, + auto_graft_sync_depth, + self.provider.clone(), ) .await? } BlockchainKind::Near => { - create_subgraph_version::( + create_subgraph_version::( &logger, self.store.clone(), self.chains.cheap_clone(), @@ -378,11 +397,13 @@ where self.version_switching_mode, &self.resolver, history_blocks, + auto_graft_sync_depth, + self.provider.clone(), ) .await? } BlockchainKind::Cosmos => { - create_subgraph_version::( + create_subgraph_version::( &logger, self.store.clone(), self.chains.cheap_clone(), @@ -396,11 +417,13 @@ where self.version_switching_mode, &self.resolver, history_blocks, + auto_graft_sync_depth, + self.provider.clone(), ) .await? } BlockchainKind::Substreams => { - create_subgraph_version::( + create_subgraph_version::( &logger, self.store.clone(), self.chains.cheap_clone(), @@ -414,11 +437,13 @@ where self.version_switching_mode, &self.resolver, history_blocks, + auto_graft_sync_depth, + self.provider.clone(), ) .await? } BlockchainKind::Starknet => { - create_subgraph_version::( + create_subgraph_version::( &logger, self.store.clone(), self.chains.cheap_clone(), @@ -432,6 +457,8 @@ where self.version_switching_mode, &self.resolver, history_blocks, + auto_graft_sync_depth, + self.provider.clone(), ) .await? 
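+ // NOTE: every chain arm above now threads the same two extra arguments into + // `create_subgraph_version`: `auto_graft_sync_depth` (`Some(0)` when the store + // has auto graft sync enabled, `None` otherwise) and a handle to the assignment + // provider, which lets graft bases be deployed and synced recursively.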
} @@ -555,9 +582,9 @@ async fn start_subgraph( } /// Resolves the subgraph's earliest block -async fn resolve_start_block( - manifest: &SubgraphManifest, - chain: &impl Blockchain, +async fn resolve_start_block( + manifest: &SubgraphManifest, + chain: &C, logger: &Logger, ) -> Result, SubgraphRegistrarError> { // If the minimum start block is 0 (i.e. the genesis block), @@ -591,20 +618,26 @@ async fn resolve_graft_block( chain: &impl Blockchain, logger: &Logger, ) -> Result { + debug!(&logger, "Resolve graft block"; "base" => base.base.to_string(), "block" => base.block); chain .block_pointer_from_number(logger, base.block) .await - .map_err(|_| { + .map_err(|err| { + error!(&logger, "Failed to resolve graft block"; "error" => err.to_string()); SubgraphRegistrarError::ManifestValidationError(vec![ SubgraphManifestValidationError::BlockNotFound(format!( - "graft base block {} not found", - base.block + "graft base {} block {} not found", + base.base, base.block )), ]) }) } -async fn create_subgraph_version( +pub async fn create_subgraph_version< + C: Blockchain, + S: SubgraphStore, + P: SubgraphAssignmentProviderTrait, +>( logger: &Logger, store: Arc, chains: Arc, @@ -618,9 +651,14 @@ async fn create_subgraph_version( version_switching_mode: SubgraphVersionSwitchingMode, resolver: &Arc, history_blocks_override: Option, + depth: Option, + provider: Arc
<P>
, ) -> Result { let raw_string = serde_yaml::to_string(&raw).unwrap(); - let unvalidated = UnvalidatedSubgraphManifest::::resolve( + + // We need to defer validation of the manifest until after we have synced the base subgraph. + + let unvalidated_manifest = UnvalidatedSubgraphManifest::::resolve( deployment.clone(), raw, resolver, @@ -630,16 +668,38 @@ async fn create_subgraph_version( .map_err(SubgraphRegistrarError::ResolveError) .await?; - // Determine if the graft_base should be validated. - // Validate the graft_base if there is a pending graft, ensuring its presence. - // If the subgraph is new (indicated by DeploymentNotFound), the graft_base should be validated. - // If the subgraph already exists and there is no pending graft, graft_base validation is not required. + if let (Some(depth), Some(graft)) = (depth, unvalidated_manifest.unvalidated_graft()) { + if depth < MAX_AUTO_GRAFT_SYNC_DEPTH { + Box::pin(auto_sync_graft::( + graft, + resolver, + logger, + &store, + &chains, + &name, + &node_id, + &debug_fork, + version_switching_mode, + history_blocks_override, + depth, + provider, + )) + .await?; + } else { + warn!( + logger, + "auto-graft-sync: subgraph grafts depth limit reached"; + "depth" => depth + ); + } + } + let should_validate = match store.graft_pending(&deployment) { Ok(graft_pending) => graft_pending, Err(StoreError::DeploymentNotFound(_)) => true, Err(e) => return Err(SubgraphRegistrarError::StoreError(e)), }; - let manifest = unvalidated + let manifest = unvalidated_manifest .validate(store.cheap_clone(), should_validate) .await .map_err(SubgraphRegistrarError::ManifestValidationError)?; @@ -732,3 +792,115 @@ async fn create_subgraph_version( ) .map_err(SubgraphRegistrarError::SubgraphDeploymentError) } + +/// Automatically syncs a subgraph graft from the base subgraph. +/// This will await the syncing of the base subgraph before proceeding. +/// Recursively calls `create_subgraph_version` to create any grafts of +/// this graft up to `MAX_AUTO_GRAFT_SYNC_DEPTH`.` +async fn auto_sync_graft( + graft: &Graft, + resolver: &Arc, + logger: &Logger, + store: &Arc, + chains: &Arc, + name: &SubgraphName, + node_id: &NodeId, + debug_fork: &Option, + version_switching_mode: SubgraphVersionSwitchingMode, + history_blocks_override: Option, + depth: u32, + provider: Arc
<P>
, +) -> Result { + info!( + logger, + "auto-graft-sync: begin graft sync"; + "subgraph" => name.to_string(), + "hash" => graft.base.to_string(), + "depth" => depth, + "block" => graft.block + ); + let subgraft_raw_manifest = resolve_raw_manifest(resolver, logger, &graft.base).await?; + + let deployment = graft.base.clone(); + + let name = &deployment[deployment.len().saturating_sub(10)..]; + let name = format!("auto-graft-sync/{}", name); + let name = + SubgraphName::new(name.clone()).map_err(|_| SubgraphRegistrarError::NameNotValid(name))?; + + info!( + logger, + "auto-graft-sync: create subgraph"; + "subgraph" => name.to_string(), + "hash" => graft.base.to_string() + ); + + let _ = store.create_subgraph(name.clone())?; + info!(logger, "Created subgraph"; "subgraph_name" => name.to_string(), "id" => deployment.to_string()); + + let locator = create_subgraph_version::( + logger, + store.clone(), + chains.clone(), + name.clone(), + graft.base.clone(), + None, + None, + subgraft_raw_manifest.clone(), + node_id.clone(), + debug_fork.clone(), + version_switching_mode, + resolver, + history_blocks_override, + Some(depth + 1), + provider.clone(), + ) + .await?; + + info!( + logger, + "auto-graft-sync: awaiting subgraph sync"; + "subgraph" => name.to_string(), + "hash" => graft.base.to_string() + ); + + info!(&logger, "auto-graft-sync: starting graft sync"; "subgraph" => name.to_string(), "hash" => graft.base.to_string()); + provider + .start(locator.clone(), Some(graft.block)) + .await + .map_err(SubgraphRegistrarError::AutoGraftSubgraphAssignmentError)?; + + info!(&logger, "auto-graft-sync: waiting for graft sync"; "subgraph" => name.to_string(), "hash" => graft.base.to_string()); + graft + .await_sync(store.clone(), Duration::from_secs(1)) + .await?; + + info!( + logger, + "auto-graft-sync: sync complete"; + "subgraph" => name.to_string(), + "graft-hash" => graft.base.to_string(), + "depth" => depth, + "hash" => graft.base.to_string() + ); + Ok(locator) +} + +async fn resolve_raw_manifest( + resolver: &Arc, + logger: &Logger, + deployment_hash: &DeploymentHash, +) -> Result { + let subgraft_raw_manifest: serde_yaml::Mapping = { + let file_bytes = resolver + .cat(&logger, &deployment_hash.to_ipfs_link()) + .await + .map_err(|e| { + SubgraphRegistrarError::ResolveError(SubgraphManifestResolveError::ResolveError(e)) + })?; + + serde_yaml::from_slice(&file_bytes) + .map_err(|e| SubgraphRegistrarError::ResolveError(e.into()))? 
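+ // Note that resolving only fetches the manifest from IPFS and parses the YAML; + // validation is deliberately deferred until the graft base has been synced.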
+ }; + Ok(subgraft_raw_manifest) +} diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index 0ac80902a66..02631fadf53 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -215,6 +215,8 @@ pub trait SubgraphStore: Send + Sync + 'static { /// When this flag is set, indexing of the deployment should log /// additional diagnostic information fn instrument(&self, deployment: &DeploymentLocator) -> Result; + + fn auto_graft_sync(&self) -> bool; } pub trait ReadStore: Send + Sync + 'static { diff --git a/graph/src/data/subgraph/mod.rs b/graph/src/data/subgraph/mod.rs index df379845c00..977ef46eaef 100644 --- a/graph/src/data/subgraph/mod.rs +++ b/graph/src/data/subgraph/mod.rs @@ -26,6 +26,8 @@ use stable_hash_legacy::SequenceNumber; use std::{ collections::{BTreeSet, HashMap, HashSet}, marker::PhantomData, + time::Duration, + time::Instant, }; use thiserror::Error; use wasmparser; @@ -284,6 +286,8 @@ pub enum SubgraphRegistrarError { NameExists(String), #[error("subgraph name not found: {0}")] NameNotFound(String), + #[error("subgraph name not valid: {0}")] + NameNotValid(String), #[error("network not supported by registrar: {0}")] NetworkNotSupported(Error), #[error("deployment not found: {0}")] @@ -298,6 +302,8 @@ pub enum SubgraphRegistrarError { ManifestValidationError(Vec), #[error("subgraph deployment error: {0}")] SubgraphDeploymentError(StoreError), + #[error("auto-graft-sync subgraph assignment error: {0}")] + AutoGraftSubgraphAssignmentError(SubgraphAssignmentProviderError), #[error("subgraph registrar error: {0}")] Unknown(#[from] anyhow::Error), } @@ -516,6 +522,39 @@ impl Graft { (Some(_), _) => Ok(()), } } + + /// Awaits the target block sync for the graft. + pub async fn await_sync( + &self, + store: Arc, + interval: Duration, + ) -> Result<(), SubgraphRegistrarError> { + const MAX_WAIT_NO_BLOCKS: Duration = Duration::from_secs(10); + let start = Instant::now(); + + loop { + let maybe_latest_block = store + .least_block_ptr(&self.base) + .await + .map_err(|e| SubgraphRegistrarError::DeploymentNotFound(e.to_string()))?; + + // TODO: could we get a stream over the block pointers? + if let Some(block) = maybe_latest_block { + if block.block_number() >= self.block { + break Ok(()); + } else { + tokio::time::sleep(interval).await; + } + } else { + if start.elapsed() > MAX_WAIT_NO_BLOCKS { + return Err(SubgraphRegistrarError::ManifestValidationError(vec![SubgraphManifestValidationError::GraftBaseInvalid(format!( + "failed to graft onto `{}` at block {} since it has not processed any blocks", + self.base, self.block + ))])); + } + } + } + } } #[derive(Clone, Debug)] @@ -747,6 +786,11 @@ impl UnvalidatedSubgraphManifest { pub fn spec_version(&self) -> &Version { &self.0.spec_version } + + /// Get the graft from this unvalidated manifest. + pub fn unvalidated_graft(&self) -> Option<&Graft> { + self.0.graft.as_ref() + } } impl SubgraphManifest { diff --git a/graph/src/data_source/mod.rs b/graph/src/data_source/mod.rs index a38148b25fe..f761b381b2d 100644 --- a/graph/src/data_source/mod.rs +++ b/graph/src/data_source/mod.rs @@ -547,7 +547,9 @@ macro_rules! 
deserialize_data_source { .map($t::Onchain) } else { Err(serde::de::Error::custom(format!( - "data source has invalid `kind`; expected {}, file/ipfs", + "data source ({}) has invalid `kind` ({}); expected {} or file/ipfs", + std::any::type_name::<$t>(), + kind, C::KIND, ))) } diff --git a/graph/src/firehose/endpoints.rs b/graph/src/firehose/endpoints.rs index 72d3f986c9c..1da2c4180ae 100644 --- a/graph/src/firehose/endpoints.rs +++ b/graph/src/firehose/endpoints.rs @@ -491,7 +491,8 @@ impl FirehoseEndpoint { } } -#[derive(Debug)] +// Clone is still required here for the runner tests. +#[derive(Debug, Clone)] pub struct FirehoseEndpoints(ChainName, ProviderManager>); impl FirehoseEndpoints { diff --git a/node/resources/tests/full_config.toml b/node/resources/tests/full_config.toml index 1f907539194..34f9b1d3622 100644 --- a/node/resources/tests/full_config.toml +++ b/node/resources/tests/full_config.toml @@ -1,5 +1,6 @@ [general] query = "query_node_.*" +auto_graft_sync = false [store] [store.primary] diff --git a/node/src/chain.rs b/node/src/chain.rs index 6de493631cd..e24367d4b3a 100644 --- a/node/src/chain.rs +++ b/node/src/chain.rs @@ -610,6 +610,7 @@ mod test { ethereum_ws: vec![], ethereum_ipc: vec![], unsafe_config: false, + auto_graft_sync: false, }; let metrics = Arc::new(EndpointMetrics::mock()); diff --git a/node/src/config.rs b/node/src/config.rs index 8006b8efef7..27e85fed0c2 100644 --- a/node/src/config.rs +++ b/node/src/config.rs @@ -48,6 +48,7 @@ pub struct Opt { pub ethereum_ws: Vec, pub ethereum_ipc: Vec, pub unsafe_config: bool, + pub auto_graft_sync: bool, } impl Default for Opt { @@ -64,6 +65,7 @@ ethereum_ws: vec![], ethereum_ipc: vec![], unsafe_config: false, + auto_graft_sync: false, } } } @@ -77,6 +79,8 @@ pub struct Config { pub stores: BTreeMap, pub chains: ChainSection, pub deployment: Deployment, + #[serde(default)] + pub auto_graft_sync: bool, } fn validate_name(s: &str) -> Result<()> { @@ -192,6 +196,7 @@ impl Config { let chains = ChainSection::from_opt(opt)?; let node = NodeId::new(opt.node_id.to_string()) .map_err(|()| anyhow!("invalid node id {}", opt.node_id))?; + let auto_graft_sync = opt.auto_graft_sync; stores.insert(PRIMARY_SHARD.to_string(), Shard::from_opt(true, opt)?); Ok(Config { node, @@ -199,6 +204,7 @@ stores, chains, deployment, + auto_graft_sync, }) } diff --git a/node/src/opt.rs b/node/src/opt.rs index e4dc44ba92a..9d3d71cfcad 100644 --- a/node/src/opt.rs +++ b/node/src/opt.rs @@ -231,6 +231,7 @@ pub struct Opt { help = "Base URL for forking subgraphs" )] pub fork_base: Option, + #[clap( long, default_value = "8050", @@ -238,6 +239,13 @@ help = "Port for the graphman GraphQL server" )] pub graphman_port: u16, + + #[clap( + long, + env = "AUTO_GRAFT_SYNC", + help = "Automatically sync grafts from the base subgraph" + )] + pub auto_graft_sync: bool, } impl From for config::Opt { @@ -254,6 +262,7 @@ ethereum_ws, ethereum_ipc, unsafe_config, + auto_graft_sync, ..
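+ // `auto_graft_sync` originates from the `--auto-graft-sync` CLI flag (or the + // AUTO_GRAFT_SYNC env var) declared above and is copied into `config::Opt` below.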
} = opt; @@ -269,6 +278,7 @@ impl From for config::Opt { ethereum_ws, ethereum_ipc, unsafe_config, + auto_graft_sync, } } } diff --git a/node/src/store_builder.rs b/node/src/store_builder.rs index 2a39d0ea6ed..e57dad7b391 100644 --- a/node/src/store_builder.rs +++ b/node/src/store_builder.rs @@ -155,6 +155,7 @@ impl StoreBuilder { notification_sender, fork_base, registry, + config.auto_graft_sync, )); (store, pools, coord) diff --git a/store/postgres/src/subgraph_store.rs b/store/postgres/src/subgraph_store.rs index 41cbef15982..6d134f54317 100644 --- a/store/postgres/src/subgraph_store.rs +++ b/store/postgres/src/subgraph_store.rs @@ -207,6 +207,8 @@ pub struct SubgraphStore { /// subgraph forks will fetch entities. /// Example: https://api.thegraph.com/subgraphs/ fork_base: Option, + + auto_graft_sync: bool, } impl SubgraphStore { @@ -231,12 +233,14 @@ impl SubgraphStore { sender: Arc, fork_base: Option, registry: Arc, + auto_graft_sync: bool, ) -> Self { Self { inner: Arc::new(SubgraphStoreInner::new( logger, stores, placer, sender, registry, )), fork_base, + auto_graft_sync, } } @@ -1600,4 +1604,8 @@ impl SubgraphStoreTrait for SubgraphStore { let info = store.subgraph_info(site)?; Ok(info.instrument) } + + fn auto_graft_sync(&self) -> bool { + self.auto_graft_sync + } } diff --git a/tests/Cargo.toml b/tests/Cargo.toml index b79702ceb7f..09d4e32c14b 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -7,9 +7,11 @@ edition.workspace = true anyhow = "1.0" assert-json-diff = "2.0.2" async-stream = "0.3.5" +async-trait = "0.1" +futures-core = "0.3" graph = { path = "../graph" } graph-chain-ethereum = { path = "../chain/ethereum" } -graph-chain-substreams= {path = "../chain/substreams"} +graph-chain-substreams = { path = "../chain/substreams" } graph-node = { path = "../node" } graph-core = { path = "../core" } graph-graphql = { path = "../graphql" } @@ -18,12 +20,16 @@ graph-server-index-node = { path = "../server/index-node" } graph-runtime-wasm = { path = "../runtime/wasm" } serde = { workspace = true } serde_yaml = { workspace = true } -slog = { version = "2.7.0", features = ["release_max_level_trace", "max_level_trace"] } +slog = { version = "2.7.0", features = [ + "release_max_level_trace", + "max_level_trace", +] } tokio = { version = "1.38.0", features = ["rt", "macros", "process"] } # Once graph upgrades to web3 0.19, we don't need this anymore. 
The version # here needs to be kept in sync with the web3 version that the graph crate # uses until then secp256k1 = { version = "0.21", features = ["recovery"] } +prost-types = { workspace = true } [dev-dependencies] anyhow = "1.0.86" diff --git a/tests/runner-tests/auto-graft-sync/.gitignore b/tests/runner-tests/auto-graft-sync/.gitignore new file mode 100644 index 00000000000..ffff03d1afe --- /dev/null +++ b/tests/runner-tests/auto-graft-sync/.gitignore @@ -0,0 +1 @@ +auto-graft-sync*.yaml diff --git a/tests/runner-tests/auto-graft-sync/abis/Contract.abi b/tests/runner-tests/auto-graft-sync/abis/Contract.abi new file mode 100644 index 00000000000..02da1a9e7f3 --- /dev/null +++ b/tests/runner-tests/auto-graft-sync/abis/Contract.abi @@ -0,0 +1,33 @@ +[ + { + "inputs": [], + "stateMutability": "nonpayable", + "type": "constructor" + }, + { + "anonymous": false, + "inputs": [ + { + "indexed": false, + "internalType": "uint16", + "name": "x", + "type": "uint16" + } + ], + "name": "Trigger", + "type": "event" + }, + { + "inputs": [ + { + "internalType": "uint16", + "name": "x", + "type": "uint16" + } + ], + "name": "emitTrigger", + "outputs": [], + "stateMutability": "nonpayable", + "type": "function" + } +] diff --git a/tests/runner-tests/auto-graft-sync/config.toml b/tests/runner-tests/auto-graft-sync/config.toml new file mode 100644 index 00000000000..0d2c55e11a3 --- /dev/null +++ b/tests/runner-tests/auto-graft-sync/config.toml @@ -0,0 +1,20 @@ +# This is the config flag that enables auto-graft-sync, the feature under test. +auto_graft_sync = true + +[store] +[store.primary] +connection = "$THEGRAPH_STORE_POSTGRES_DIESEL_URL" +pool_size = 10 + +[deployment] +[[deployment.rule]] +store = "primary" +indexers = ["default"] + +[chains] +ingestor = "default" + +# The tests do not talk to ethereum clients +[chains.test] +shard = "primary" +provider = [{ label = "penguin", url = "http://localhost:1/", features = [] }] diff --git a/tests/runner-tests/auto-graft-sync/generateAndPublishManifestFromTemplate.js b/tests/runner-tests/auto-graft-sync/generateAndPublishManifestFromTemplate.js new file mode 100644 index 00000000000..e9ae741152f --- /dev/null +++ b/tests/runner-tests/auto-graft-sync/generateAndPublishManifestFromTemplate.js @@ -0,0 +1,21 @@ + +const fs = require('fs'); +const { execSync } = require('child_process'); + +// This takes the outPath, the Qm.. hash of the graft base, and the graft block as CLI inputs. +const outPath = process.argv[2]; +const graftBase = process.argv[3]; +const graftBlock = process.argv[4]; + +const yamlPath = './template.yaml'; +let yamlContent = fs.readFileSync(yamlPath, 'utf-8'); +yamlContent = yamlContent.replace(/base: .+/, `base: ${graftBase}`); +yamlContent = yamlContent.replace(/block: .+/, `block: ${graftBlock}`); +fs.writeFileSync(outPath, yamlContent); +console.log(`wrote grafted manifest to ${outPath}`); + +// Assuming you have IPFS_URI exported as an environment variable. +// Instead of deploying, run `graph build` to only upload the manifest to IPFS.
+execSync('graph build ' + outPath + ' --ipfs $IPFS_URI', { + stdio: 'inherit' +}); diff --git a/tests/runner-tests/auto-graft-sync/graft-base.yaml b/tests/runner-tests/auto-graft-sync/graft-base.yaml new file mode 100644 index 00000000000..5fd468a3342 --- /dev/null +++ b/tests/runner-tests/auto-graft-sync/graft-base.yaml @@ -0,0 +1,63 @@ +# for this test, this is the lowest level of the grafting hierarchy +specVersion: 0.0.4 +features: + - grafting +schema: + file: ./schema.graphql +dataSources: + - kind: ethereum/contract + name: Contract + network: test + source: + address: "0xCfEB869F69431e42cdB54A4F4f105C19C080A601" + abi: Contract + mapping: + kind: ethereum/events + apiVersion: 0.0.6 + language: wasm/assemblyscript + entities: + - Gravatar + abis: + - name: Contract + file: ./abis/Contract.abi + blockHandlers: + - handler: handleBlock + file: ./src/mapping.ts + # Tests that adding a data source is possible in a graft + - kind: ethereum/contract + name: Contract2 + network: test + source: + address: "0xCfEB869F69431e42cdB54A4F4f105C19C080A601" + abi: Contract + mapping: + kind: ethereum/events + apiVersion: 0.0.6 + language: wasm/assemblyscript + entities: + - Gravatar + abis: + - name: Contract + file: ./abis/Contract.abi + callHandlers: + - handler: handleBlock + function: emitTrigger(uint16) + file: ./src/mapping.ts +templates: + - kind: ethereum/contract + name: Template + network: test + source: + abi: Contract + mapping: + kind: ethereum/events + apiVersion: 0.0.6 + language: wasm/assemblyscript + entities: + - Gravatar + abis: + - name: Contract + file: ./abis/Contract.abi + blockHandlers: + - handler: handleBlockTemplate + file: ./src/mapping.ts diff --git a/tests/runner-tests/auto-graft-sync/package.json b/tests/runner-tests/auto-graft-sync/package.json new file mode 100644 index 00000000000..d766288c0b8 --- /dev/null +++ b/tests/runner-tests/auto-graft-sync/package.json @@ -0,0 +1,13 @@ +{ + "name": "auto-graft-sync", + "version": "0.1.0", + "scripts": { + "codegen": "graph codegen graft-base.yaml --skip-migrations", + "build-graft-root": "graph build graft-base.yaml --ipfs $IPFS_URI", + "build:test-auto-graft-sync": "node generateAndPublishManifestFromTemplate.js" + }, + "devDependencies": { + "@graphprotocol/graph-cli": "0.60.0", + "@graphprotocol/graph-ts": "0.31.0" + } +} diff --git a/tests/runner-tests/auto-graft-sync/schema.graphql b/tests/runner-tests/auto-graft-sync/schema.graphql new file mode 100644 index 00000000000..6c007b3245b --- /dev/null +++ b/tests/runner-tests/auto-graft-sync/schema.graphql @@ -0,0 +1,5 @@ +# The `id` is the block number and `count` the handler invocations at that block. +type DataSourceCount @entity { + id: ID! + count: Int! +} diff --git a/tests/runner-tests/auto-graft-sync/src/mapping.ts b/tests/runner-tests/auto-graft-sync/src/mapping.ts new file mode 100644 index 00000000000..feb6f313bbc --- /dev/null +++ b/tests/runner-tests/auto-graft-sync/src/mapping.ts @@ -0,0 +1,39 @@ +import { + ethereum, + DataSourceContext, + dataSource, + Address, + BigInt, +} from "@graphprotocol/graph-ts"; +import { Template } from "../generated/templates"; +import { DataSourceCount } from "../generated/schema"; + +export function handleBlock(block: ethereum.Block): void { + let context = new DataSourceContext(); + context.setBigInt("number", block.number); + context.setBytes("hash", block.hash); + + Template.createWithContext( + changetype
<Address>
(Address.fromHexString( + "0x2E645469f354BB4F5c8a05B3b30A929361cf77eC" + )), + context + ); +} + +export function handleBlockTemplate(block: ethereum.Block): void { + let count = DataSourceCount.load(block.number.toString()); + if (count == null) { + count = new DataSourceCount(block.number.toString()); + count.count = 0; + } + + let ctx = dataSource.context(); + let number = ctx.getBigInt("number"); + assert( + count.count == number.toI32(), + "wrong count, found " + BigInt.fromI32(count.count).toString() + ); + count.count += 1; + count.save(); +} diff --git a/tests/runner-tests/auto-graft-sync/template.yaml b/tests/runner-tests/auto-graft-sync/template.yaml new file mode 100644 index 00000000000..bf5c1b47a06 --- /dev/null +++ b/tests/runner-tests/auto-graft-sync/template.yaml @@ -0,0 +1,66 @@ +specVersion: 0.0.4 +features: + - grafting +schema: + file: ./schema.graphql +graft: + # This value will be overwritten by the templates + base: QmcAL39QSKZvRssr2ToCJrav7XK9ggajxvBR7M1NNUCqdh + block: 3 +dataSources: + - kind: ethereum/contract + name: Contract + network: test + source: + address: "0xCfEB869F69431e42cdB54A4F4f105C19C080A601" + abi: Contract + mapping: + kind: ethereum/events + apiVersion: 0.0.6 + language: wasm/assemblyscript + entities: + - Gravatar + abis: + - name: Contract + file: ./abis/Contract.abi + blockHandlers: + - handler: handleBlock + file: ./src/mapping.ts + # Tests that adding a data source is possible in a graft + - kind: ethereum/contract + name: Contract2 + network: test + source: + address: "0xCfEB869F69431e42cdB54A4F4f105C19C080A601" + abi: Contract + mapping: + kind: ethereum/events + apiVersion: 0.0.6 + language: wasm/assemblyscript + entities: + - Gravatar + abis: + - name: Contract + file: ./abis/Contract.abi + callHandlers: + - handler: handleBlock + function: emitTrigger(uint16) + file: ./src/mapping.ts +templates: + - kind: ethereum/contract + name: Template + network: test + source: + abi: Contract + mapping: + kind: ethereum/events + apiVersion: 0.0.6 + language: wasm/assemblyscript + entities: + - Gravatar + abis: + - name: Contract + file: ./abis/Contract.abi + blockHandlers: + - handler: handleBlockTemplate + file: ./src/mapping.ts diff --git a/tests/src/fixture/ethereum.rs b/tests/src/fixture/ethereum.rs index b20672ce563..6d57a863b4f 100644 --- a/tests/src/fixture/ethereum.rs +++ b/tests/src/fixture/ethereum.rs @@ -1,32 +1,654 @@ +use std::fmt::Formatter; use std::marker::PhantomData; +use std::ops::Deref; +use std::pin::Pin; use std::sync::{Arc, Mutex}; +use std::task::Poll; use std::time::Duration; use super::{ test_ptr, CommonChainConfig, MutexBlockStreamBuilder, NoopAdapterSelector, NoopRuntimeAdapterBuilder, StaticBlockRefetcher, StaticStreamBuilder, Stores, TestChain, }; +use async_trait::async_trait; +use futures_core::Stream; +use graph::blockchain::block_stream::BlockStreamEvent; use graph::blockchain::client::ChainClient; -use graph::blockchain::{BlockPtr, TriggersAdapterSelector}; +use graph::blockchain::{ + self, BlockPtr, Blockchain, BlockchainKind, DataSource, DataSourceTemplate, NoopDecoderHook, + RuntimeAdapter, TriggersAdapter, TriggersAdapterSelector, UnresolvedDataSourceTemplate, +}; use graph::cheap_clone::CheapClone; +use graph::components::store::{ + DeploymentCursorTracker, DeploymentId, DeploymentLocator, + SubscriptionManager as SubscriptionManagerTrait, +}; +use graph::data::subgraph::UnifiedMappingApiVersion; +use graph::data_source::{ + DataSource as DataSourceEnum, UnresolvedDataSourceTemplate as 
UnresolvedDataSourceTemplateEnum, +}; +use graph::data_source::{ + DataSourceTemplate as DataSourceTemplateEnum, TriggerWithHandler, + UnresolvedDataSource as UnresolvedDataSourceEnum, +}; use graph::prelude::ethabi::ethereum_types::H256; use graph::prelude::web3::types::{Address, Log, Transaction, H160}; -use graph::prelude::{ethabi, tiny_keccak, LightEthereumBlock, ENV_VARS}; +use graph::prelude::{ + ethabi, tiny_keccak, BlockHash, BlockNumber, ChainStore, CreateSubgraphResult, DeploymentHash, + LightEthereumBlock, LinkResolver, LogCode, NodeId, + SubgraphAssignmentProvider as SubgraphAssignmentProviderTrait, + SubgraphInstanceManager as SubgraphInstanceManagerTrait, SubgraphName, + SubgraphRegistrar as SubgraphRegistrarTrait, SubgraphRegistrarError, + SubgraphStore as SubgraphStoreTrait, ENV_VARS, +}; use graph::{blockchain::block_stream::BlockWithTriggers, prelude::ethabi::ethereum_types::U64}; use graph_chain_ethereum::network::EthereumNetworkAdapters; use graph_chain_ethereum::trigger::LogRef; -use graph_chain_ethereum::Chain; use graph_chain_ethereum::{ chain::BlockFinality, trigger::{EthereumBlockTriggerType, EthereumTrigger}, }; +use graph_chain_ethereum::{Chain, NodeCapabilities}; +use graph_core::{ + create_subgraph_version, SubgraphInstanceManager, SubgraphRegistrar, SubgraphTriggerProcessor, +}; +use serde::{Deserialize, Deserializer}; +use slog::{info, Logger}; + +/// Wrap eth as our test chain +pub struct WrappedEthChain(Arc, pub Mutex>>); + +impl std::fmt::Debug for WrappedEthChain { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.write_str(&format!("{:?}", self.0)) + } +} + +impl WrappedEthChain { + pub fn new(chain: Arc, stream_builder: Arc>) -> Self { + WrappedEthChain(chain, Mutex::new(stream_builder)) + } + + pub fn inner_chain(&self) -> Arc { + self.0.clone() + } + + pub fn directly_resolve_block_from_number(&self, number: BlockNumber) -> Option { + let block_stream = self.1.lock().unwrap().chain.clone(); + let mut blocks = block_stream.iter().map(|b| b.ptr()); + blocks.find_map(|b| { + if b.number == number { + Some(b.clone()) + } else { + None + } + }) + } +} + +#[derive(Debug, Clone)] +pub struct WrappedDataSource(DataSourceEnum); + +pub struct WrappedUnresolvedDataSource(UnresolvedDataSourceEnum); + +#[derive(Default)] +pub struct WrappedUnresolvedDataSourceTemplate(UnresolvedDataSourceTemplateEnum); + +#[derive(Debug)] +pub struct WrappedDataSourceTemplate(DataSourceTemplateEnum); + +#[derive(Clone, Debug, Default)] +pub struct WrappedTriggerFilter(graph_chain_ethereum::TriggerFilter); + +impl WrappedTriggerFilter { + pub fn inner(&self) -> &graph_chain_ethereum::TriggerFilter { + &self.0 + } +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub struct WrappedNodeCapabilities(NodeCapabilities); + +#[derive(Debug)] +pub struct WrappedChainClient(()); + +pub struct WrappedRuntimeAdapter(Arc>); +impl RuntimeAdapter for WrappedRuntimeAdapter { + fn host_fns( + &self, + ds: &::DataSource, + ) -> Result, anyhow::Error> { + RuntimeAdapter::::host_fns( + self.0.deref(), + match &ds.0 { + DataSourceEnum::Onchain(onchain) => onchain, + _ => todo!("only onchain ds are supported"), + }, + ) + } +} + +pub struct WrappedTriggersAdapter(Arc>); + +pub struct WrappedBlockStream(Box>); + +impl Stream for WrappedBlockStream { + type Item = Result< + blockchain::block_stream::BlockStreamEvent, + blockchain::block_stream::BlockStreamError, + >; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + let inner = 
Pin::new(&mut self.get_mut().0); + match inner.poll_next(cx) { + Poll::Ready(Some(Ok(result))) => { + let mapped = match result { + BlockStreamEvent::Revert(block_ptr, firehose_cursor) => { + BlockStreamEvent::Revert(block_ptr, firehose_cursor) + } + BlockStreamEvent::ProcessBlock(block_with_triggers, firehose_cursor) => { + let BlockWithTriggers { + block, + trigger_data, + } = block_with_triggers; + BlockStreamEvent::ProcessBlock( + BlockWithTriggers { + block, + trigger_data, + }, + firehose_cursor, + ) + } + BlockStreamEvent::ProcessWasmBlock( + block_ptr, + block_time, + wasm, + a_string, + firehose_cursor, + ) => BlockStreamEvent::ProcessWasmBlock( + block_ptr, + block_time, + wasm, + a_string, + firehose_cursor, + ), + }; + Poll::Ready(Some(Ok(mapped))) + } + Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))), + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => std::task::Poll::Pending, + } + } +} + +impl blockchain::block_stream::BlockStream for WrappedBlockStream { + fn buffer_size_hint(&self) -> usize { + self.0.buffer_size_hint() + } +} + +#[async_trait] +impl blockchain::TriggersAdapter for WrappedTriggersAdapter { + async fn ancestor_block( + &self, + ptr: BlockPtr, + offset: BlockNumber, + root: Option, + ) -> Result::Block>, anyhow::Error> { + self.0.ancestor_block(ptr, offset, root).await + } + async fn scan_triggers( + &self, + from: BlockNumber, + to: BlockNumber, + filter: &WrappedTriggerFilter, + ) -> Result<(Vec>, BlockNumber), anyhow::Error> { + let (blocks, num) = self.0.scan_triggers(from, to, &filter.0).await?; + let mut blocks_with_triggers = Vec::new(); + for block in blocks { + let BlockWithTriggers { + block, + trigger_data, + } = block; + blocks_with_triggers.push(BlockWithTriggers { + block, + trigger_data, + }) + } + Ok((blocks_with_triggers, num)) + } + async fn triggers_in_block( + &self, + logger: &Logger, + block: ::Block, + filter: &::TriggerFilter, + ) -> Result, anyhow::Error> { + let inner = self.0.triggers_in_block(logger, block, &filter.0).await?; + let BlockWithTriggers { + block, + trigger_data, + } = inner; + Ok(BlockWithTriggers { + block, + trigger_data, + }) + } + + async fn is_on_main_chain(&self, ptr: BlockPtr) -> Result { + self.0.is_on_main_chain(ptr).await + } + async fn parent_ptr(&self, block: &BlockPtr) -> Result, anyhow::Error> { + self.0.parent_ptr(block).await + } +} + +impl std::fmt::Display for WrappedNodeCapabilities { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + f.write_str(&format!("WrappedNodeCapabilities({})", self.0)) + } +} + +impl blockchain::NodeCapabilities for WrappedNodeCapabilities { + fn from_data_sources(data_sources: &[::DataSource]) -> Self { + let ds = data_sources + .into_iter() + .map(|source| match source.0.clone() { + DataSourceEnum::Onchain(onchain) => onchain, + DataSourceEnum::Offchain(_) => { + todo!("only onchain data sources are supported") + } + }) + .collect::>(); + let inner = + >::from_data_sources(&ds); + WrappedNodeCapabilities(inner) + } +} + +impl blockchain::TriggerFilter for WrappedTriggerFilter { + fn extend_with_template( + &mut self, + data_source: impl Iterator::DataSourceTemplate>, + ) { + self.0 + .extend_with_template(data_source.map(|filter| match filter.0 { + DataSourceTemplateEnum::Onchain(onchain) => onchain, + DataSourceTemplateEnum::Offchain(_) => { + todo!("only onchain templates are supported") + } + })) + } + + fn extend<'a>( + &mut self, + data_sources: impl Iterator::DataSource> + Clone, + ) { + self.0.extend(data_sources.map(|source| 
match &source.0 { + DataSourceEnum::Onchain(onchain) => onchain, + DataSourceEnum::Offchain(_) => todo!("only onchain data sources supported"), + })) + } + + fn node_capabilities(&self) -> ::NodeCapabilities { + WrappedNodeCapabilities(self.0.node_capabilities()) + } + + fn to_firehose_filter(self) -> Vec { + self.0.to_firehose_filter() + } +} + +impl<'de> Deserialize<'de> for WrappedUnresolvedDataSource { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let inner = UnresolvedDataSourceEnum::deserialize(deserializer)?; + Ok(WrappedUnresolvedDataSource(inner)) + } +} + +impl<'de> Deserialize<'de> for WrappedUnresolvedDataSourceTemplate { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let inner = UnresolvedDataSourceTemplateEnum::deserialize(deserializer)?; + Ok(WrappedUnresolvedDataSourceTemplate(inner)) + } +} +impl Clone for WrappedDataSourceTemplate { + fn clone(&self) -> Self { + let inner = self.0.clone(); + WrappedDataSourceTemplate(inner) + } +} + +impl Clone for WrappedUnresolvedDataSourceTemplate { + fn clone(&self) -> Self { + let inner = match &self.0 { + UnresolvedDataSourceTemplateEnum::Onchain(template) => { + UnresolvedDataSourceTemplateEnum::Onchain(template.clone()) + } + UnresolvedDataSourceTemplateEnum::Offchain(template) => { + UnresolvedDataSourceTemplateEnum::Offchain(template.clone()) + } + }; + WrappedUnresolvedDataSourceTemplate(inner) + } +} + +impl DataSourceTemplate for WrappedDataSourceTemplate { + fn api_version(&self) -> graph::semver::Version { + self.0.api_version() + } + + fn runtime(&self) -> Option>> { + self.0.runtime() + } + + fn name(&self) -> &str { + self.0.name() + } + + fn manifest_idx(&self) -> u32 { + self.0.manifest_idx() + } + + fn kind(&self) -> &str { + self.0.as_onchain().unwrap().kind() + } +} + +#[async_trait] +impl UnresolvedDataSourceTemplate for WrappedUnresolvedDataSourceTemplate { + async fn resolve( + self, + resolver: &Arc, + logger: &Logger, + manifest_idx: u32, + ) -> Result { + let inner = match self.0 { + UnresolvedDataSourceTemplateEnum::Onchain(inner) => inner, + UnresolvedDataSourceTemplateEnum::Offchain(_unresolved_data_source_template) => { + unreachable!() + } + }; + let resolved = inner.resolve(resolver, logger, manifest_idx).await?; + Ok(WrappedDataSourceTemplate(DataSourceTemplateEnum::Onchain( + resolved, + ))) + } +} + +impl DataSource for WrappedDataSource { + fn from_template_info( + info: graph::prelude::InstanceDSTemplateInfo, + template: &DataSourceTemplateEnum, + ) -> Result { + let inner = match template { + DataSourceTemplateEnum::Onchain(inner) => inner, + DataSourceTemplateEnum::Offchain(_) => unreachable!(), + }; + + let inner = match inner.0.clone() { + DataSourceTemplateEnum::Onchain(inner) => inner, + DataSourceTemplateEnum::Offchain(_) => unreachable!(), + }; + + let inner = DataSourceTemplateEnum::Onchain(inner); + let ds = DataSource::from_template_info(info, &inner)?; + Ok(WrappedDataSource(DataSourceEnum::Onchain(ds))) + } + + fn from_stored_dynamic_data_source( + template: &WrappedDataSourceTemplate, + stored: graph::components::store::StoredDynamicDataSource, + ) -> Result { + let inner = match template.0.clone() { + DataSourceTemplateEnum::Onchain(inner) => inner, + DataSourceTemplateEnum::Offchain(_data_source_template) => todo!(), + }; + //let inner = DataSourceTemplateEnum::Onchain(inner); + let ds = DataSource::::from_stored_dynamic_data_source(&inner, stored)?; + Ok(WrappedDataSource(DataSourceEnum::Onchain(ds))) + } + + fn 
address(&self) -> Option<&[u8]> { + self.0.as_onchain()?.address() + } + + fn start_block(&self) -> graph::prelude::BlockNumber { + self.0.as_onchain().unwrap().start_block() + } + + fn end_block(&self) -> Option { + self.0.as_onchain()?.end_block() + } + + fn name(&self) -> &str { + self.0.name() + } + + fn kind(&self) -> &str { + self.0.as_onchain().unwrap().kind() + } + + fn network(&self) -> Option<&str> { + self.0.as_onchain()?.network() + } + + fn context(&self) -> Arc> { + self.0.context() + } + + fn creation_block(&self) -> Option { + self.0.creation_block() + } + + fn api_version(&self) -> graph::semver::Version { + self.0.api_version() + } + + fn runtime(&self) -> Option>> { + self.0.runtime() + } + + fn handler_kinds(&self) -> std::collections::HashSet<&str> { + self.0.handler_kinds() + } + + fn match_and_decode( + &self, + trigger: &::TriggerData, + block: &Arc<::Block>, + logger: &slog::Logger, + ) -> Result::MappingTrigger>>, anyhow::Error> + { + self.0 + .as_onchain() + .unwrap() + .match_and_decode(trigger, block, logger) + } + + fn is_duplicate_of(&self, other: &Self) -> bool { + self.0.is_duplicate_of(&other.0) + } + + fn as_stored_dynamic_data_source(&self) -> graph::components::store::StoredDynamicDataSource { + self.0.as_stored_dynamic_data_source() + } + + fn validate(&self, spec_version: &graph::semver::Version) -> Vec { + self.0.validate(spec_version) + } +} + +#[async_trait] +impl blockchain::UnresolvedDataSource for WrappedUnresolvedDataSource { + async fn resolve( + self, + resolver: &Arc, + logger: &Logger, + manifest_idx: u32, + ) -> Result { + println!("UnresolvedDataSource::resolve"); + let inner = self.0; + let inner = inner.resolve(resolver, logger, manifest_idx).await?; + Ok(WrappedDataSource(inner)) + } +} + +#[async_trait] +impl Blockchain for WrappedEthChain { + const KIND: BlockchainKind = Chain::KIND; + const ALIASES: &'static [&'static str] = Chain::ALIASES; + + type Client = Arc>; + + type Block = ::Block; + + type DataSource = WrappedDataSource; + type DataSourceTemplate = WrappedDataSourceTemplate; + + type UnresolvedDataSource = WrappedUnresolvedDataSource; + type UnresolvedDataSourceTemplate = WrappedUnresolvedDataSourceTemplate; + type TriggerData = ::TriggerData; + type MappingTrigger = ::MappingTrigger; + type TriggerFilter = WrappedTriggerFilter; + type NodeCapabilities = WrappedNodeCapabilities; + type DecoderHook = NoopDecoderHook; + + fn triggers_adapter( + &self, + log: &DeploymentLocator, + capabilities: &Self::NodeCapabilities, + unified_api_version: UnifiedMappingApiVersion, + ) -> Result>, anyhow::Error> { + let inner = self + .0 + .triggers_adapter(log, &capabilities.0, unified_api_version)?; + let wrapped = Arc::new(WrappedTriggersAdapter(inner)); + Ok(wrapped) + } + + fn chain_store(&self) -> Arc { + self.0.chain_store() + } + + fn is_refetch_block_required(&self) -> bool { + self.0.is_refetch_block_required() + } + + fn runtime(&self) -> anyhow::Result<(Arc>, Self::DecoderHook)> { + let (inner, _decoderhook) = self.0.runtime()?; + let wrapped = WrappedRuntimeAdapter(inner); + Ok((Arc::new(wrapped), NoopDecoderHook)) + } + + fn chain_client(&self) -> Arc> { + let client = self.0.chain_client(); + let wrapped = match client.deref() { + ChainClient::Firehose(firehose_endpoints) => { + ChainClient::new_firehose(firehose_endpoints.clone()) + } + _ => todo!("only firehose is supported"), + }; + let wrapped = Arc::new(wrapped); + wrapped + } + + async fn block_pointer_from_number( + &self, + logger: &Logger, + number: BlockNumber, + ) 
-> Result { + info!(&logger, "block_pointer_from_number - WrappedEthChain is directly resolving blocks from the BlockStreamBuilder"; "number" => number); + if let Some(block_ptr) = self.directly_resolve_block_from_number(number) { + return Ok(block_ptr); + } + self.0.block_pointer_from_number(logger, number).await + } + + async fn new_block_stream( + &self, + deployment: DeploymentLocator, + store: impl DeploymentCursorTracker, + start_blocks: Vec, + filter: Arc, + unified_api_version: UnifiedMappingApiVersion, + ) -> Result>, anyhow::Error> { + let inner = self + .0 + .new_block_stream( + deployment, + store, + start_blocks, + Arc::new(filter.0.clone()), + unified_api_version, + ) + .await?; + let wrapped = Box::new(WrappedBlockStream(inner)); + Ok(wrapped) + } + + async fn refetch_firehose_block( + &self, + logger: &Logger, + cursor: blockchain::block_stream::FirehoseCursor, + ) -> Result { + self.0.refetch_firehose_block(logger, cursor).await + } + + async fn block_ingestor(&self) -> anyhow::Result> { + self.0.block_ingestor().await + } +} pub async fn chain( test_name: &str, blocks: Vec>, stores: &Stores, triggers_adapter: Option>>, -) -> TestChain { +) -> TestChain { + let (_, block_stream_builder, chain) = + create_chain(triggers_adapter, test_name, stores, blocks).await; + + TestChain { + chain: Arc::new(chain), + block_stream_builder, + } +} + +pub async fn wrapped_chain( + test_name: &str, + blocks: Vec>, + stores: &Stores, + triggers_adapter: Option>>, +) -> TestChain { + let (static_block_stream, block_stream_builder, chain) = + create_chain(triggers_adapter, test_name, stores, blocks).await; + + TestChain { + chain: Arc::new(WrappedEthChain::new(Arc::new(chain), static_block_stream)), + block_stream_builder, + } +} + +async fn create_chain( + triggers_adapter: Option>>, + test_name: &str, + stores: &Stores, + blocks: Vec>, +) -> ( + Arc>, + Arc>, + Chain, +) { let triggers_adapter = triggers_adapter.unwrap_or(Arc::new(NoopAdapterSelector { triggers_in_block_sleep: Duration::ZERO, x: PhantomData, @@ -43,7 +665,9 @@ pub async fn chain( let client = Arc::new(ChainClient::::new_firehose(firehose_endpoints)); let static_block_stream = Arc::new(StaticStreamBuilder { chain: blocks }); - let block_stream_builder = Arc::new(MutexBlockStreamBuilder(Mutex::new(static_block_stream))); + let block_stream_builder = Arc::new(MutexBlockStreamBuilder(Mutex::new( + static_block_stream.clone(), + ))); let eth_adapters = Arc::new(EthereumNetworkAdapters::empty_for_testing()); @@ -66,11 +690,7 @@ pub async fn chain( // We assume the tested chain is always ingestible for now true, ); - - TestChain { - chain: Arc::new(chain), - block_stream_builder, - } + (static_block_stream, block_stream_builder, chain) } pub fn genesis() -> BlockWithTriggers { @@ -184,3 +804,153 @@ pub fn push_test_polling_trigger(block: &mut BlockWithTriggers) { EthereumBlockTriggerType::End, )) } + +#[derive(Clone)] +pub struct WrappedSubgraphInstanceManager { + pub inner: Arc>, +} + +#[async_trait] +impl SubgraphInstanceManagerTrait for WrappedSubgraphInstanceManager { + async fn start_subgraph( + self: Arc, + deployment: DeploymentLocator, + manifest: serde_yaml::Mapping, + stop_block: Option, + ) { + let inner = self.inner.cheap_clone(); + let logger = inner.subgraph_logger(&deployment); + graph::prelude::info!(logger, "WrappedSubgraphInstanceManager::start_subgraph"); + let err_logger = inner.subgraph_logger(&deployment); + + let subgraph_start_future = async move { + let runner = inner + .build_subgraph_runner::( + 
logger.clone(), + inner.get_env_vars(), + deployment.clone(), + manifest, + stop_block, + Box::new(SubgraphTriggerProcessor {}), + ) + .await?; + inner + .start_subgraph_thread(logger, deployment, runner) + .await + }; + graph::spawn(async move { + match subgraph_start_future.await { + Ok(()) => {} + Err(err) => graph::prelude::error!( + err_logger, + "Failed to start subgraph"; + "error" => format!("{:#}", err), + "code" => LogCode::SubgraphStartFailure + ), + } + }); + } + + async fn stop_subgraph(&self, deployment: DeploymentLocator) { + let inner = self.inner.cheap_clone(); + inner.stop_subgraph(deployment).await + } +} + +pub struct WrappedSubgraphRegistrar(pub SubgraphRegistrar); + +#[async_trait] +impl SubgraphRegistrarTrait for WrappedSubgraphRegistrar +where + P: SubgraphAssignmentProviderTrait, + S: SubgraphStoreTrait, + SM: SubscriptionManagerTrait, +{ + async fn create_subgraph( + &self, + name: SubgraphName, + ) -> Result { + SubgraphRegistrarTrait::create_subgraph(&self.0, name.clone()).await + } + + async fn create_subgraph_version( + &self, + name: SubgraphName, + hash: DeploymentHash, + node_id: NodeId, + debug_fork: Option, + start_block_override: Option, + graft_block_override: Option, + history_blocks: Option, + ) -> Result { + // We don't have a location for the subgraph yet; that will be + // assigned when we deploy for real. For logging purposes, make up a + // fake locator + let logger = self + .0 + .subgraph_logger(&DeploymentLocator::new(DeploymentId(0), hash.clone())); + + let raw: serde_yaml::Mapping = self.0.resolve_raw_manifest(&hash).await?; + + // Give priority to deployment specific history_blocks value. + let history_blocks = history_blocks.or(self + .0 + .get_settings() + .for_name(&name) + .map(|c| c.history_blocks)); + + let auto_graft_sync_depth = if self.0.store.auto_graft_sync() { + Some(0) + } else { + None + }; + + let deployment_locator = create_subgraph_version::( + &logger, + self.0.store.clone(), + self.0.chains.cheap_clone(), + name.clone(), + hash.cheap_clone(), + start_block_override, + graft_block_override, + raw, + node_id, + debug_fork, + self.0.version_switching_mode, + &self.0.resolver, + history_blocks, + auto_graft_sync_depth, + self.0.provider.clone(), + ) + .await?; + + graph::prelude::debug!( + &logger, + "Wrote new subgraph version to store"; + "subgraph_name" => name.to_string(), + "subgraph_hash" => hash.to_string(), + ); + + Ok(deployment_locator) + } + + async fn remove_subgraph(&self, name: SubgraphName) -> Result<(), SubgraphRegistrarError> { + SubgraphRegistrarTrait::remove_subgraph(&self.0, name).await + } + + async fn reassign_subgraph( + &self, + hash: &DeploymentHash, + node_id: &NodeId, + ) -> Result<(), SubgraphRegistrarError> { + SubgraphRegistrarTrait::reassign_subgraph(&self.0, hash, node_id).await + } + + async fn pause_subgraph(&self, hash: &DeploymentHash) -> Result<(), SubgraphRegistrarError> { + SubgraphRegistrarTrait::pause_subgraph(&self.0, hash).await + } + + async fn resume_subgraph(&self, hash: &DeploymentHash) -> Result<(), SubgraphRegistrarError> { + SubgraphRegistrarTrait::resume_subgraph(&self.0, hash).await + } +} diff --git a/tests/src/fixture/mod.rs b/tests/src/fixture/mod.rs index 4eb5fbb42b1..04d3d12d9a4 100644 --- a/tests/src/fixture/mod.rs +++ b/tests/src/fixture/mod.rs @@ -8,6 +8,7 @@ use std::time::{Duration, Instant}; use anyhow::Error; use async_stream::stream; +use ethereum::{WrappedEthChain, WrappedSubgraphInstanceManager, WrappedSubgraphRegistrar}; use graph::blockchain::block_stream::{ 
BlockRefetcher, BlockStream, BlockStreamBuilder, BlockStreamError, BlockStreamEvent, BlockWithTriggers, FirehoseCursor, @@ -38,8 +39,10 @@ use graph::prelude::serde_json::{self, json}; use graph::prelude::{ async_trait, lazy_static, q, r, ApiVersion, BigInt, BlockNumber, DeploymentHash, GraphQlRunner as _, IpfsResolver, LoggerFactory, NodeId, QueryError, - SubgraphAssignmentProvider, SubgraphCountMetric, SubgraphName, SubgraphRegistrar, - SubgraphStore as _, SubgraphVersionSwitchingMode, TriggerProcessor, + SubgraphAssignmentProvider, SubgraphCountMetric, + SubgraphInstanceManager as SubgraphInstanceManagerTrait, SubgraphName, + SubgraphRegistrar as SubgraphRegistrarTrait, SubgraphStore as _, SubgraphVersionSwitchingMode, + TriggerProcessor, }; use graph::schema::InputSchema; use graph_chain_ethereum::chain::RuntimeAdapterBuilder; @@ -119,9 +122,9 @@ impl CommonChainConfig { } } -pub struct TestChain { +pub struct TestChain { pub chain: Arc, - pub block_stream_builder: Arc>, + pub block_stream_builder: Arc>, } impl TestChainTrait for TestChain { @@ -135,6 +138,18 @@ impl TestChainTrait for TestChain { } } +impl TestChainTrait for TestChain { + fn set_block_stream(&self, blocks: Vec>) { + let static_block_stream = Arc::new(StaticStreamBuilder { chain: blocks }); + *self.chain.1.lock().unwrap() = static_block_stream.clone(); + *self.block_stream_builder.0.lock().unwrap() = static_block_stream; + } + + fn chain(&self) -> Arc { + Arc::clone(&self.chain) + } +} + pub struct TestChainSubstreams { pub chain: Arc, pub block_stream_builder: Arc, @@ -148,23 +163,18 @@ impl TestChainTrait for TestChainSubstreams { } } -pub trait TestChainTrait { - fn set_block_stream(&self, blocks: Vec>); - +pub trait TestChainTrait { fn chain(&self) -> Arc; + fn set_block_stream(&self, blocks: Vec>); } -pub struct TestContext { +pub struct TestContext> { pub logger: Logger, - pub provider: Arc< - IpfsSubgraphAssignmentProvider< - SubgraphInstanceManager, - >, - >, + pub provider: Arc>, pub store: Arc, pub deployment: DeploymentLocator, pub subgraph_name: SubgraphName, - pub instance_manager: SubgraphInstanceManager, + pub instance_manager: S, pub link_resolver: Arc, pub arweave_resolver: Arc, pub env_vars: Arc, @@ -198,14 +208,11 @@ pub struct IndexingStatusForCurrentVersion { pub indexing_status_for_current_version: IndexingStatus, } -impl TestContext { +impl TestContext> { pub async fn runner( &self, stop_block: BlockPtr, - ) -> graph_core::SubgraphRunner< - graph_chain_ethereum::Chain, - RuntimeHostBuilder, - > { + ) -> graph_core::SubgraphRunner> { let (logger, deployment, raw) = self.get_runner_context().await; let tp: Box> = Box::new(SubgraphTriggerProcessor {}); @@ -246,7 +253,9 @@ impl TestContext { .await .unwrap() } +} +impl TestContext { pub async fn get_runner_context(&self) -> (Logger, DeploymentLocator, serde_yaml::Mapping) { let logger = self.logger.cheap_clone(); let deployment = self.deployment.cheap_clone(); @@ -351,7 +360,7 @@ impl TestContext { } } -impl Drop for TestContext { +impl Drop for TestContext { fn drop(&mut self) { if let Err(e) = cleanup(&self.store, &self.subgraph_name, &self.deployment.hash) { crit!(self.logger, "error cleaning up test subgraph"; "error" => e.to_string()); @@ -435,13 +444,69 @@ pub struct TestInfo { pub hash: DeploymentHash, } +pub async fn setup_wrapped( + test_info: &TestInfo, + stores: &Stores, + chain: &impl TestChainTrait, + graft_block: Option, + env_vars: Option, +) -> TestContext> { + setup_internal( + test_info, + stores, + chain, + graft_block, + env_vars, 
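+ // The two closures below are the only difference from the plain `setup` path: + // they wrap the instance manager and the registrar in their test doubles so + // subgraph startup and deployment can be intercepted.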
+        |subgraph_instance_manager| WrappedSubgraphInstanceManager {
+            inner: Arc::new(subgraph_instance_manager),
+        },
+        |registrar| Arc::new(WrappedSubgraphRegistrar(registrar)),
+    )
+    .await
+}
+
+// preserving the old setup
 pub async fn setup(
     test_info: &TestInfo,
     stores: &Stores,
     chain: &impl TestChainTrait,
     graft_block: Option,
     env_vars: Option,
-) -> TestContext {
+) -> TestContext> {
+    setup_internal(
+        test_info,
+        stores,
+        chain,
+        graft_block,
+        env_vars,
+        |subgraph_instance_manager| subgraph_instance_manager,
+        |registrar| Arc::new(registrar),
+    )
+    .await
+}
+
+async fn setup_internal(
+    test_info: &TestInfo,
+    stores: &Stores,
+    chain: &impl TestChainTrait,
+    graft_block: Option,
+    env_vars: Option,
+    wrap_instance_manager: I,
+    wrap_subgraph_registrar: R,
+) -> TestContext
+where
+    C: Blockchain,
+    C2: Blockchain,
+    I: FnOnce(SubgraphInstanceManager) -> M,
+    M: SubgraphInstanceManagerTrait + Clone,
+    R: FnOnce(
+        graph_core::SubgraphRegistrar<
+            graph_core::SubgraphAssignmentProvider,
+            graph_store_postgres::SubgraphStore,
+            PanicSubscriptionManager,
+        >,
+    ) -> Arc,
+{
     let env_vars = Arc::new(match env_vars {
         Some(ev) => ev,
         None => EnvVars::from_env().unwrap(),
@@ -458,6 +523,7 @@ pub async fn setup(

     let mut blockchain_map = BlockchainMap::new();
     blockchain_map.insert(stores.network_name.clone(), chain.chain());
+    let blockchain_map = Arc::new(blockchain_map);

     let static_filters = env_vars.experimental_static_filters;

@@ -492,7 +558,6 @@ pub async fn setup(
     );

     let sg_count = Arc::new(SubgraphCountMetric::new(mock_registry.cheap_clone()));
-    let blockchain_map = Arc::new(blockchain_map);

     let subgraph_instance_manager = SubgraphInstanceManager::new(
         &logger_factory,
         env_vars.cheap_clone(),
@@ -506,6 +571,8 @@ pub async fn setup(
         static_filters,
     );

+    let wrapped_subgraph_instance_manager = wrap_instance_manager(subgraph_instance_manager);
+
     // Graphql runner
     let subscription_manager = Arc::new(PanicSubscriptionManager {});
     let load_manager = LoadManager::new(&logger, Vec::new(), Vec::new(), mock_registry.clone());
@@ -528,13 +595,13 @@ pub async fn setup(
     let subgraph_provider = Arc::new(IpfsSubgraphAssignmentProvider::new(
         &logger_factory,
         link_resolver.cheap_clone(),
-        subgraph_instance_manager.clone(),
+        wrapped_subgraph_instance_manager.clone(),
         sg_count,
     ));

     let panicking_subscription_manager = Arc::new(PanicSubscriptionManager {});

-    let subgraph_registrar = Arc::new(IpfsSubgraphRegistrar::new(
+    let subgraph_registrar = IpfsSubgraphRegistrar::new(
         &logger_factory,
         link_resolver.cheap_clone(),
         subgraph_provider.clone(),
@@ -544,17 +611,19 @@ pub async fn setup(
         node_id.clone(),
         SubgraphVersionSwitchingMode::Instant,
         Arc::new(Settings::default()),
-    ));
+    );
+
+    let wrapped_subgraph_registrar = wrap_subgraph_registrar(subgraph_registrar);

-    SubgraphRegistrar::create_subgraph(
-        subgraph_registrar.as_ref(),
+    SubgraphRegistrarTrait::create_subgraph(
+        wrapped_subgraph_registrar.as_ref(),
         test_info.subgraph_name.clone(),
     )
     .await
     .expect("unable to create subgraph");

-    let deployment = SubgraphRegistrar::create_subgraph_version(
-        subgraph_registrar.as_ref(),
+    let deployment = SubgraphRegistrarTrait::create_subgraph_version(
+        wrapped_subgraph_registrar.as_ref(),
         test_info.subgraph_name.clone(),
         test_info.hash.clone(),
         node_id.clone(),
@@ -575,7 +644,7 @@ pub async fn setup(
         deployment,
         subgraph_name: test_info.subgraph_name.clone(),
         graphql_runner,
-        instance_manager: subgraph_instance_manager,
+        instance_manager: wrapped_subgraph_instance_manager,
         link_resolver,
         env_vars,
         indexing_status_service,
@@ -651,7 +720,7 @@ pub async fn wait_for_sync(
             return Err(fatal_error);
         }

-        if block_ptr == stop_block {
+        if block_ptr.number >= stop_block.number {
             info!(logger, "TEST: reached stop block");
             return Ok(());
         }
@@ -741,7 +810,7 @@ impl BlockStreamBuilder for MutexBlockStreamBuilder {
 ///
 /// If the stream is reset, emitted reorged blocks will not be emitted again.
 /// See also: static-stream-builder
-struct StaticStreamBuilder {
+pub struct StaticStreamBuilder {
     chain: Vec>,
 }

diff --git a/tests/tests/runner_tests.rs b/tests/tests/runner_tests.rs
index caeb67e9adf..12eef2c256f 100644
--- a/tests/tests/runner_tests.rs
+++ b/tests/tests/runner_tests.rs
@@ -23,7 +23,7 @@ use graph::prelude::{
 };
 use graph_tests::fixture::ethereum::{
     chain, empty_block, generate_empty_blocks_for_range, genesis, push_test_command, push_test_log,
-    push_test_polling_trigger,
+    push_test_polling_trigger, wrapped_chain,
 };
 use graph_tests::fixture::substreams::chain as substreams_chain;

@@ -144,10 +144,10 @@ async fn data_source_revert() -> anyhow::Result<()> {

     // Test grafted version
     let subgraph_name = SubgraphName::new("data-source-revert-grafted").unwrap();
-    let hash = build_subgraph_with_yarn_cmd_and_arg(
+    let hash = build_subgraph_with_yarn_cmd_and_args(
         "./runner-tests/data-source-revert",
         "deploy:test-grafted",
-        Some(&test_info.hash),
+        vec![&test_info.hash],
     )
     .await;
     let test_info = TestInfo {
@@ -1262,6 +1262,121 @@ async fn arweave_file_data_sources() {
     );
 }

+#[tokio::test]
+async fn auto_graft_sync() -> anyhow::Result<()> {
+    let stores = fixture::stores(
+        "auto_graft_sync",
+        "./runner-tests/auto-graft-sync/config.toml",
+    )
+    .await;
+
+    assert_eq!(
+        stores.network_store.subgraph_store().auto_graft_sync(),
+        true,
+        "Auto graft sync should be enabled"
+    );
+
+    let test_name = "auto_graft_sync";
+
+    let blocks = {
+        let block0 = genesis();
+        let mut last_block = block0.ptr();
+        let mut blocks = vec![block0];
+        for i in 1..=20 {
+            let block = empty_block(last_block, test_ptr(i));
+            last_block = block.ptr();
+            blocks.push(block);
+        }
+        blocks
+    };
+
+    let chain = wrapped_chain(&test_name, blocks.clone(), &stores, None).await;
+
+    // Root graft
+    // only build the subgraph and create it, but don't deploy it to be assigned
+    let root_deployment_hash = build_subgraph_with_yarn_cmd_and_args(
+        "./runner-tests/auto-graft-sync",
+        "build-graft-root",
+        vec![],
+    )
+    .await;
+
+    let mut prev_hash = root_deployment_hash.clone();
+
+    let subgraph_name = SubgraphName::new("auto-graft-sync-tip".to_string()).unwrap();
+    let root_deployment = TestInfo {
+        test_name: test_name.to_string(),
+        hash: root_deployment_hash.clone(),
+        test_dir: "./runner-tests/auto-graft-sync".to_string(),
+        subgraph_name,
+    };
+
+    // our "root deployment" does not have a graft, so this should complete.
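+    // sync the ungrafted root first so the graft chain built below has a base to copy from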
+    let base_ctx = fixture::setup_wrapped(&root_deployment, &stores, &chain, None, None).await;
+    base_ctx.start_and_sync_to(test_ptr(1)).await;
+
+    let block_ptr = stores
+        .network_store
+        .subgraph_store()
+        .least_block_ptr(&root_deployment.hash)
+        .await
+        .unwrap()
+        .unwrap();
+
+    base_ctx
+        .start_and_sync_to(test_ptr(block_ptr.number + 1))
+        .await;
+
+    for i in 0..3 {
+        let outfile = format!("auto-graft-sync-{}.yaml", i);
+        // Test grafted version
+        let hash = build_subgraph_with_yarn_cmd_and_args(
+            "./runner-tests/auto-graft-sync",
+            "build:test-auto-graft-sync",
+            vec![&outfile, &prev_hash, &(block_ptr.number + 1).to_string()],
+        )
+        .await;
+        prev_hash = hash;
+    }
+
+    let top_graft_info = TestInfo {
+        test_name: test_name.to_string(),
+        hash: prev_hash.clone(),
+        test_dir: "./runner-tests/auto-graft-sync".to_string(),
+        subgraph_name: SubgraphName::new("auto-graft-sync-tip".to_string()).unwrap(),
+    };
+
+    let ctx = fixture::setup_wrapped(&top_graft_info, &stores, &chain, None, None).await;
+    ctx.start_and_sync_to(test_ptr(block_ptr.number + 1)).await;
+
+    tokio::time::sleep(Duration::from_secs(3)).await;
+
+    for i in 0..10 {
+        let query_res = ctx
+            .query(&format!(
+                "{{ dataSourceCount(id: \"{}\") {{ id, count }} }}",
+                i
+            ))
+            .await
+            .unwrap();
+        println!("query_res: {:?}", query_res);
+    }
+
+    // TODO: The semantically correct value for `count` would be 5. But because the test fixture
+    // uses a `NoopTriggersAdapter` the data sources are not reprocessed in the block in which they
+    // are created.
+    let query_res = base_ctx
+        .query(r#"{ dataSourceCount(id: "3") { id, count } }"#)
+        .await
+        .unwrap();
+    assert_eq!(
+        query_res,
+        Some(object! { dataSourceCount: object!{ id: "3", count: 3 } })
+    );
+
+    Ok(())
+}
+
 /// deploy_cmd is the command to run to deploy the subgraph. If it is None, the
 /// default `yarn deploy:test` is used.
 async fn build_subgraph(dir: &str, deploy_cmd: Option<&str>) -> DeploymentHash {
@@ -1269,13 +1384,13 @@ async fn build_subgraph(dir: &str, deploy_cmd: Option<&str>) -> DeploymentHash {
 }

 async fn build_subgraph_with_yarn_cmd(dir: &str, yarn_cmd: &str) -> DeploymentHash {
-    build_subgraph_with_yarn_cmd_and_arg(dir, yarn_cmd, None).await
+    build_subgraph_with_yarn_cmd_and_args(dir, yarn_cmd, vec![]).await
 }

-async fn build_subgraph_with_yarn_cmd_and_arg(
+async fn build_subgraph_with_yarn_cmd_and_args(
     dir: &str,
     yarn_cmd: &str,
-    arg: Option<&str>,
+    mut additional_args: Vec<&str>,
 ) -> DeploymentHash {
     // Test that IPFS is up.
     ipfs::IpfsRpcClient::new(ipfs::ServerAddress::local_rpc_api(), &graph::log::discard())
@@ -1296,7 +1411,7 @@ async fn build_subgraph_with_yarn_cmd_and_arg(
     run_cmd(Command::new("yarn").arg("codegen").current_dir(dir));

     let mut args = vec![yarn_cmd];
-    args.extend(arg);
+    args.append(&mut additional_args);

     // Run `deploy` for the side effect of uploading to IPFS, the graph node url
     // is fake and the actual deploy call is meant to fail.