Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Update Substrate (#661)
Browse files Browse the repository at this point in the history
* Make compat with exit-future updates

* Update exit-future entirely

* Tidy

* Bump Substrate
  • Loading branch information
gavofyork authored Dec 4, 2019
1 parent 84ece42 commit a06b9de
Show file tree
Hide file tree
Showing 13 changed files with 186 additions and 166 deletions.
242 changes: 133 additions & 109 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion availability-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ log = "0.4.8"
futures01 = "0.1.17"
futures = { package = "futures", version = "0.3.1", features = ["compat"] }
tokio = "0.1.7"
exit-future = "0.1"
exit-future = "0.2.0"
codec = { package = "parity-scale-codec", version = "1.1.0", default-features = false, features = ["derive"] }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
consensus_common = { package = "sp-consensus", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
Expand Down
39 changes: 15 additions & 24 deletions availability-store/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,8 @@ use polkadot_primitives::parachain::{
CandidateReceipt, ParachainHost, ValidatorId,
ValidatorPair, AvailableMessages, BlockData, ErasureChunk,
};
use futures01::Future;
use futures::channel::{mpsc, oneshot};
use futures::{FutureExt, Sink, SinkExt, TryFutureExt, StreamExt};
use futures::{FutureExt, Sink, SinkExt, TryFutureExt, StreamExt, future::select};
use keystore::KeyStorePtr;

use tokio::runtime::current_thread::{Handle, Runtime as LocalRuntime};
Expand Down Expand Up @@ -166,7 +165,7 @@ impl WorkerHandle {
impl Drop for WorkerHandle {
fn drop(&mut self) {
if let Some(signal) = self.exit_signal.take() {
signal.fire();
let _ = signal.fire();
}

if let Some(thread) = self.thread.take() {
Expand Down Expand Up @@ -296,7 +295,7 @@ where
impl<PGM> Drop for Worker<PGM> {
fn drop(&mut self) {
for (_, signal) in self.registered_gossip_streams.drain() {
signal.fire();
let _ = signal.fire();
}
}
}
Expand Down Expand Up @@ -356,13 +355,10 @@ where
self.registered_gossip_streams.insert(topic, signal);

let _ = runtime_handle.spawn(
fut
.unit_error()
.boxed()
select(fut.boxed(), exit)
.map(|_| Ok(()))
.compat()
.select(exit)
.then(|_| Ok(()))
);
);

Ok(())
}
Expand Down Expand Up @@ -423,7 +419,7 @@ where
let topic = erasure_coding_topic(relay_parent, receipt.erasure_root, chunk.index);
// need to remove gossip listener and stop it.
if let Some(signal) = self.registered_gossip_streams.remove(&topic) {
signal.fire();
let _ = signal.fire();
}
}

Expand Down Expand Up @@ -594,15 +590,12 @@ where
};

runtime.spawn(
process_notification
.unit_error()
.boxed()
futures::future::select(process_notification.boxed(), exit.clone())
.map(|_| Ok(()))
.compat()
.select(exit.clone())
.then(|_| Ok(()))
);

if let Err(e) = runtime.block_on(exit) {
if let Err(e) = runtime.block_on(exit.unit_error().compat()) {
warn!(target: LOG_TARGET, "Availability worker error {:?}", e);
}

Expand Down Expand Up @@ -636,7 +629,7 @@ pub struct AvailabilityBlockImport<I, P> {
impl<I, P> Drop for AvailabilityBlockImport<I, P> {
fn drop(&mut self) {
if let Some(signal) = self.exit_signal.take() {
signal.fire();
let _ = signal.fire();
}
}
}
Expand Down Expand Up @@ -775,12 +768,10 @@ impl<I, P> AvailabilityBlockImport<I, P> {
// dependent on the types of client and executor, which would prove
// not not so handy in the testing code.
let mut exit_signal = Some(signal);
let prune_available = prune_unneeded_availability(client.clone(), to_worker.clone())
.unit_error()
.boxed()
.compat()
.select(exit.clone())
.then(|_| Ok(()));
let prune_available = select(
prune_unneeded_availability(client.clone(), to_worker.clone()).boxed(),
exit.clone()
).map(|_| Ok(())).compat();

if let Err(_) = thread_pool.execute(Box::new(prune_available)) {
error!(target: LOG_TARGET, "Failed to spawn availability pruning task");
Expand Down
25 changes: 12 additions & 13 deletions collator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use std::time::Duration;

use futures::{
future, Future, Stream, FutureExt, TryFutureExt, StreamExt,
compat::{Compat01As03, Future01CompatExt, Stream01CompatExt}
compat::{Future01CompatExt, Stream01CompatExt}
};
use futures01::{Future as _};
use log::{warn, error};
Expand Down Expand Up @@ -248,7 +248,7 @@ struct ApiContext<P, E> {
impl<P: 'static, E: 'static> RelayChainContext for ApiContext<P, E> where
P: ProvideRuntimeApi + Send + Sync,
P::Api: ParachainHost<Block>,
E: futures01::Future<Item=(),Error=()> + Clone + Send + Sync + 'static,
E: futures::Future<Output=()> + Clone + Send + Sync + 'static,
{
type Error = String;
type FutureEgress = Box<dyn Future<Output=Result<ConsolidatedIngress, String>> + Unpin + Send>;
Expand Down Expand Up @@ -277,19 +277,19 @@ struct CollationNode<P, E> {
}

impl<P, E> IntoExit for CollationNode<P, E> where
E: futures01::Future<Item=(),Error=()> + Unpin + Send + 'static
E: futures::Future<Output=()> + Unpin + Send + 'static
{
type Exit = future::Map<Compat01As03<E>, fn (Result<(), ()>) -> ()>;
type Exit = E;
fn into_exit(self) -> Self::Exit {
self.exit.compat().map(drop)
self.exit
}
}

impl<P, E> Worker for CollationNode<P, E> where
P: BuildParachainContext + Send + 'static,
P::ParachainContext: Send + 'static,
<P::ParachainContext as ParachainContext>::ProduceCandidate: Send + 'static,
E: futures01::Future<Item=(),Error=()> + Clone + Unpin + Send + Sync + 'static,
E: futures::Future<Output=()> + Clone + Unpin + Send + Sync + 'static,
{
type Work = Box<dyn Future<Output=()> + Unpin + Send>;

Expand Down Expand Up @@ -433,7 +433,8 @@ impl<P, E> Worker for CollationNode<P, E> where
outgoing,
);

tokio::spawn(res.select(inner_exit_2.clone()).then(|_| Ok(())));
let exit = inner_exit_2.clone().unit_error().compat();
tokio::spawn(res.select(exit).then(|_| Ok(())));
})
});

Expand All @@ -454,17 +455,15 @@ impl<P, E> Worker for CollationNode<P, E> where

let future = future::select(
silenced,
inner_exit.clone().map(|_| Ok::<_, ()>(())).compat()
inner_exit.clone()
).map(|_| Ok::<_, ()>(())).compat();

tokio::spawn(future);
future::ready(())
});

let work_and_exit = future::select(
work,
exit.map(|_| Ok::<_, ()>(())).compat()
).map(|_| ());
let work_and_exit = future::select(work, exit)
.map(|_| ());

Box::new(work_and_exit)
}
Expand Down Expand Up @@ -495,7 +494,7 @@ pub fn run_collator<P, E>(
P: BuildParachainContext + Send + 'static,
P::ParachainContext: Send + 'static,
<P::ParachainContext as ParachainContext>::ProduceCandidate: Send + 'static,
E: futures01::Future<Item = (),Error=()> + Unpin + Send + Clone + Sync + 'static,
E: futures::Future<Output = ()> + Unpin + Send + Clone + Sync + 'static,
{
let node_logic = CollationNode { build_parachain_context, exit, para_id, key };
polkadot_cli::run(node_logic, version)
Expand Down
2 changes: 1 addition & 1 deletion network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "polkad
futures = "0.1"
futures03 = { package = "futures", version = "0.3.1", features = ["compat"] }
log = "0.4.8"
exit-future = "0.1.4"
exit-future = "0.2.0"
sc-client = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }

Expand Down
7 changes: 4 additions & 3 deletions network/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static, E, N, T> Router<P, E, N, T> w
P::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
N: NetworkService,
T: Clone + Executor + Send + 'static,
E: Future<Item=(),Error=()> + Clone + Send + 'static,
E: futures03::Future<Output=()> + Clone + Send + Unpin + 'static,
{
/// Import a statement whose signature has been checked already.
pub(crate) fn import_statement(&self, statement: SignedStatement) {
Expand Down Expand Up @@ -174,7 +174,8 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static, E, N, T> Router<P, E, N, T> w

if let Some(work) = producer.map(|p| self.create_work(c_hash, p)) {
trace!(target: "validation", "driving statement work to completion");
let work = work.select2(self.fetcher.exit().clone()).then(|_| Ok(()));
let exit = self.fetcher.exit().clone().unit_error().compat();
let work = work.select2(exit).then(|_| Ok(()));
self.fetcher.executor().spawn(work);
}
}
Expand Down Expand Up @@ -224,7 +225,7 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> TableRouter for Router<P, E, N, T> wh
P::Api: ParachainHost<Block>,
N: NetworkService,
T: Clone + Executor + Send + 'static,
E: Future<Item=(),Error=()> + Clone + Send + 'static,
E: futures03::Future<Output=()> + Clone + Send + Unpin + 'static,
{
type Error = io::Error;
type FetchValidationProof = validation::PoVReceiver;
Expand Down
15 changes: 10 additions & 5 deletions network/src/validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use polkadot_primitives::parachain::{
use futures::prelude::*;
use futures::future::{self, Executor as FutureExecutor};
use futures::sync::oneshot::{self, Receiver};
use futures03::{FutureExt as _, TryFutureExt as _};

use std::collections::hash_map::{HashMap, Entry};
use std::io;
Expand Down Expand Up @@ -123,7 +124,7 @@ impl<P, E: Clone, N, T: Clone> Clone for ValidationNetwork<P, E, N, T> {
impl<P, E, N, T> ValidationNetwork<P, E, N, T> where
P: ProvideRuntimeApi + Send + Sync + 'static,
P::Api: ParachainHost<Block>,
E: Clone + Future<Item=(),Error=()> + Send + Sync + 'static,
E: Clone + futures03::Future<Output=()> + Send + Sync + 'static,
N: NetworkService,
T: Clone + Executor + Send + Sync + 'static,
{
Expand Down Expand Up @@ -206,7 +207,7 @@ impl<P, E, N, T> ValidationNetwork<P, E, N, T> where N: NetworkService {
impl<P, E, N, T> ParachainNetwork for ValidationNetwork<P, E, N, T> where
P: ProvideRuntimeApi + Send + Sync + 'static,
P::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
E: Clone + Future<Item=(),Error=()> + Send + Sync + 'static,
E: Clone + futures03::Future<Output=()> + Send + Sync + Unpin + 'static,
N: NetworkService,
T: Clone + Executor + Send + Sync + 'static,
{
Expand Down Expand Up @@ -242,8 +243,12 @@ impl<P, E, N, T> ParachainNetwork for ValidationNetwork<P, E, N, T> where

let table_router_clone = table_router.clone();
let work = table_router.checked_statements()
.for_each(move |msg| { table_router_clone.import_statement(msg); Ok(()) });
executor.spawn(work.select(exit.clone()).map(|_| ()).map_err(|_| ()));
.for_each(move |msg| { table_router_clone.import_statement(msg); Ok(()) })
.select(exit.clone().unit_error().compat())
.map(|_| ())
.map_err(|_| ());

executor.spawn(work);

table_router
});
Expand Down Expand Up @@ -670,7 +675,7 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> LeafWorkDataFetcher<P, E, N, T> where
P::Api: ParachainHost<Block>,
N: NetworkService,
T: Clone + Executor + Send + 'static,
E: Future<Item=(),Error=()> + Clone + Send + 'static,
E: futures03::Future<Output=()> + Clone + Send + 'static,
{
/// Fetch PoV block for the given candidate receipt.
pub fn fetch_pov_block(&self, candidate: &CandidateReceipt) -> PoVReceiver {
Expand Down
2 changes: 1 addition & 1 deletion service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ sc-executor = { git = "https://github.com/paritytech/substrate", branch = "polka
sc-network = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
consensus_common = { package = "sp-consensus", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
grandpa = { package = "sc-finality-grandpa", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
grandpa_primitives = { package = "sp-finality-granpda", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
grandpa_primitives = { package = "sp-finality-grandpa", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
inherents = { package = "sp-inherents", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
service = { package = "sc-service", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
telemetry = { package = "sc-telemetry", git = "https://github.com/paritytech/substrate", branch = "polkadot-master" }
Expand Down
2 changes: 1 addition & 1 deletion test-parachains/adder/collator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ client-api = { package = "sc-client-api", git = "https://github.com/paritytech/s
parking_lot = "0.9.0"
ctrlc = { version = "3.1.3", features = ["termination"] }
futures = "0.3.1"
exit-future = "0.1.4"
exit-future = "0.2.0"
2 changes: 1 addition & 1 deletion test-parachains/adder/collator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ fn main() {
let exit_send_cell = RefCell::new(Some(exit_send));
ctrlc::set_handler(move || {
if let Some(exit_send) = exit_send_cell.try_borrow_mut().expect("signal handler not reentrant; qed").take() {
exit_send.fire();
let _ = exit_send.fire();
}
}).expect("Error setting up ctrl-c handler");

Expand Down
2 changes: 1 addition & 1 deletion validation/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ parking_lot = "0.9.0"
tokio = "0.1.22"
derive_more = "0.14.1"
log = "0.4.8"
exit-future = "0.1.4"
exit-future = "0.2.0"
tokio-executor = { version = "0.2.0-alpha.6", features = ["blocking"] }
codec = { package = "parity-scale-codec", version = "1.1.0", default-features = false, features = ["derive"] }
availability_store = { package = "polkadot-availability-store", path = "../availability-store" }
Expand Down
10 changes: 5 additions & 5 deletions validation/src/attestation_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use sp_blockchain::HeaderBackend;
use block_builder::BlockBuilderApi;
use consensus::SelectChain;
use futures::prelude::*;
use futures03::{TryStreamExt as _, StreamExt as _};
use futures03::{TryStreamExt as _, StreamExt as _, FutureExt as _, TryFutureExt as _};
use log::error;
use polkadot_primitives::Block;
use polkadot_primitives::parachain::ParachainHost;
Expand Down Expand Up @@ -107,7 +107,7 @@ pub(crate) fn start<C, N, P, SC>(
}
Ok(())
})
.select(exit.clone())
.select(exit.clone().unit_error().compat())
.then(|_| Ok(()))
};

Expand All @@ -130,7 +130,7 @@ pub(crate) fn start<C, N, P, SC>(
}
})
.map_err(|e| warn!("Timer error {:?}", e))
.select(exit.clone())
.select(exit.clone().unit_error().compat())
.then(|_| Ok(()))
};

Expand All @@ -139,7 +139,7 @@ pub(crate) fn start<C, N, P, SC>(
error!("Failed to spawn old sessions pruning task");
}

if let Err(e) = runtime.block_on(exit) {
if let Err(e) = runtime.block_on(exit.unit_error().compat()) {
debug!("BFT event loop error {:?}", e);
}
});
Expand All @@ -153,7 +153,7 @@ pub(crate) fn start<C, N, P, SC>(
impl Drop for ServiceHandle {
fn drop(&mut self) {
if let Some(signal) = self.exit_signal.take() {
signal.fire();
let _ = signal.fire();
}

if let Some(thread) = self.thread.take() {
Expand Down
2 changes: 1 addition & 1 deletion validation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ impl<C, N, P> ParachainValidation<C, N, P> where
})
.and_then(with_router)
.then(|_| Ok(()))
.select(exit)
.select(exit.unit_error().compat())
.then(|_| Ok(()));

// spawn onto thread pool.
Expand Down

0 comments on commit a06b9de

Please sign in to comment.