Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
make it easier to dbg stalls (#3351)
Browse files Browse the repository at this point in the history
* make it easier to dbg

* revert channel sizes

* BAnon
  • Loading branch information
ordian authored and s3krit committed Jul 6, 2021
1 parent c4cff28 commit 60ff09e
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 68 deletions.
9 changes: 6 additions & 3 deletions node/core/parachains-inherent/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,12 @@ impl ParachainsInherentDataProvider {
receiver.await.map_err(|_| Error::ClosedChannelAwaitingActivation)?.map_err(Error::Subsystem)?;

let (sender, receiver) = futures::channel::oneshot::channel();
overseer.send_msg(AllMessages::Provisioner(
ProvisionerMessage::RequestInherentData(parent, sender),
)).await;
overseer.send_msg(
AllMessages::Provisioner(
ProvisionerMessage::RequestInherentData(parent, sender),
),
std::any::type_name::<Self>(),
).await;

receiver.await.map_err(|_| Error::ClosedChannelAwaitingInherentData)
};
Expand Down
94 changes: 63 additions & 31 deletions node/overseer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,10 @@ impl From<FinalityNotification<Block>> for BlockInfo {
enum Event {
BlockImported(BlockInfo),
BlockFinalized(BlockInfo),
MsgToSubsystem(AllMessages),
MsgToSubsystem {
msg: AllMessages,
origin: &'static str,
},
ExternalRequest(ExternalRequest),
Stop,
}
Expand Down Expand Up @@ -452,8 +455,16 @@ impl OverseerHandler {
}

/// Send some message to one of the `Subsystem`s.
pub async fn send_msg(&mut self, msg: impl Into<AllMessages>) {
self.send_and_log_error(Event::MsgToSubsystem(msg.into())).await
pub async fn send_msg(&mut self, msg: impl Into<AllMessages>, origin: &'static str) {
self.send_and_log_error(Event::MsgToSubsystem {
msg: msg.into(),
origin,
}).await
}

/// Same as `send_msg`, but with no origin. Used for tests.
pub async fn send_msg_anon(&mut self, msg: impl Into<AllMessages>) {
self.send_msg(msg, "").await
}

/// Inform the `Overseer` that some block was finalized.
Expand Down Expand Up @@ -801,7 +812,8 @@ pub struct OverseerSubsystemSender {
#[async_trait::async_trait]
impl SubsystemSender for OverseerSubsystemSender {
async fn send_message(&mut self, msg: AllMessages) {
self.channels.send_and_log_error(self.signals_received.load(), msg).await;
let needed_signals = self.signals_received.load();
self.channels.send_and_log_error(needed_signals, msg).await;
}

async fn send_messages<T>(&mut self, msgs: T)
Expand Down Expand Up @@ -891,12 +903,18 @@ impl<M: Send + 'static> SubsystemContext for OverseerSubsystemContext<M> {
loop {
// If we have a message pending an overseer signal, we only poll for signals
// in the meantime.
let signals_received = self.signals_received.load();
if let Some((needs_signals_received, msg)) = self.pending_incoming.take() {
if needs_signals_received <= self.signals_received.load() {
if needs_signals_received <= signals_received {
return Ok(FromOverseer::Communication { msg });
} else {
self.pending_incoming = Some((needs_signals_received, msg));

tracing::debug!(
target: LOG_TARGET,
subsystem = std::any::type_name::<M>(),
diff = needs_signals_received - signals_received,
"waiting for a signal",
);
// wait for next signal.
let signal = self.signals.next().await
.ok_or(SubsystemError::Context(
Expand All @@ -911,7 +929,6 @@ impl<M: Send + 'static> SubsystemContext for OverseerSubsystemContext<M> {

let mut await_message = self.messages.next().fuse();
let mut await_signal = self.signals.next().fuse();
let signals_received = self.signals_received.load();
let pending_incoming = &mut self.pending_incoming;

// Otherwise, wait for the next signal or incoming message.
Expand Down Expand Up @@ -989,7 +1006,7 @@ impl<M> OverseenSubsystem<M> {
/// Send a message to the wrapped subsystem.
///
/// If the inner `instance` is `None`, nothing is happening.
async fn send_message(&mut self, msg: M) -> SubsystemResult<()> {
async fn send_message(&mut self, msg: M, origin: &'static str) -> SubsystemResult<()> {
const MESSAGE_TIMEOUT: Duration = Duration::from_secs(10);

if let Some(ref mut instance) = self.instance {
Expand All @@ -999,7 +1016,12 @@ impl<M> OverseenSubsystem<M> {
}).timeout(MESSAGE_TIMEOUT).await
{
None => {
tracing::error!(target: LOG_TARGET, "Subsystem {} appears unresponsive.", instance.name);
tracing::error!(
target: LOG_TARGET,
%origin,
"Subsystem {} appears unresponsive.",
instance.name,
);
Err(SubsystemError::SubsystemStalled(instance.name))
}
Some(res) => res.map_err(Into::into),
Expand All @@ -1016,9 +1038,15 @@ impl<M> OverseenSubsystem<M> {
const SIGNAL_TIMEOUT: Duration = Duration::from_secs(10);

if let Some(ref mut instance) = self.instance {
match instance.tx_signal.send(signal).timeout(SIGNAL_TIMEOUT).await {
match instance.tx_signal.send(signal.clone()).timeout(SIGNAL_TIMEOUT).await {
None => {
tracing::error!(target: LOG_TARGET, "Subsystem {} appears unresponsive.", instance.name);
tracing::error!(
target: LOG_TARGET,
?signal,
received = instance.signals_received,
"Subsystem {} appears unresponsive.",
instance.name,
);
Err(SubsystemError::SubsystemStalled(instance.name))
}
Some(res) => {
Expand Down Expand Up @@ -1909,8 +1937,8 @@ where
};

match msg {
Event::MsgToSubsystem(msg) => {
self.route_message(msg.into()).await?;
Event::MsgToSubsystem { msg, origin } => {
self.route_message(msg.into(), origin).await?;
}
Event::Stop => {
self.stop().await;
Expand Down Expand Up @@ -2045,59 +2073,63 @@ where
Ok(())
}

async fn route_message(&mut self, msg: AllMessages) -> SubsystemResult<()> {
async fn route_message(
&mut self,
msg: AllMessages,
origin: &'static str,
) -> SubsystemResult<()> {
self.metrics.on_message_relayed();
match msg {
AllMessages::CandidateValidation(msg) => {
self.subsystems.candidate_validation.send_message(msg).await?;
self.subsystems.candidate_validation.send_message(msg, origin).await?;
},
AllMessages::CandidateBacking(msg) => {
self.subsystems.candidate_backing.send_message(msg).await?;
self.subsystems.candidate_backing.send_message(msg, origin).await?;
},
AllMessages::StatementDistribution(msg) => {
self.subsystems.statement_distribution.send_message(msg).await?;
self.subsystems.statement_distribution.send_message(msg, origin).await?;
},
AllMessages::AvailabilityDistribution(msg) => {
self.subsystems.availability_distribution.send_message(msg).await?;
self.subsystems.availability_distribution.send_message(msg, origin).await?;
},
AllMessages::AvailabilityRecovery(msg) => {
self.subsystems.availability_recovery.send_message(msg).await?;
self.subsystems.availability_recovery.send_message(msg, origin).await?;
},
AllMessages::BitfieldDistribution(msg) => {
self.subsystems.bitfield_distribution.send_message(msg).await?;
self.subsystems.bitfield_distribution.send_message(msg, origin).await?;
},
AllMessages::BitfieldSigning(msg) => {
self.subsystems.bitfield_signing.send_message(msg).await?;
self.subsystems.bitfield_signing.send_message(msg, origin).await?;
},
AllMessages::Provisioner(msg) => {
self.subsystems.provisioner.send_message(msg).await?;
self.subsystems.provisioner.send_message(msg, origin).await?;
},
AllMessages::RuntimeApi(msg) => {
self.subsystems.runtime_api.send_message(msg).await?;
self.subsystems.runtime_api.send_message(msg, origin).await?;
},
AllMessages::AvailabilityStore(msg) => {
self.subsystems.availability_store.send_message(msg).await?;
self.subsystems.availability_store.send_message(msg, origin).await?;
},
AllMessages::NetworkBridge(msg) => {
self.subsystems.network_bridge.send_message(msg).await?;
self.subsystems.network_bridge.send_message(msg, origin).await?;
},
AllMessages::ChainApi(msg) => {
self.subsystems.chain_api.send_message(msg).await?;
self.subsystems.chain_api.send_message(msg, origin).await?;
},
AllMessages::CollationGeneration(msg) => {
self.subsystems.collation_generation.send_message(msg).await?;
self.subsystems.collation_generation.send_message(msg, origin).await?;
},
AllMessages::CollatorProtocol(msg) => {
self.subsystems.collator_protocol.send_message(msg).await?;
self.subsystems.collator_protocol.send_message(msg, origin).await?;
},
AllMessages::ApprovalDistribution(msg) => {
self.subsystems.approval_distribution.send_message(msg).await?;
self.subsystems.approval_distribution.send_message(msg, origin).await?;
},
AllMessages::ApprovalVoting(msg) => {
self.subsystems.approval_voting.send_message(msg).await?;
self.subsystems.approval_voting.send_message(msg, origin).await?;
},
AllMessages::GossipSupport(msg) => {
self.subsystems.gossip_support.send_message(msg).await?;
self.subsystems.gossip_support.send_message(msg, origin).await?;
},
AllMessages::DisputeCoordinator(_) => {}
AllMessages::DisputeParticipation(_) => {}
Expand Down
34 changes: 17 additions & 17 deletions node/overseer/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ fn overseer_metrics_work() {

handler.block_imported(second_block).await;
handler.block_imported(third_block).await;
handler.send_msg(AllMessages::CandidateValidation(test_candidate_validation_msg())).await;
handler.send_msg_anon(AllMessages::CandidateValidation(test_candidate_validation_msg())).await;
handler.stop().await;

select! {
Expand Down Expand Up @@ -984,22 +984,22 @@ fn overseer_all_subsystems_receive_signals_and_messages() {

// send a msg to each subsystem
// except for BitfieldSigning and GossipSupport as the messages are not instantiable
handler.send_msg(AllMessages::CandidateValidation(test_candidate_validation_msg())).await;
handler.send_msg(AllMessages::CandidateBacking(test_candidate_backing_msg())).await;
handler.send_msg(AllMessages::CollationGeneration(test_collator_generation_msg())).await;
handler.send_msg(AllMessages::CollatorProtocol(test_collator_protocol_msg())).await;
handler.send_msg(AllMessages::StatementDistribution(test_statement_distribution_msg())).await;
handler.send_msg(AllMessages::AvailabilityRecovery(test_availability_recovery_msg())).await;
// handler.send_msg(AllMessages::BitfieldSigning(test_bitfield_signing_msg())).await;
// handler.send_msg(AllMessages::GossipSupport(test_bitfield_signing_msg())).await;
handler.send_msg(AllMessages::BitfieldDistribution(test_bitfield_distribution_msg())).await;
handler.send_msg(AllMessages::Provisioner(test_provisioner_msg())).await;
handler.send_msg(AllMessages::RuntimeApi(test_runtime_api_msg())).await;
handler.send_msg(AllMessages::AvailabilityStore(test_availability_store_msg())).await;
handler.send_msg(AllMessages::NetworkBridge(test_network_bridge_msg())).await;
handler.send_msg(AllMessages::ChainApi(test_chain_api_msg())).await;
handler.send_msg(AllMessages::ApprovalDistribution(test_approval_distribution_msg())).await;
handler.send_msg(AllMessages::ApprovalVoting(test_approval_voting_msg())).await;
handler.send_msg_anon(AllMessages::CandidateValidation(test_candidate_validation_msg())).await;
handler.send_msg_anon(AllMessages::CandidateBacking(test_candidate_backing_msg())).await;
handler.send_msg_anon(AllMessages::CollationGeneration(test_collator_generation_msg())).await;
handler.send_msg_anon(AllMessages::CollatorProtocol(test_collator_protocol_msg())).await;
handler.send_msg_anon(AllMessages::StatementDistribution(test_statement_distribution_msg())).await;
handler.send_msg_anon(AllMessages::AvailabilityRecovery(test_availability_recovery_msg())).await;
// handler.send_msg_anon(AllMessages::BitfieldSigning(test_bitfield_signing_msg())).await;
// handler.send_msg_anon(AllMessages::GossipSupport(test_bitfield_signing_msg())).await;
handler.send_msg_anon(AllMessages::BitfieldDistribution(test_bitfield_distribution_msg())).await;
handler.send_msg_anon(AllMessages::Provisioner(test_provisioner_msg())).await;
handler.send_msg_anon(AllMessages::RuntimeApi(test_runtime_api_msg())).await;
handler.send_msg_anon(AllMessages::AvailabilityStore(test_availability_store_msg())).await;
handler.send_msg_anon(AllMessages::NetworkBridge(test_network_bridge_msg())).await;
handler.send_msg_anon(AllMessages::ChainApi(test_chain_api_msg())).await;
handler.send_msg_anon(AllMessages::ApprovalDistribution(test_approval_distribution_msg())).await;
handler.send_msg_anon(AllMessages::ApprovalVoting(test_approval_voting_msg())).await;

// Wait until all subsystems have received. Otherwise the messages might race against
// the conclude signal.
Expand Down
13 changes: 8 additions & 5 deletions node/service/src/grandpa_support.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,14 @@ impl<B> grandpa::VotingRule<PolkadotBlock, B> for ApprovalCheckingVotingRule
Box::pin(async move {
let (tx, rx) = oneshot::channel();
let approval_checking_subsystem_vote = {
overseer.send_msg(ApprovalVotingMessage::ApprovedAncestor(
best_hash,
base_number,
tx,
)).await;
overseer.send_msg(
ApprovalVotingMessage::ApprovedAncestor(
best_hash,
base_number,
tx,
),
std::any::type_name::<Self>(),
).await;

rx.await.ok().and_then(|v| v)
};
Expand Down
23 changes: 16 additions & 7 deletions node/service/src/relay_chain_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,10 @@ impl<B> SelectChain<PolkadotBlock> for SelectRelayChain<B>

self.overseer
.clone()
.send_msg(ChainSelectionMessage::Leaves(tx)).await;
.send_msg(
ChainSelectionMessage::Leaves(tx),
std::any::type_name::<Self>(),
).await;

rx.await
.map_err(Error::OverseerDisconnected)
Expand Down Expand Up @@ -264,7 +267,10 @@ impl<B> SelectChain<PolkadotBlock> for SelectRelayChain<B>

let subchain_head = {
let (tx, rx) = oneshot::channel();
overseer.send_msg(ChainSelectionMessage::BestLeafContaining(target_hash, tx)).await;
overseer.send_msg(
ChainSelectionMessage::BestLeafContaining(target_hash, tx),
std::any::type_name::<Self>(),
).await;

let best = rx.await
.map_err(Error::OverseerDisconnected)
Expand Down Expand Up @@ -318,11 +324,14 @@ impl<B> SelectChain<PolkadotBlock> for SelectRelayChain<B>
let (subchain_head, subchain_number) = {

let (tx, rx) = oneshot::channel();
overseer.send_msg(ApprovalVotingMessage::ApprovedAncestor(
subchain_head,
target_number,
tx,
)).await;
overseer.send_msg(
ApprovalVotingMessage::ApprovedAncestor(
subchain_head,
target_number,
tx,
),
std::any::type_name::<Self>(),
).await;

match rx.await
.map_err(Error::OverseerDisconnected)
Expand Down
2 changes: 1 addition & 1 deletion node/subsystem-test-helpers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ mod tests {

spawner.spawn("overseer", overseer.run().then(|_| async { () }).boxed());

block_on(handler.send_msg(CollatorProtocolMessage::CollateOn(Default::default())));
block_on(handler.send_msg_anon(CollatorProtocolMessage::CollateOn(Default::default())));
assert!(matches!(block_on(rx.into_future()).0.unwrap(), CollatorProtocolMessage::CollateOn(_)));
}
}
4 changes: 2 additions & 2 deletions node/test/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,11 +348,11 @@ impl PolkadotTestNode {
};

self.overseer_handler
.send_msg(CollationGenerationMessage::Initialize(config))
.send_msg(CollationGenerationMessage::Initialize(config), "Collator")
.await;

self.overseer_handler
.send_msg(CollatorProtocolMessage::CollateOn(para_id))
.send_msg(CollatorProtocolMessage::CollateOn(para_id), "Collator")
.await;
}
}
Expand Down
4 changes: 2 additions & 2 deletions parachain/test-parachains/adder/collator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,11 @@ fn main() -> Result<()> {
para_id,
};
overseer_handler
.send_msg(CollationGenerationMessage::Initialize(config))
.send_msg(CollationGenerationMessage::Initialize(config), "Collator")
.await;

overseer_handler
.send_msg(CollatorProtocolMessage::CollateOn(para_id))
.send_msg(CollatorProtocolMessage::CollateOn(para_id), "Collator")
.await;

Ok(full_node.task_manager)
Expand Down

0 comments on commit 60ff09e

Please sign in to comment.