Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions client/authority-discovery/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub type Result<T> = std::result::Result<T, Error>;

/// Error type for the authority discovery module.
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum Error {
#[error("Received dht value found event with records with different keys.")]
ReceivingDhtValueFoundEventWithDifferentKeys,
Expand Down Expand Up @@ -76,4 +77,7 @@ pub enum Error {

#[error("Received authority record without a valid signature for the remote peer id.")]
MissingPeerIdSignature,

#[error("Unable to fetch best block.")]
BestBlockFetchingError,
}
3 changes: 2 additions & 1 deletion client/authority-discovery/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
//! See [`Worker`] and [`Service`] for more documentation.

pub use crate::{
error::Error,
service::Service,
worker::{AuthorityDiscovery, NetworkProvider, Role, Worker},
};
Expand Down Expand Up @@ -148,7 +149,7 @@ pub fn new_worker_and_service_with_config<Client, Network, Block, DhtEventStream
where
Block: BlockT + Unpin + 'static,
Network: NetworkProvider,
Client: AuthorityDiscovery<Block> + HeaderBackend<Block> + 'static,
Client: AuthorityDiscovery<Block> + 'static,
DhtEventStream: Stream<Item = DhtEvent> + Unpin,
{
let (to_worker, from_service) = mpsc::channel(0);
Expand Down
15 changes: 11 additions & 4 deletions client/authority-discovery/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,15 @@ pub trait AuthorityDiscovery<Block: BlockT> {
/// Retrieve authority identifiers of the current and next authority set.
async fn authorities(&self, at: Block::Hash)
-> std::result::Result<Vec<AuthorityId>, ApiError>;

/// Retrieve best block hash
async fn best_hash(&self) -> std::result::Result<Block::Hash, Error>;
}

#[async_trait::async_trait]
impl<Block, T> AuthorityDiscovery<Block> for T
where
T: ProvideRuntimeApi<Block> + Send + Sync,
T: ProvideRuntimeApi<Block> + HeaderBackend<Block> + Send + Sync,
T::Api: AuthorityDiscoveryApi<Block>,
Block: BlockT,
{
Expand All @@ -172,13 +175,17 @@ where
) -> std::result::Result<Vec<AuthorityId>, ApiError> {
self.runtime_api().authorities(at)
}

async fn best_hash(&self) -> std::result::Result<Block::Hash, Error> {
Ok(self.info().best_hash)
}
}

impl<Client, Network, Block, DhtEventStream> Worker<Client, Network, Block, DhtEventStream>
where
Block: BlockT + Unpin + 'static,
Network: NetworkProvider,
Client: AuthorityDiscovery<Block> + HeaderBackend<Block> + 'static,
Client: AuthorityDiscovery<Block> + 'static,
DhtEventStream: Stream<Item = DhtEvent> + Unpin,
{
/// Construct a [`Worker`].
Expand Down Expand Up @@ -377,7 +384,7 @@ where
}

async fn refill_pending_lookups_queue(&mut self) -> Result<()> {
let best_hash = self.client.info().best_hash;
let best_hash = self.client.best_hash().await?;

let local_keys = match &self.role {
Role::PublishAndDiscover(key_store) => key_store
Expand Down Expand Up @@ -597,7 +604,7 @@ where
.into_iter()
.collect::<HashSet<_>>();

let best_hash = client.info().best_hash;
let best_hash = client.best_hash().await?;
let authorities = client
.authorities(best_hash)
.await
Expand Down
2 changes: 1 addition & 1 deletion client/informant/src/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl<B: BlockT> InformantDisplay<B> {
let best_number = info.chain.best_number;
let best_hash = info.chain.best_hash;
let finalized_number = info.chain.finalized_number;
let num_connected_peers = net_status.num_connected_peers;
let num_connected_peers = sync_status.num_connected_peers;
let speed = speed::<B>(best_number, self.last_number, self.last_update);
let total_bytes_inbound = net_status.total_bytes_inbound;
let total_bytes_outbound = net_status.total_bytes_outbound;
Expand Down
2 changes: 2 additions & 0 deletions client/network/common/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ pub struct SyncStatus<Block: BlockT> {
pub best_seen_block: Option<NumberFor<Block>>,
/// Number of peers participating in syncing.
pub num_peers: u32,
/// Number of peers known to `SyncingEngine` (both full and light).
pub num_connected_peers: u32,
/// Number of blocks queued for import
pub queued_blocks: u32,
/// State sync status in progress, if any.
Expand Down
21 changes: 15 additions & 6 deletions client/network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
//! See the documentation of [`Params`].

pub use crate::{
protocol::NotificationsSink,
request_responses::{
IncomingRequest, OutgoingResponse, ProtocolConfig as RequestResponseConfig,
},
Expand All @@ -31,9 +32,15 @@ pub use crate::{
use codec::Encode;
use libp2p::{identity::Keypair, multiaddr, Multiaddr, PeerId};
use prometheus_endpoint::Registry;
pub use sc_network_common::{role::Role, sync::warp::WarpSyncProvider, ExHashT};
pub use sc_network_common::{
role::{Role, Roles},
sync::warp::WarpSyncProvider,
ExHashT,
};
use sc_utils::mpsc::TracingUnboundedSender;
use zeroize::Zeroize;

use sp_runtime::traits::Block as BlockT;
use std::{
error::Error,
fmt, fs,
Expand All @@ -44,7 +51,6 @@ use std::{
path::{Path, PathBuf},
pin::Pin,
str::{self, FromStr},
sync::Arc,
};

pub use libp2p::{
Expand Down Expand Up @@ -688,7 +694,7 @@ impl NetworkConfiguration {
}

/// Network initialization parameters.
pub struct Params<Client> {
pub struct Params<Block: BlockT> {
/// Assigned role for our node (full, light, ...).
pub role: Role,

Expand All @@ -698,12 +704,12 @@ pub struct Params<Client> {
/// Network layer configuration.
pub network_config: NetworkConfiguration,

/// Client that contains the blockchain.
pub chain: Arc<Client>,

/// Legacy name of the protocol to use on the wire. Should be different for each chain.
pub protocol_id: ProtocolId,

/// Genesis hash of the chain
pub genesis_hash: Block::Hash,

/// Fork ID to distinguish protocols of different hard forks. Part of the standard protocol
/// name on the wire.
pub fork_id: Option<String>,
Expand All @@ -714,6 +720,9 @@ pub struct Params<Client> {
/// Block announce protocol configuration
pub block_announce_config: NonDefaultSetConfig,

/// TX channel for direct communication with `SyncingEngine` and `Protocol`.
pub tx: TracingUnboundedSender<crate::event::SyncEvent<Block>>,

/// Request response protocol configurations
pub request_response_protocol_configs: Vec<RequestResponseConfig>,
}
Expand Down
47 changes: 45 additions & 2 deletions client/network/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
//! Network event types. These are are not the part of the protocol, but rather
//! events that happen on the network like DHT get/put results received.

use crate::types::ProtocolName;
use crate::{types::ProtocolName, NotificationsSink};

use bytes::Bytes;
use futures::channel::oneshot;
use libp2p::{core::PeerId, kad::record::Key};

use sc_network_common::role::ObservedRole;
use sc_network_common::{role::ObservedRole, sync::message::BlockAnnouncesHandshake};
use sp_runtime::traits::Block as BlockT;

/// Events generated by DHT as a response to get_value and put_value requests.
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -90,3 +92,44 @@ pub enum Event {
messages: Vec<(ProtocolName, Bytes)>,
},
}

/// Event sent to `SyncingEngine`
// TODO: remove once `NotificationService` is implemented.
pub enum SyncEvent<B: BlockT> {
/// Opened a substream with the given node with the given notifications protocol.
///
/// The protocol is always one of the notification protocols that have been registered.
NotificationStreamOpened {
/// Node we opened the substream with.
remote: PeerId,
/// Received handshake.
received_handshake: BlockAnnouncesHandshake<B>,
/// Notification sink.
sink: NotificationsSink,
/// Channel for reporting accept/reject of the substream.
tx: oneshot::Sender<bool>,
},

/// Closed a substream with the given node. Always matches a corresponding previous
/// `NotificationStreamOpened` message.
NotificationStreamClosed {
/// Node we closed the substream with.
remote: PeerId,
},

/// Notification sink was replaced.
NotificationSinkReplaced {
/// Node we closed the substream with.
remote: PeerId,
/// Notification sink.
sink: NotificationsSink,
},

/// Received one or more messages from the given node using the given protocol.
NotificationsReceived {
/// Node we received the message from.
remote: PeerId,
/// Concerned protocol and associated message.
messages: Vec<Bytes>,
},
}
6 changes: 3 additions & 3 deletions client/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ pub mod request_responses;
pub mod types;
pub mod utils;

pub use event::{DhtEvent, Event};
pub use event::{DhtEvent, Event, SyncEvent};
#[doc(inline)]
pub use libp2p::{multiaddr, Multiaddr, PeerId};
pub use request_responses::{IfDisconnected, RequestFailure, RequestResponseConfig};
Expand All @@ -278,8 +278,8 @@ pub use service::{
NetworkStatusProvider, NetworkSyncForkRequest, NotificationSender as NotificationSenderT,
NotificationSenderError, NotificationSenderReady,
},
DecodingError, Keypair, NetworkService, NetworkWorker, NotificationSender, OutboundFailure,
PublicKey,
DecodingError, Keypair, NetworkService, NetworkWorker, NotificationSender, NotificationsSink,
OutboundFailure, PublicKey,
};
pub use types::ProtocolName;

Expand Down
Loading