diff --git a/beacon_node/lighthouse_network/src/metrics.rs b/beacon_node/lighthouse_network/src/metrics.rs index 15445c7d64..40661f3a1a 100644 --- a/beacon_node/lighthouse_network/src/metrics.rs +++ b/beacon_node/lighthouse_network/src/metrics.rs @@ -122,9 +122,64 @@ pub static TOTAL_RPC_ERRORS_PER_CLIENT: LazyLock> = LazyLo &["client", "rpc_error", "direction"], ) }); -pub static TOTAL_RPC_REQUESTS: LazyLock> = LazyLock::new(|| { - try_create_int_counter_vec("libp2p_rpc_requests_total", "RPC requests total", &["type"]) +pub static TOTAL_RPC_REQUESTS_SENT: LazyLock> = LazyLock::new(|| { + try_create_int_counter_vec( + "libp2p_rpc_requests_sent_total", + "RPC requests sent total", + &["type"], + ) +}); +pub static TOTAL_RPC_REQUESTS_BYTES_SENT: LazyLock> = LazyLock::new(|| { + try_create_int_counter_vec( + "libp2p_rpc_requests_bytes_sent_total", + "RPC requests bytes sent total", + &["type"], + ) +}); +pub static TOTAL_RPC_REQUESTS_RECEIVED: LazyLock> = LazyLock::new(|| { + try_create_int_counter_vec( + "libp2p_rpc_requests_received_total", + "RPC requests received total", + &["type"], + ) }); +pub static TOTAL_RPC_REQUESTS_BYTES_RECEIVED: LazyLock> = + LazyLock::new(|| { + try_create_int_counter_vec( + "libp2p_rpc_requests_bytes_received_total", + "RPC requests bytes received total", + &["type"], + ) + }); +pub static TOTAL_RPC_RESPONSES_SENT: LazyLock> = LazyLock::new(|| { + try_create_int_counter_vec( + "libp2p_rpc_responses_sent_total", + "RPC responses sent total", + &["type"], + ) +}); +pub static TOTAL_RPC_RESPONSES_BYTES_SENT: LazyLock> = LazyLock::new(|| { + try_create_int_counter_vec( + "libp2p_rpc_responses_bytes_sent_total", + "RPC responses bytes sent total", + &["type"], + ) +}); +pub static TOTAL_RPC_RESPONSES_RECEIVED: LazyLock> = LazyLock::new(|| { + try_create_int_counter_vec( + "libp2p_rpc_responses_received_total", + "RPC responses received total", + &["type"], + ) +}); +pub static TOTAL_RPC_RESPONSES_BYTES_RECEIVED: LazyLock> = + LazyLock::new(|| { + try_create_int_counter_vec( + "libp2p_rpc_responses_bytes_received_total", + "RPC responses bytes received total", + &["type"], + ) + }); pub static PEER_ACTION_EVENTS_PER_CLIENT: LazyLock> = LazyLock::new(|| { try_create_int_counter_vec( "libp2p_peer_actions_per_client", diff --git a/beacon_node/lighthouse_network/src/rpc/codec.rs b/beacon_node/lighthouse_network/src/rpc/codec.rs index 5d86936d41..1984fc2301 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec.rs @@ -326,7 +326,7 @@ impl Encoder> for SSZSnappyOutboundCodec { type Error = RPCError; fn encode(&mut self, item: RequestType, dst: &mut BytesMut) -> Result<(), Self::Error> { - let bytes = match item { + let bytes = match &item { RequestType::Status(req) => req.as_ssz_bytes(), RequestType::Goodbye(req) => req.as_ssz_bytes(), RequestType::BlocksByRange(r) => match r { @@ -369,6 +369,11 @@ impl Encoder> for SSZSnappyOutboundCodec { // Write compressed bytes to `dst` dst.extend_from_slice(writer.get_ref()); + metrics::inc_counter_vec_by( + &metrics::TOTAL_RPC_REQUESTS_BYTES_SENT, + &[item.into()], + dst.len() as u64, + ); Ok(()) } } @@ -555,38 +560,86 @@ fn handle_rpc_request( spec: &ChainSpec, ) -> Result>, RPCError> { match versioned_protocol { - SupportedProtocol::StatusV1 => Ok(Some(RequestType::Status( + SupportedProtocol::StatusV1 => { + metrics::inc_counter_vec_by( + &metrics::TOTAL_RPC_REQUESTS_BYTES_RECEIVED, + &["status"], + decoded_buffer.len() as u64, + ); + Ok(Some(RequestType::Status( StatusMessage::from_ssz_bytes(decoded_buffer)?, - ))), + )?)) + } SupportedProtocol::GoodbyeV1 => Ok(Some(RequestType::Goodbye( GoodbyeReason::from_ssz_bytes(decoded_buffer)?, ))), - SupportedProtocol::BlocksByRangeV2 => Ok(Some(RequestType::BlocksByRange( + SupportedProtocol::BlocksByRangeV2 => { + metrics::inc_counter_vec_by( + &metrics::TOTAL_RPC_REQUESTS_BYTES_RECEIVED, + &["blocks_by_range"], + decoded_buffer.len() as u64, + ); + Ok(Some(RequestType::BlocksByRange( OldBlocksByRangeRequest::V2(OldBlocksByRangeRequestV2::from_ssz_bytes(decoded_buffer)?), - ))), - SupportedProtocol::BlocksByRangeV1 => Ok(Some(RequestType::BlocksByRange( + )?), + ) + } + SupportedProtocol::BlocksByRangeV1 => { + metrics::inc_counter_vec_by( + &metrics::TOTAL_RPC_REQUESTS_BYTES_RECEIVED, + &["blocks_by_range"], + decoded_buffer.len() as u64, + ); + Ok(Some(RequestType::BlocksByRange( OldBlocksByRangeRequest::V1(OldBlocksByRangeRequestV1::from_ssz_bytes(decoded_buffer)?), - ))), - SupportedProtocol::BlocksByRootV2 => Ok(Some(RequestType::BlocksByRoot( + )?), + ) + } + SupportedProtocol::BlocksByRootV2 => { + metrics::inc_counter_vec_by( + &metrics::TOTAL_RPC_REQUESTS_BYTES_RECEIVED, + &["blocks_by_root"], + decoded_buffer.len() as u64, + ); + Ok(Some(RequestType::BlocksByRoot( BlocksByRootRequest::V2(BlocksByRootRequestV2 { block_roots: RuntimeVariableList::from_ssz_bytes( decoded_buffer, spec.max_request_blocks as usize, )?, - }), - ))), - SupportedProtocol::BlocksByRootV1 => Ok(Some(RequestType::BlocksByRoot( + })))) + } + SupportedProtocol::BlocksByRootV1 => { + metrics::inc_counter_vec_by( + &metrics::TOTAL_RPC_REQUESTS_BYTES_RECEIVED, + &["blocks_by_root"], + decoded_buffer.len() as u64, + ); + Ok(Some(RequestType::BlocksByRoot( BlocksByRootRequest::V1(BlocksByRootRequestV1 { block_roots: RuntimeVariableList::from_ssz_bytes( decoded_buffer, spec.max_request_blocks as usize, )?, - }), - ))), - SupportedProtocol::BlobsByRangeV1 => Ok(Some(RequestType::BlobsByRange( + }, + )))) + } + SupportedProtocol::BlobsByRangeV1 => { + metrics::inc_counter_vec_by( + &metrics::TOTAL_RPC_REQUESTS_BYTES_RECEIVED, + &["blobs_by_range"], + decoded_buffer.len() as u64, + ); + Ok(Some(RequestType::BlobsByRange( BlobsByRangeRequest::from_ssz_bytes(decoded_buffer)?, - ))), + ))) + } SupportedProtocol::BlobsByRootV1 => { + metrics::inc_counter_vec_by( + &metrics::TOTAL_RPC_REQUESTS_BYTES_RECEIVED, + &["blobs_by_root"], + decoded_buffer.len() as u64, + ); Ok(Some(RequestType::BlobsByRoot(BlobsByRootRequest { blob_ids: RuntimeVariableList::from_ssz_bytes( decoded_buffer, @@ -594,29 +647,60 @@ fn handle_rpc_request( )?, }))) } - SupportedProtocol::DataColumnsByRangeV1 => Ok(Some(RequestType::DataColumnsByRange( + SupportedProtocol::DataColumnsByRangeV1 => { + metrics::inc_counter_vec_by( + &metrics::TOTAL_RPC_REQUESTS_BYTES_RECEIVED, + &["data_columns_by_range"], + decoded_buffer.len() as u64, + ); + Ok(Some(RequestType::DataColumnsByRange( DataColumnsByRangeRequest::from_ssz_bytes(decoded_buffer)?, - ))), - SupportedProtocol::DataColumnsByRootV1 => Ok(Some(RequestType::DataColumnsByRoot( + )))) + } + SupportedProtocol::DataColumnsByRootV1 => { + metrics::inc_counter_vec_by( + &metrics::TOTAL_RPC_REQUESTS_BYTES_RECEIVED, + &["data_columns_by_root"], + decoded_buffer.len() as u64, + ); + Ok(Some(RequestType::DataColumnsByRoot( DataColumnsByRootRequest { data_column_ids: RuntimeVariableList::from_ssz_bytes( decoded_buffer, spec.max_request_data_column_sidecars as usize, )?, }, - ))), + ))) + } SupportedProtocol::PingV1 => Ok(Some(RequestType::Ping(Ping { data: u64::from_ssz_bytes(decoded_buffer)?, }))), - SupportedProtocol::LightClientBootstrapV1 => Ok(Some(RequestType::LightClientBootstrap( + SupportedProtocol::LightClientBootstrapV1 => { + metrics::inc_counter_vec_by( + &metrics::TOTAL_RPC_REQUESTS_BYTES_RECEIVED, + &["light_client_bootstrap"], + decoded_buffer.len() as u64, + ); + Ok(Some(RequestType::LightClientBootstrap( LightClientBootstrapRequest { root: Hash256::from_ssz_bytes(decoded_buffer)?, }, - ))), + ))) + } SupportedProtocol::LightClientOptimisticUpdateV1 => { + metrics::inc_counter_vec_by( + &metrics::TOTAL_RPC_REQUESTS_BYTES_RECEIVED, + &["light_client_optimistic_update"], + decoded_buffer.len() as u64, + ); Ok(Some(RequestType::LightClientOptimisticUpdate)) } SupportedProtocol::LightClientFinalityUpdateV1 => { + metrics::inc_counter_vec_by( + &metrics::TOTAL_RPC_REQUESTS_BYTES_RECEIVED, + &["light_client_finality_update"], + decoded_buffer.len() as u64, + ); Ok(Some(RequestType::LightClientFinalityUpdate)) } SupportedProtocol::LightClientUpdatesByRangeV1 => { @@ -658,7 +742,7 @@ fn handle_rpc_request( /// `decoded_buffer` should be an ssz-encoded bytestream with /// length = length-prefix received in the beginning of the stream. /// -/// For BlocksByRange/BlocksByRoot reponses, decodes the appropriate response +/// For BlocksByRange/BlocksByRoot responses, decodes the appropriate response /// according to the received `ForkName`. fn handle_rpc_response( versioned_protocol: SupportedProtocol, diff --git a/beacon_node/lighthouse_network/src/rpc/outbound.rs b/beacon_node/lighthouse_network/src/rpc/outbound.rs index b614313a84..93615de628 100644 --- a/beacon_node/lighthouse_network/src/rpc/outbound.rs +++ b/beacon_node/lighthouse_network/src/rpc/outbound.rs @@ -8,6 +8,7 @@ use futures::prelude::{AsyncRead, AsyncWrite}; use futures::{FutureExt, SinkExt}; use libp2p::core::{OutboundUpgrade, UpgradeInfo}; use std::sync::Arc; +use strum::IntoStaticStr; use tokio_util::{ codec::Framed, compat::{Compat, FuturesAsyncReadCompatExt}, @@ -25,6 +26,20 @@ pub struct OutboundRequestContainer { pub max_rpc_size: usize, } +#[derive(Debug, Clone, PartialEq, IntoStaticStr)] +pub enum OutboundRequest { + Status(StatusMessage), + Goodbye(GoodbyeReason), + BlocksByRange(OldBlocksByRangeRequest), + BlocksByRoot(BlocksByRootRequest), + BlobsByRange(BlobsByRangeRequest), + BlobsByRoot(BlobsByRootRequest), + DataColumnsByRoot(DataColumnsByRootRequest), + DataColumnsByRange(DataColumnsByRangeRequest), + Ping(Ping), + MetaData(MetadataRequest), +} + impl UpgradeInfo for OutboundRequestContainer { type Info = ProtocolId; type InfoIter = Vec; diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs index cb22815390..379d2eef09 100644 --- a/beacon_node/lighthouse_network/src/service/api_types.rs +++ b/beacon_node/lighthouse_network/src/service/api_types.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use libp2p::swarm::ConnectionId; +use strum::IntoStaticStr; use types::{ BlobSidecar, DataColumnSidecar, EthSpec, Hash256, LightClientBootstrap, LightClientFinalityUpdate, LightClientOptimisticUpdate, LightClientUpdate, SignedBeaconBlock, @@ -89,13 +90,76 @@ pub enum RequestId { Internal, } +/// The type of RPC requests the Behaviour informs it has received and allows for sending. +/// +// NOTE: This is an application-level wrapper over the lower network level requests that can be +// sent. The main difference is the absence of the Ping, Metadata and Goodbye protocols, which don't +// leave the Behaviour. For all protocols managed by RPC see `RPCRequest`. +#[derive(Debug, Clone, PartialEq, IntoStaticStr)] +pub enum Request { + /// A Status message. + Status(StatusMessage), + /// A blocks by range request. + BlocksByRange(BlocksByRangeRequest), + /// A blobs by range request. + BlobsByRange(BlobsByRangeRequest), + /// A request blocks root request. + BlocksByRoot(BlocksByRootRequest), + // light client bootstrap request + LightClientBootstrap(LightClientBootstrapRequest), + // light client optimistic update request + LightClientOptimisticUpdate, + // light client finality update request + LightClientFinalityUpdate, + /// A request blobs root request. + BlobsByRoot(BlobsByRootRequest), + /// A request data columns root request. + DataColumnsByRoot(DataColumnsByRootRequest), + /// A request data columns by range request. + DataColumnsByRange(DataColumnsByRangeRequest), +} + +impl std::convert::From for OutboundRequest { + fn from(req: Request) -> OutboundRequest { + match req { + Request::BlocksByRoot(r) => OutboundRequest::BlocksByRoot(r), + Request::BlocksByRange(r) => match r { + BlocksByRangeRequest::V1(req) => OutboundRequest::BlocksByRange( + OldBlocksByRangeRequest::V1(OldBlocksByRangeRequestV1 { + start_slot: req.start_slot, + count: req.count, + step: 1, + }), + ), + BlocksByRangeRequest::V2(req) => OutboundRequest::BlocksByRange( + OldBlocksByRangeRequest::V2(OldBlocksByRangeRequestV2 { + start_slot: req.start_slot, + count: req.count, + step: 1, + }), + ), + }, + Request::LightClientBootstrap(_) + | Request::LightClientOptimisticUpdate + | Request::LightClientFinalityUpdate => { + unreachable!("Lighthouse never makes an outbound light client request") + } + Request::BlobsByRange(r) => OutboundRequest::BlobsByRange(r), + Request::BlobsByRoot(r) => OutboundRequest::BlobsByRoot(r), + Request::DataColumnsByRoot(r) => OutboundRequest::DataColumnsByRoot(r), + Request::DataColumnsByRange(r) => OutboundRequest::DataColumnsByRange(r), + Request::Status(s) => OutboundRequest::Status(s), + } + } +} + /// The type of RPC responses the Behaviour informs it has received, and allows for sending. /// // NOTE: This is an application-level wrapper over the lower network level responses that can be // sent. The main difference is the absense of Pong and Metadata, which don't leave the // Behaviour. For all protocol reponses managed by RPC see `RPCResponse` and // `RPCCodedResponse`. -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, IntoStaticStr)] pub enum Response { /// A Status message. Status(StatusMessage), diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index b23e417adb..b030495e17 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -970,8 +970,12 @@ impl Network { return Err((request_id, RPCError::Disconnected)); } - self.eth2_rpc_mut() - .send_request(peer_id, RequestId::Application(request_id), request); + self.eth2_rpc_mut().send_request( + peer_id, + RequestId::Application(request_id), + request.clone().into(), + ); + metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS_SENT, &[request.into()]); Ok(()) } @@ -1207,6 +1211,26 @@ impl Network { } } + /// Convenience function to propagate a request. + #[must_use = "actually return the event"] + fn build_request( + &mut self, + id: PeerRequestId, + peer_id: PeerId, + request: Request, + ) -> NetworkEvent { + // Increment metrics + metrics::inc_counter_vec( + &metrics::TOTAL_RPC_REQUESTS_RECEIVED, + &[request.clone().into()], + ); + NetworkEvent::RequestReceived { + peer_id, + id, + request, + } + } + /// Dial cached Enrs in discovery service that are in the given `subnet_id` and aren't /// in Connected, Dialing or Banned state. fn dial_cached_enrs_in_subnet(&mut self, subnet: Subnet, spec: Arc) {