-
Notifications
You must be signed in to change notification settings - Fork 37
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
3 changed files
with
101 additions
and
21 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,61 +1,139 @@ | ||
use std::marker::PhantomData; | ||
use std::sync::atomic::{AtomicUsize, Ordering}; | ||
use std::thread::JoinHandle; | ||
use std::time::Duration; | ||
|
||
use anyhow::Result; | ||
use lightning_interfaces::prelude::*; | ||
use lightning_interfaces::ShutdownController; | ||
use tokio::runtime::{Runtime, UnhandledPanic}; | ||
use tokio::task::JoinHandle; | ||
use tokio::time::timeout; | ||
|
||
/// A single [Node] instance that has ownership over its tokio runtime. | ||
pub struct ContainedNode<C: Collection> { | ||
/// The dependency injection data provider which can contain most of the items that make up | ||
/// a node. | ||
provider: fdi::Provider, | ||
provider: fdi::MultiThreadedProvider, | ||
|
||
/// A handle to the work thread. | ||
handle: JoinHandle<()>, | ||
/// The shutdown controller that has its waiter in the provider. | ||
shutdown: ShutdownController, | ||
|
||
/// A handle to the tokio runtime. | ||
runtime: Runtime, | ||
|
||
collection: PhantomData<C>, | ||
} | ||
|
||
impl<C: Collection> ContainedNode<C> { | ||
pub fn new(provider: fdi::Provider, index: usize) -> Self { | ||
pub fn new(provider: fdi::MultiThreadedProvider, name: Option<String>) -> Self { | ||
let worker_id = AtomicUsize::new(0); | ||
let runtime = tokio::runtime::Builder::new_multi_thread() | ||
.thread_name_fn(move || { | ||
let id = worker_id.fetch_add(1, Ordering::SeqCst); | ||
format!("NODE-{index}#{id}") | ||
format!("{}#{id}", name.as_deref().unwrap_or("LIGHTNING")) | ||
}) | ||
.enable_all() | ||
.unhandled_panic(UnhandledPanic::ShutdownRuntime) | ||
.build() | ||
.expect("Failed to build tokio runtime for node container."); | ||
|
||
let handle = std::thread::Builder::new() | ||
.name(format!("NODE-{index}#MAIN")) | ||
.spawn(move || { | ||
runtime.block_on(async move { | ||
// | ||
}); | ||
}) | ||
.expect("Failed to spawn E2E thread"); | ||
// Create and insert the shutdown controller to the provider. | ||
let trace_shutdown = std::env::var("TRACE_SHUTDOWN").is_ok(); | ||
let shutdown = ShutdownController::new(trace_shutdown); | ||
let waiter = shutdown.waiter(); | ||
provider.insert(waiter); | ||
|
||
// Will make the shutdown controller listen for ctrl+c. | ||
shutdown.install_ctrlc_handlers(); | ||
|
||
Self { | ||
provider, | ||
handle, | ||
runtime, | ||
shutdown, | ||
collection: PhantomData, | ||
} | ||
} | ||
|
||
/// Returns a reference to the data provider. | ||
pub fn provider(&self) -> &fdi::Provider { | ||
&self.provider | ||
/// Start the node and return a handle to the task that started the node. The task returns as | ||
/// soon as the node | ||
pub fn spawn(&self) -> JoinHandle<Result<()>> { | ||
let provider = self.provider.clone(); | ||
|
||
self.runtime.spawn_blocking(move || { | ||
let graph = C::build_graph(); | ||
let mut provider = provider.get_local_provider(); | ||
|
||
// Set tokio as the spawner of fdi async works. | ||
provider.get_mut::<fdi::Executor>().set_spawn_cb(|fut| { | ||
tokio::spawn(fut); | ||
}); | ||
|
||
// Init all of the components and dependencies. | ||
graph.init_all(&mut provider)?; | ||
|
||
// Send the start signal to the node. | ||
provider.trigger("start"); | ||
|
||
Ok(()) | ||
}) | ||
} | ||
|
||
pub fn shutdown(self) { | ||
self.handle.join().unwrap() | ||
pub async fn shutdown(mut self) { | ||
// Tell the controller it's time to go down. | ||
self.shutdown.trigger_shutdown(); | ||
|
||
// Give the runtime 30 seconds to stop. | ||
self.runtime.shutdown_timeout(Duration::from_secs(30)); | ||
|
||
for i in 0.. { | ||
if timeout(Duration::from_secs(3), self.shutdown.wait_for_completion()) | ||
.await | ||
.is_ok() | ||
{ | ||
// shutdown completed. | ||
return; | ||
} | ||
|
||
match i { | ||
0 | 1 => { | ||
// 3s, 6s | ||
tracing::trace!("Still shutting down..."); | ||
continue; | ||
}, | ||
2 => { | ||
// 9s | ||
tracing::warn!("Still shutting down..."); | ||
continue; | ||
}, | ||
_ => { | ||
// 12s | ||
tracing::error!("Shutdown taking too long..") | ||
}, | ||
} | ||
|
||
if i == 10 { | ||
// 33s | ||
break; | ||
} | ||
|
||
let Some(iter) = self.shutdown.pending_backtraces() else { | ||
continue; | ||
}; | ||
|
||
for (i, trace) in iter.enumerate() { | ||
eprintln!("Pending task backtrace #{i}:\n{trace:#?}"); | ||
} | ||
} | ||
} | ||
|
||
/// Returns a reference to the data provider. | ||
pub fn provider(&self) -> &fdi::MultiThreadedProvider { | ||
&self.provider | ||
} | ||
} | ||
|
||
impl<C: Collection> Default for ContainedNode<C> { | ||
fn default() -> Self { | ||
Self::new(fdi::Provider::default(), 0) | ||
Self::new(fdi::MultiThreadedProvider::default(), None) | ||
} | ||
} |