diff --git a/Cargo.lock b/Cargo.lock
index 4515f57f7b9..86e11298c50 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -540,9 +540,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"

 [[package]]
 name = "bitflags"
-version = "2.6.0"
+version = "2.9.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de"
+checksum = "5c8214115b7bf84099f1309324e63141d4c5d7cc26862f97a0a857dbefe165bd"

 [[package]]
 name = "bitvec"
@@ -1235,7 +1235,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "04001f23ba8843dc315804fa324000376084dfb1c30794ff68dd279e6e5696d5"
 dependencies = [
  "bigdecimal 0.3.1",
- "bitflags 2.6.0",
+ "bitflags 2.9.0",
  "byteorder",
  "chrono",
  "diesel_derives",
@@ -1562,6 +1562,18 @@ version = "2.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "9fc0510504f03c51ada170672ac806f1f105a88aa97a5281117e1ddc3368e51a"

+[[package]]
+name = "filetime"
+version = "0.2.25"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "35c0522e981e68cbfa8c3f978441a5f34b30b96e146b33cd3359176b50fe8586"
+dependencies = [
+ "cfg-if 1.0.0",
+ "libc",
+ "libredox",
+ "windows-sys 0.59.0",
+]
+
 [[package]]
 name = "firestorm"
 version = "0.4.6"
@@ -1638,6 +1650,15 @@ dependencies = [
  "percent-encoding",
 ]

+[[package]]
+name = "fsevent-sys"
+version = "4.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2"
+dependencies = [
+ "libc",
+]
+
 [[package]]
 name = "funty"
 version = "2.0.0"
@@ -1761,7 +1782,7 @@ version = "0.6.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "27d12c0aed7f1e24276a241aadc4cb8ea9f83000f34bc062b7cc2d51e3b0fabd"
 dependencies = [
- "bitflags 2.6.0",
+ "bitflags 2.9.0",
  "debugid",
  "fxhash",
  "serde",
@@ -1842,9 +1863,9 @@ dependencies = [

 [[package]]
 name = "globset"
-version = "0.4.14"
+version = "0.4.16"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "57da3b9b5b85bd66f31093f8c408b90a74431672542466497dcbdfdc02034be1"
+checksum = "54a1028dfc5f5df5da8a56a73e6c153c9a9708ec57232470703592a3f18e49f5"
 dependencies = [
  "aho-corasick",
  "bstr",
@@ -2062,6 +2083,7 @@ dependencies = [
  "diesel",
  "env_logger",
  "git-testament",
+ "globset",
  "graph",
  "graph-chain-arweave",
  "graph-chain-ethereum",
@@ -2079,6 +2101,8 @@
  "itertools 0.13.0",
  "json-structural-diff",
  "lazy_static",
+ "notify",
+ "pgtemp",
  "prometheus",
  "serde",
  "shellexpand",
@@ -2929,6 +2953,26 @@ dependencies = [
  "serde",
 ]

+[[package]]
+name = "inotify"
+version = "0.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f37dccff2791ab604f9babef0ba14fbe0be30bd368dc541e2b08d07c8aa908f3"
+dependencies = [
+ "bitflags 2.9.0",
+ "inotify-sys",
+ "libc",
+]
+
+[[package]]
+name = "inotify-sys"
+version = "0.1.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb"
+dependencies = [
+ "libc",
+]
+
 [[package]]
 name = "ipnet"
 version = "2.9.0"
@@ -3134,6 +3178,26 @@ dependencies = [
  "cpufeatures",
 ]

+[[package]]
+name = "kqueue"
+version = "1.0.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7447f1ca1b7b563588a205fe93dea8df60fd981423a768bc1c0ded35ed147d0c"
+dependencies = [
+ "kqueue-sys",
+ "libc",
+]
+
+[[package]]
+name = "kqueue-sys"
= "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed9625ffda8729b85e45cf04090035ac368927b8cebc34898e7c120f52e4838b" +dependencies = [ + "bitflags 1.3.2", + "libc", +] + [[package]] name = "lazy_static" version = "1.5.0" @@ -3158,8 +3222,9 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c0ff37bd590ca25063e35af745c343cb7a0271906fb7b37e4813e8f79f00268d" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.9.0", "libc", + "redox_syscall 0.5.2", ] [[package]] @@ -3322,6 +3387,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd" dependencies = [ "libc", + "log", "wasi 0.11.0+wasi-snapshot-preview1", "windows-sys 0.52.0", ] @@ -3393,6 +3459,31 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c96aba5aa877601bb3f6dd6a63a969e1f82e60646e81e71b14496995e9853c91" +[[package]] +name = "notify" +version = "8.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2fee8403b3d66ac7b26aee6e40a897d85dc5ce26f44da36b8b73e987cc52e943" +dependencies = [ + "bitflags 2.9.0", + "filetime", + "fsevent-sys", + "inotify", + "kqueue", + "libc", + "log", + "mio", + "notify-types", + "walkdir", + "windows-sys 0.59.0", +] + +[[package]] +name = "notify-types" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e0826a989adedc2a244799e823aece04662b66609d96af8dff7ac6df9a8925d" + [[package]] name = "num-bigint" version = "0.2.6" @@ -3518,7 +3609,7 @@ version = "0.10.72" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fedfea7d58a1f73118430a55da6a286e7b044961736ce96a16a17068ea25e5da" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.9.0", "cfg-if 1.0.0", "foreign-types", "libc", @@ -3705,6 +3796,17 @@ dependencies = [ "serde", ] +[[package]] +name = "pgtemp" +version = "0.6.0" +source = "git+https://github.com/incrypto32/pgtemp?branch=initdb-args#929a9f96eab841d880c2ebf280e00054ca55ec0e" +dependencies = [ + "libc", + "tempfile", + "tokio", + "url", +] + [[package]] name = "phf" version = "0.11.2" @@ -4213,7 +4315,7 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c82cf8cff14456045f55ec4241383baeff27af886adb72ffb2162f99911de0fd" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.9.0", ] [[package]] @@ -4385,7 +4487,7 @@ version = "0.38.34" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "70dc5ec042f7a43c4a73241207cecc9873a06d45debb38b329f8541d85c2730f" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.9.0", "errno", "libc", "linux-raw-sys", @@ -4528,7 +4630,7 @@ version = "2.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c627723fd09706bacdb5cf41499e95098555af3c3c29d014dc3c458ef6be11c0" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.9.0", "core-foundation 0.9.4", "core-foundation-sys", "libc", @@ -4541,7 +4643,7 @@ version = "3.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "271720403f46ca04f7ba6f55d438f8bd878d6b8ca0a1046e8228c4145bcbb316" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.9.0", "core-foundation 0.10.0", "core-foundation-sys", "libc", @@ -5156,7 +5258,7 @@ version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c879d448e9d986b661742763247d3693ed13609438cf3d006f51f5368a5ba6b" dependencies = [ - 
"bitflags 2.6.0", + "bitflags 2.9.0", "core-foundation 0.9.4", "system-configuration-sys", ] @@ -5700,7 +5802,7 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e9cd434a998747dd2c4276bc96ee2e0c7a2eadf3cae88e52be55a05fa9053f5" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.9.0", "bytes", "http 1.1.0", "http-body 1.0.0", @@ -6896,7 +6998,7 @@ version = "0.33.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3268f3d866458b787f390cf61f4bbb563b922d091359f9608842999eaee3943c" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.9.0", ] [[package]] diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index 063ca4aa010..9f066bb62f6 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -108,6 +108,8 @@ pub trait SubgraphStore: Send + Sync + 'static { node_id: &NodeId, ) -> Result<(), StoreError>; + fn unassign_subgraph(&self, deployment: &DeploymentLocator) -> Result<(), StoreError>; + fn pause_subgraph(&self, deployment: &DeploymentLocator) -> Result<(), StoreError>; fn resume_subgraph(&self, deployment: &DeploymentLocator) -> Result<(), StoreError>; diff --git a/node/Cargo.toml b/node/Cargo.toml index 4e2f4ddbbfb..cb3039c4a34 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -12,6 +12,10 @@ path = "src/main.rs" name = "graphman" path = "src/bin/manager.rs" +[[bin]] +name = "gnd" +path = "src/bin/dev.rs" + [dependencies] anyhow = { workspace = true } env_logger = "0.11.3" @@ -40,3 +44,6 @@ termcolor = "1.4.1" diesel = { workspace = true } prometheus = { version = "0.13.4", features = ["push"] } json-structural-diff = { version = "0.2", features = ["colorize"] } +pgtemp = { git = "https://github.com/incrypto32/pgtemp", branch = "initdb-args" } +globset = "0.4.16" +notify = "8.0.0" diff --git a/node/src/bin/dev.rs b/node/src/bin/dev.rs new file mode 100644 index 00000000000..1545c4bad4c --- /dev/null +++ b/node/src/bin/dev.rs @@ -0,0 +1,169 @@ +use std::{path::Path, sync::Arc}; + +use anyhow::{Context, Result}; +use clap::Parser; +use git_testament::{git_testament, render_testament}; +use graph::{ + components::link_resolver::FileLinkResolver, + env::EnvVars, + log::logger, + tokio::{self, sync::mpsc}, +}; +use graph_node::{ + dev::{helpers::DevModeContext, watcher::watch_subgraph_dir}, + launcher, + opt::Opt, +}; +use lazy_static::lazy_static; +use pgtemp::PgTempDBBuilder; + +git_testament!(TESTAMENT); +lazy_static! { + static ref RENDERED_TESTAMENT: String = render_testament!(TESTAMENT); +} + +#[derive(Clone, Debug, Parser)] +#[clap( + name = "gnd", + about = "Graph Node Dev", + author = "Graph Protocol, Inc.", + version = RENDERED_TESTAMENT.as_str() +)] +pub struct DevOpt { + #[clap( + long, + help = "Start a graph-node in dev mode watching a build directory for changes" + )] + pub watch: bool, + + #[clap( + long, + help = "The location of the subgraph manifest file.", + default_value = "./build/subgraph.yaml" + )] + pub manifest: String, + + #[clap( + long, + allow_negative_numbers = false, + value_name = "NETWORK_NAME:[CAPABILITIES]:URL", + env = "ETHEREUM_RPC", + help = "Ethereum network name (e.g. 
+    let db = PgTempDBBuilder::new()
+        .with_data_dir_prefix(build_dir.clone())
+        .with_initdb_param("-E", "UTF8")
+        .with_initdb_param("--locale", "C")
+        .start_async()
+        .await;
+
+    let (tx, rx) = mpsc::channel(1);
+    let opt = build_args(&dev_opt, &db.connection_uri(), &dev_opt.manifest)?;
+    let file_link_resolver = Arc::new(FileLinkResolver::with_base_dir(&build_dir));
+
+    let ctx = DevModeContext {
+        watch: dev_opt.watch,
+        file_link_resolver,
+        updates_rx: rx,
+    };
+
+    let subgraph = opt.subgraph.clone().unwrap();
+
+    // Set up logger
+    let logger = logger(opt.debug);
+
+    // Run graph node
+    graph::spawn(async move {
+        let _ = run_graph_node(opt, Some(ctx)).await;
+    });
+
+    if dev_opt.watch {
+        graph::spawn_blocking(async move {
+            watch_subgraph_dir(
+                &logger,
+                build_dir,
+                subgraph,
+                vec!["pgtemp-*".to_string()],
+                tx,
+            )
+            .await;
+        });
+    }
+
+    graph::futures03::future::pending::<()>().await;
+    Ok(())
+}
diff --git a/node/src/dev/helpers.rs b/node/src/dev/helpers.rs
new file mode 100644
index 00000000000..45f7af9b75e
--- /dev/null
+++ b/node/src/dev/helpers.rs
@@ -0,0 +1,126 @@
+use std::sync::Arc;
+
+use anyhow::Result;
+use graph::components::link_resolver::FileLinkResolver;
+use graph::prelude::{
+    BlockPtr, DeploymentHash, NodeId, SubgraphRegistrarError, SubgraphStore as SubgraphStoreTrait,
+};
+use graph::slog::{error, info, Logger};
+use graph::tokio::sync::mpsc::Receiver;
+use graph::{
+    components::store::DeploymentLocator,
+    prelude::{SubgraphName, SubgraphRegistrar},
+};
+use graph_store_postgres::SubgraphStore;
+
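+/// State handed from the `gnd` binary to the launcher when running in dev mode:
+/// the watch flag, the file-based link resolver for the build directory, and the
+/// channel on which the watcher announces subgraphs that need to be redeployed.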
+pub struct DevModeContext {
+    pub watch: bool,
+    pub file_link_resolver: Arc<FileLinkResolver>,
+    pub updates_rx: Receiver<(DeploymentHash, SubgraphName)>,
+}
+
+/// Cleanup a subgraph
+/// This is used to remove a subgraph before redeploying it when using the watch flag
+fn cleanup_dev_subgraph(
+    logger: &Logger,
+    subgraph_store: &SubgraphStore,
+    name: &SubgraphName,
+    locator: &DeploymentLocator,
+) -> Result<()> {
+    info!(logger, "Removing subgraph"; "name" => name.to_string(), "id" => locator.id.to_string(), "hash" => locator.hash.to_string());
+    subgraph_store.remove_subgraph(name.clone())?;
+    subgraph_store.unassign_subgraph(locator)?;
+    subgraph_store.remove_deployment(locator.id.into())?;
+    info!(logger, "Subgraph removed"; "name" => name.to_string(), "id" => locator.id.to_string(), "hash" => locator.hash.to_string());
+    Ok(())
+}
+
+async fn deploy_subgraph(
+    logger: &Logger,
+    subgraph_registrar: Arc<dyn SubgraphRegistrar>,
+    name: SubgraphName,
+    subgraph_id: DeploymentHash,
+    node_id: NodeId,
+    debug_fork: Option<DeploymentHash>,
+    start_block: Option<BlockPtr>,
+) -> Result<DeploymentLocator, SubgraphRegistrarError> {
+    info!(logger, "Re-deploying subgraph"; "name" => name.to_string(), "id" => subgraph_id.to_string());
+    subgraph_registrar.create_subgraph(name.clone()).await?;
+    subgraph_registrar
+        .create_subgraph_version(
+            name.clone(),
+            subgraph_id.clone(),
+            node_id,
+            debug_fork,
+            start_block,
+            None,
+            None,
+        )
+        .await
+        .and_then(|locator| {
+            info!(logger, "Subgraph deployed"; "name" => name.to_string(), "id" => subgraph_id.to_string(), "locator" => locator.to_string());
+            Ok(locator)
+        })
+}
+
+pub async fn drop_and_recreate_subgraph(
+    logger: &Logger,
+    subgraph_store: Arc<SubgraphStore>,
+    subgraph_registrar: Arc<dyn SubgraphRegistrar>,
+    name: SubgraphName,
+    subgraph_id: DeploymentHash,
+    node_id: NodeId,
+    hash: DeploymentHash,
+) -> Result<DeploymentLocator> {
+    let locator = subgraph_store.active_locator(&hash)?;
+    if let Some(locator) = locator.clone() {
+        cleanup_dev_subgraph(logger, &subgraph_store, &name, &locator)?;
+    }
+
+    deploy_subgraph(
+        logger,
+        subgraph_registrar,
+        name,
+        subgraph_id,
+        node_id,
+        None,
+        None,
+    )
+    .await
+    .map_err(|e| anyhow::anyhow!("Failed to deploy subgraph: {}", e))
+}
+
+/// Watch for subgraph updates, drop and recreate them
+/// This is used to listen to file changes in the subgraph directory
+/// And drop and recreate the subgraph when it changes
+pub async fn watch_subgraph_updates(
+    logger: &Logger,
+    subgraph_store: Arc<SubgraphStore>,
+    subgraph_registrar: Arc<dyn SubgraphRegistrar>,
+    node_id: NodeId,
+    mut rx: Receiver<(DeploymentHash, SubgraphName)>,
+) {
+    while let Some((hash, name)) = rx.recv().await {
+        let res = drop_and_recreate_subgraph(
+            logger,
+            subgraph_store.clone(),
+            subgraph_registrar.clone(),
+            name.clone(),
+            hash.clone(),
+            node_id.clone(),
+            hash.clone(),
+        )
+        .await;
+
+        if let Err(e) = res {
+            error!(logger, "Failed to drop and recreate subgraph";
+                "name" => name.to_string(),
+                "hash" => hash.to_string(),
+                "error" => e.to_string()
+            );
+        }
+    }
+
+    error!(logger, "Subgraph watcher terminated unexpectedly"; "action" => "exiting");
+    std::process::exit(1);
+}
diff --git a/node/src/dev/mod.rs b/node/src/dev/mod.rs
new file mode 100644
index 00000000000..20d269524b3
--- /dev/null
+++ b/node/src/dev/mod.rs
@@ -0,0 +1,2 @@
+pub mod helpers;
+pub mod watcher;
diff --git a/node/src/dev/watcher.rs b/node/src/dev/watcher.rs
new file mode 100644
index 00000000000..53fbd729bcd
--- /dev/null
+++ b/node/src/dev/watcher.rs
@@ -0,0 +1,136 @@
+use globset::{Glob, GlobSet, GlobSetBuilder};
+use graph::prelude::{DeploymentHash, SubgraphName};
+use graph::slog::{error, info, Logger};
+use graph::tokio::sync::mpsc::Sender;
+use notify::{recommended_watcher, Event, RecursiveMode, Watcher};
+use std::path::{Path, PathBuf};
+use std::sync::mpsc;
+use std::time::Duration;
+
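+/// Debounce window for filesystem events: after the first relevant event, further
+/// events arriving within this window are drained so that a single `graph build`
+/// (which writes many files at once) triggers only one redeploy.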
+const WATCH_DELAY: Duration = Duration::from_secs(5);
+
+/// Sets up a watcher for the given directory with optional exclusions.
+/// Exclusions can include glob patterns like "pgtemp-*".
+pub async fn watch_subgraph_dir(
+    logger: &Logger,
+    dir: PathBuf,
+    id: String,
+    exclusions: Vec<String>,
+    sender: Sender<(DeploymentHash, SubgraphName)>,
+) {
+    info!(
+        logger,
+        "Watching for changes in directory: {}",
+        dir.display()
+    );
+    if !exclusions.is_empty() {
+        info!(logger, "Excluding patterns: {}", exclusions.join(", "));
+    }
+
+    // Create exclusion matcher
+    let exclusion_set = build_glob_set(&exclusions, logger);
+
+    // Create a channel to receive the events
+    let (tx, rx) = mpsc::channel();
+
+    // Create a watcher object
+    let mut watcher = match recommended_watcher(tx) {
+        Ok(w) => w,
+        Err(e) => {
+            error!(logger, "Error creating file watcher: {}", e);
+            return;
+        }
+    };
+
+    if let Err(e) = watcher.watch(&dir, RecursiveMode::Recursive) {
+        error!(logger, "Error watching directory {}: {}", dir.display(), e);
+        return;
+    }
+
+    let watch_dir = dir.clone();
+    let watch_exclusion_set = exclusion_set.clone();
+
+    loop {
+        let first_event = match rx.recv() {
+            Ok(Ok(e)) if should_process_event(&e, &watch_dir, &watch_exclusion_set) => Some(e),
+            Ok(_) => continue,
+            Err(_) => break,
+        };
+
+        if first_event.is_none() {
+            continue;
+        }
+
+        // Once we receive an event, wait for a short period of time to allow for multiple events to be received
+        // This is because running graph build writes multiple files at once
+        // Which triggers multiple events, we only need to react to it once
+        let start = std::time::Instant::now();
+        while start.elapsed() < WATCH_DELAY {
+            match rx.try_recv() {
+                // Discard all events until the time window has passed
+                Ok(_) => continue,
+                Err(_) => break,
+            }
+        }
+
+        let _ = sender
+            .send((
+                DeploymentHash::new(id.clone()).unwrap(),
+                SubgraphName::new("test").unwrap(),
+            ))
+            .await;
+    }
+}
+
+/// Build a GlobSet from the provided patterns
+fn build_glob_set(patterns: &[String], logger: &Logger) -> GlobSet {
+    let mut builder = GlobSetBuilder::new();
+
+    for pattern in patterns {
+        match Glob::new(pattern) {
+            Ok(glob) => {
+                builder.add(glob);
+            }
+            Err(e) => error!(logger, "Invalid glob pattern '{}': {}", pattern, e),
+        }
+    }
+
+    match builder.build() {
+        Ok(set) => set,
+        Err(e) => {
+            error!(logger, "Failed to build glob set: {}", e);
+            GlobSetBuilder::new().build().unwrap()
+        }
+    }
+}
+
+/// Determines if an event should be processed based on exclusion patterns
+fn should_process_event(event: &Event, base_dir: &Path, exclusion_set: &GlobSet) -> bool {
+    // If no exclusions, process all events
+    if exclusion_set.is_empty() {
+        return true;
+    }
+
+    // Check each path in the event
+    for path in event.paths.iter() {
+        // Get the relative path from the base directory
+        if let Ok(rel_path) = path.strip_prefix(base_dir) {
+            let path_str = rel_path.to_string_lossy();
+
+            // Check if path matches any exclusion pattern
+            if exclusion_set.is_match(path_str.as_ref()) {
+                return false;
+            }
+
+            // Also check against the file name for basename patterns
+            if let Some(file_name) = rel_path.file_name() {
+                let name_str = file_name.to_string_lossy();
+                if exclusion_set.is_match(name_str.as_ref()) {
+                    return false;
+                }
+            }
+        }
+    }
+
+    true
+}
diff --git a/node/src/launcher.rs b/node/src/launcher.rs
index 1167cc08e1a..d82a5d0fcbf 100644
--- a/node/src/launcher.rs
+++ b/node/src/launcher.rs
@@ -6,6 +6,7 @@ use graph::futures03::compat::Future01CompatExt;
 use graph::futures03::future::TryFutureExt;

 use crate::config::Config;
+use crate::dev::helpers::{watch_subgraph_updates, DevModeContext};
 use crate::network_setup::Networks;
 use crate::opt::Opt;
 use crate::store_builder::StoreBuilder;
@@ -267,7 +268,7 @@ fn build_subgraph_registrar(
     blockchain_map: Arc<BlockchainMap>,
     node_id: NodeId,
     subgraph_settings: Settings,
-    link_resolver: Arc<IpfsResolver>,
+    link_resolver: Arc<dyn LinkResolver>,
     subscription_manager: Arc<SubscriptionManager>,
     arweave_service: ArweaveService,
     ipfs_service: IpfsService,
@@ -347,8 +348,7 @@ fn build_graphql_server(
     graphql_server
 }

-pub async fn run(opt: Opt, env_vars: Arc<EnvVars>) {
-    env_logger::init();
+pub async fn run(opt: Opt, env_vars: Arc<EnvVars>, dev_ctx: Option<DevModeContext>) {
     // Set up logger
     let logger = logger(opt.debug);

@@ -436,7 +436,12 @@ {

     // Convert the clients into a link resolver. Since we want to get past
     // possible temporary DNS failures, make the resolver retry
-    let link_resolver = Arc::new(IpfsResolver::new(ipfs_client, env_vars.cheap_clone()));
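+    // In dev mode, subgraph files are resolved from the local build directory via the
+    // FileLinkResolver supplied by `gnd`; otherwise they are fetched through IPFS.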
+    let link_resolver: Arc<dyn LinkResolver> = if let Some(dev_ctx) = &dev_ctx {
+        dev_ctx.file_link_resolver.clone()
+    } else {
+        Arc::new(IpfsResolver::new(ipfs_client, env_vars.cheap_clone()))
+    };
+
     let metrics_server = PrometheusMetricsServer::new(&logger_factory, prometheus_registry.clone());
     let endpoint_metrics = Arc::new(EndpointMetrics::new(
@@ -553,7 +558,7 @@

     // Add the CLI subgraph with a REST request to the admin server.
     if let Some(subgraph) = subgraph {
-        deploy_subgraph_from_flag(subgraph, &opt, subgraph_registrar.clone(), node_id);
+        deploy_subgraph_from_flag(subgraph, &opt, subgraph_registrar.clone(), node_id.clone());
     }

     // Serve GraphQL queries over HTTP
@@ -568,6 +573,23 @@
             .await
             .expect("Failed to start metrics server")
         });
+
+        // If we are in dev mode, watch for subgraph updates
+        // And drop and recreate the subgraph when it changes
+        if let Some(dev_ctx) = dev_ctx {
+            if dev_ctx.watch {
+                graph::spawn(async move {
+                    watch_subgraph_updates(
+                        &logger,
+                        network_store.subgraph_store(),
+                        subgraph_registrar.clone(),
+                        node_id.clone(),
+                        dev_ctx.updates_rx,
+                    )
+                    .await;
+                });
+            }
+        }
     };

     graph::spawn(launch_services(logger.clone(), env_vars.cheap_clone()));
diff --git a/node/src/lib.rs b/node/src/lib.rs
index 7e15869d941..7b3d4347ee8 100644
--- a/node/src/lib.rs
+++ b/node/src/lib.rs
@@ -7,12 +7,12 @@ extern crate diesel;

 pub mod chain;
 pub mod config;
+pub mod dev;
 pub mod launcher;
 pub mod manager;
 pub mod network_setup;
 pub mod opt;
 pub mod store_builder;
-
 pub struct MetricsContext {
     pub prometheus: Arc<Registry>,
     pub registry: Arc<MetricsRegistry>,
diff --git a/node/src/main.rs b/node/src/main.rs
index 1c0cfeda3a4..807e577828b 100644
--- a/node/src/main.rs
+++ b/node/src/main.rs
@@ -23,8 +23,9 @@ fn main() {
 }

 async fn main_inner() {
+    env_logger::init();
     let env_vars = Arc::new(EnvVars::from_env().unwrap());
     let opt = opt::Opt::parse();

-    launcher::run(opt, env_vars).await;
+    launcher::run(opt, env_vars, None).await;
 }
diff --git a/store/postgres/src/subgraph_store.rs b/store/postgres/src/subgraph_store.rs
index d19cc68f44a..54bbafbc124 100644
--- a/store/postgres/src/subgraph_store.rs
+++ b/store/postgres/src/subgraph_store.rs
@@ -1415,6 +1415,16 @@ impl SubgraphStoreTrait for SubgraphStore {
         })
     }

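+    /// Remove the node assignment for this deployment and broadcast the resulting
+    /// assignment change as a store event. Dev mode uses this when tearing a
+    /// subgraph down before redeploying it.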
+    fn unassign_subgraph(&self, deployment: &DeploymentLocator) -> Result<(), StoreError> {
+        let site = self.find_site(deployment.id.into())?;
+        let mut pconn = self.primary_conn()?;
+        pconn.transaction(|conn| -> Result<_, StoreError> {
+            let mut pconn = primary::Connection::new(conn);
+            let changes = pconn.unassign_subgraph(site.as_ref())?;
+            pconn.send_store_event(&self.sender, &StoreEvent::new(changes))
+        })
+    }
+
     fn pause_subgraph(&self, deployment: &DeploymentLocator) -> Result<(), StoreError> {
         let site = self.find_site(deployment.id.into())?;
         let mut pconn = self.primary_conn()?;