Skip to content

Commit

Permalink
Merge pull request #2973 from autonomys/custom-spawn_tasks
Browse files Browse the repository at this point in the history
Custom `spawn_tasks`
  • Loading branch information
nazar-pc authored Aug 15, 2024
2 parents 92a51c1 + ff568cc commit 72575a0
Show file tree
Hide file tree
Showing 8 changed files with 177 additions and 7 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/subspace-node/src/commands/run/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,7 @@ pub(super) fn create_consensus_chain_configuration(
state_pruning: pruning_params.state_pruning(),
blocks_pruning: pruning_params.blocks_pruning(),
rpc_options: SubstrateRpcConfiguration {
listen_on: rpc_options.rpc_listen_on,
listen_on: Some(rpc_options.rpc_listen_on),
max_connections: rpc_options.rpc_max_connections,
cors: rpc_cors.into(),
methods: match rpc_options.rpc_methods {
Expand Down
2 changes: 1 addition & 1 deletion crates/subspace-node/src/commands/run/domain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ pub(super) fn create_domain_configuration(
state_pruning: pruning_params.state_pruning()?,
blocks_pruning: pruning_params.blocks_pruning()?,
rpc_options: SubstrateRpcConfiguration {
listen_on: rpc_options.rpc_listen_on,
listen_on: Some(rpc_options.rpc_listen_on),
max_connections: rpc_options.rpc_max_connections,
cors: rpc_cors.into(),
methods: match rpc_options.rpc_methods {
Expand Down
1 change: 1 addition & 0 deletions crates/subspace-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ sc-rpc = { git = "https://github.com/subspace/polkadot-sdk", rev = "5626154d0781
sc-rpc-api = { git = "https://github.com/subspace/polkadot-sdk", rev = "5626154d0781ac9a6ffd5a6207ed237f425ae631" }
sc-rpc-spec-v2 = { git = "https://github.com/subspace/polkadot-sdk", rev = "5626154d0781ac9a6ffd5a6207ed237f425ae631" }
sc-service = { git = "https://github.com/subspace/polkadot-sdk", rev = "5626154d0781ac9a6ffd5a6207ed237f425ae631", default-features = false }
sc-sysinfo = { git = "https://github.com/subspace/polkadot-sdk", rev = "5626154d0781ac9a6ffd5a6207ed237f425ae631", default-features = false }
sc-subspace-block-relay = { version = "0.1.0", path = "../sc-subspace-block-relay" }
sc-telemetry = { git = "https://github.com/subspace/polkadot-sdk", rev = "5626154d0781ac9a6ffd5a6207ed237f425ae631" }
sc-tracing = { git = "https://github.com/subspace/polkadot-sdk", rev = "5626154d0781ac9a6ffd5a6207ed237f425ae631" }
Expand Down
4 changes: 2 additions & 2 deletions crates/subspace-service/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub const RPC_DEFAULT_MAX_RESPONSE_SIZE_MB: u32 = 15;
#[derive(Debug)]
pub struct SubstrateRpcConfiguration {
/// IP and port (TCP) on which to listen for RPC requests
pub listen_on: SocketAddr,
pub listen_on: Option<SocketAddr>,
/// Maximum number of connections for JSON-RPC server
pub max_connections: u32,
/// CORS settings for HTTP & WS servers. `None` if all origins are allowed
Expand Down Expand Up @@ -181,7 +181,7 @@ impl From<SubstrateConfiguration> for Configuration {
blocks_pruning: configuration.blocks_pruning,
wasm_method: Default::default(),
wasm_runtime_overrides: None,
rpc_addr: Some(configuration.rpc_options.listen_on),
rpc_addr: configuration.rpc_options.listen_on,
rpc_methods: configuration.rpc_options.methods,
rpc_max_connections: configuration.rpc_options.max_connections,
rpc_cors: configuration.rpc_options.cors,
Expand Down
3 changes: 2 additions & 1 deletion crates/subspace-service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ mod metrics;
pub(crate) mod mmr;
pub mod rpc;
pub mod sync_from_dsn;
mod task_spawner;
pub mod transaction_pool;

use crate::config::{ChainSyncMode, SubspaceConfiguration, SubspaceNetworking};
Expand Down Expand Up @@ -1180,7 +1181,7 @@ where
// We replace the Substrate implementation of metrics server with our own.
config.base.prometheus_config.take();

let rpc_handlers = sc_service::spawn_tasks(SpawnTasksParams {
let rpc_handlers = task_spawner::spawn_tasks(SpawnTasksParams {
network: network_service.clone(),
client: client.clone(),
keystore: keystore_container.keystore(),
Expand Down
167 changes: 167 additions & 0 deletions crates/subspace-service/src/task_spawner.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use sc_client_api::{
BlockBackend, BlockchainEvents, ExecutorProvider, ProofProvider, StorageProvider, UsageProvider,
};
use sc_rpc_api::DenyUnsafe;
use sc_service::{
gen_rpc_module, init_telemetry, propagate_transaction_notifications, start_rpc_servers, Error,
MetricsService, RpcHandlers, SpawnTasksParams,
};
use sc_transaction_pool_api::MaintainedTransactionPool;
use sp_api::{CallApiAt, ProvideRuntimeApi};
use sp_blockchain::{HeaderBackend, HeaderMetadata};
use sp_consensus::block_validation::Chain;
use sp_runtime::traits::{Block as BlockT, BlockIdTo};
use std::sync::Arc;
use tracing::info;

/// Spawn the tasks that are required to run a node.
pub(super) fn spawn_tasks<TBl, TBackend, TExPool, TRpc, TCl>(
params: SpawnTasksParams<TBl, TCl, TExPool, TRpc, TBackend>,
) -> Result<RpcHandlers, Error>
where
TCl: ProvideRuntimeApi<TBl>
+ HeaderMetadata<TBl, Error = sp_blockchain::Error>
+ Chain<TBl>
+ BlockBackend<TBl>
+ BlockIdTo<TBl, Error = sp_blockchain::Error>
+ ProofProvider<TBl>
+ HeaderBackend<TBl>
+ BlockchainEvents<TBl>
+ ExecutorProvider<TBl>
+ UsageProvider<TBl>
+ StorageProvider<TBl, TBackend>
+ CallApiAt<TBl>
+ Send
+ 'static,
<TCl as ProvideRuntimeApi<TBl>>::Api: sp_api::Metadata<TBl>
+ sp_transaction_pool::runtime_api::TaggedTransactionQueue<TBl>
+ sp_session::SessionKeys<TBl>
+ sp_api::ApiExt<TBl>,
TBl: BlockT,
TBl::Hash: Unpin,
TBl::Header: Unpin,
TBackend: 'static + sc_client_api::backend::Backend<TBl> + Send,
TExPool: MaintainedTransactionPool<Block = TBl, Hash = <TBl as BlockT>::Hash> + 'static,
{
let SpawnTasksParams {
// TODO: Stop using `Configuration` once
// https://github.com/paritytech/polkadot-sdk/pull/5364 is in our fork
mut config,
task_manager,
client,
backend,
keystore,
transaction_pool,
rpc_builder,
network,
system_rpc_tx,
tx_handler_controller,
sync_service,
telemetry,
} = params;

let chain_info = client.usage_info().chain;

let sysinfo = sc_sysinfo::gather_sysinfo();
sc_sysinfo::print_sysinfo(&sysinfo);

let telemetry = telemetry
.map(|telemetry| {
init_telemetry(
&mut config,
network.clone(),
client.clone(),
telemetry,
Some(sysinfo),
)
})
.transpose()?;

info!("📦 Highest known block at #{}", chain_info.best_number);

let spawn_handle = task_manager.spawn_handle();

// Inform the tx pool about imported and finalized blocks.
spawn_handle.spawn(
"txpool-notifications",
Some("transaction-pool"),
sc_transaction_pool::notification_future(client.clone(), transaction_pool.clone()),
);

spawn_handle.spawn(
"on-transaction-imported",
Some("transaction-pool"),
propagate_transaction_notifications(
transaction_pool.clone(),
tx_handler_controller,
telemetry.clone(),
),
);

// Periodically updated metrics and telemetry updates.
spawn_handle.spawn(
"telemetry-periodic-send",
None,
MetricsService::new(telemetry).run(
client.clone(),
transaction_pool.clone(),
network.clone(),
sync_service.clone(),
),
);

let rpc_id_provider = config.rpc_id_provider.take();

// jsonrpsee RPC
let gen_rpc_module = |deny_unsafe: DenyUnsafe| {
gen_rpc_module(
deny_unsafe,
task_manager.spawn_handle(),
client.clone(),
transaction_pool.clone(),
keystore.clone(),
system_rpc_tx.clone(),
&config,
backend.clone(),
&*rpc_builder,
)
};

let rpc = config
.rpc_addr
.map(|_| start_rpc_servers(&config, gen_rpc_module, rpc_id_provider))
.transpose()?;
let rpc_handlers = RpcHandlers::new(Arc::new(gen_rpc_module(DenyUnsafe::No)?));

// Spawn informant task
spawn_handle.spawn(
"informant",
None,
sc_informant::build(
client.clone(),
network,
sync_service.clone(),
config.informant_output_format,
),
);

task_manager.keep_alive((config.base_path, rpc));

Ok(rpc_handlers)
}
4 changes: 2 additions & 2 deletions domains/service/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub const RPC_DEFAULT_MAX_RESPONSE_SIZE_MB: u32 = 15;
#[derive(Debug)]
pub struct SubstrateRpcConfiguration {
/// IP and port (TCP) on which to listen for RPC requests
pub listen_on: SocketAddr,
pub listen_on: Option<SocketAddr>,
/// Maximum number of connections for JSON-RPC server
pub max_connections: u32,
/// CORS settings for HTTP & WS servers. `None` if all origins are allowed
Expand Down Expand Up @@ -173,7 +173,7 @@ impl From<SubstrateConfiguration> for Configuration {
blocks_pruning: configuration.blocks_pruning,
wasm_method: Default::default(),
wasm_runtime_overrides: None,
rpc_addr: Some(configuration.rpc_options.listen_on),
rpc_addr: configuration.rpc_options.listen_on,
rpc_methods: configuration.rpc_options.methods,
rpc_max_connections: configuration.rpc_options.max_connections,
rpc_cors: configuration.rpc_options.cors,
Expand Down

0 comments on commit 72575a0

Please sign in to comment.