Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions src/chain_sync/chain_follower.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Copyright 2019-2025 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

//! This module contains the logic for driving Forest forward in the Filecoin
//! blockchain.
//!
Expand All @@ -15,12 +16,14 @@
//!
//! The state machine does not do any network requests or validation. Those are
//! handled by an external actor.

use crate::libp2p::hello::HelloRequest;
use crate::message_pool::MessagePool;
use crate::message_pool::MpoolRpcProvider;
use crate::networks::calculate_expected_epoch;
use crate::shim::clock::ChainEpoch;
use crate::state_manager::StateManager;
use crate::utils::flume::SizeTrackingReceiver;
use crate::utils::misc::env::is_env_truthy;
use ahash::{HashMap, HashSet};
use chrono::Utc;
Expand Down Expand Up @@ -65,7 +68,7 @@ pub struct ChainFollower<DB> {
pub bad_blocks: Option<Arc<BadBlockCache>>,

/// Incoming network events to be handled by synchronizer
net_handler: flume::Receiver<NetworkEvent>,
net_handler: SizeTrackingReceiver<NetworkEvent>,

/// Tipset channel sender
pub tipset_sender: flume::Sender<Arc<FullTipset>>,
Expand All @@ -88,7 +91,7 @@ impl<DB: Blockstore + Sync + Send + 'static> ChainFollower<DB> {
state_manager: Arc<StateManager<DB>>,
network: SyncNetworkContext<DB>,
genesis: Arc<Tipset>,
net_handler: flume::Receiver<NetworkEvent>,
net_handler: SizeTrackingReceiver<NetworkEvent>,
stateless_mode: bool,
mem_pool: Arc<MessagePool<MpoolRpcProvider<DB>>>,
) -> Self {
Expand Down Expand Up @@ -134,7 +137,7 @@ impl<DB: Blockstore + Sync + Send + 'static> ChainFollower<DB> {
pub async fn chain_follower<DB: Blockstore + Sync + Send + 'static>(
state_manager: Arc<StateManager<DB>>,
bad_block_cache: Option<Arc<BadBlockCache>>,
network_rx: flume::Receiver<NetworkEvent>,
network_rx: SizeTrackingReceiver<NetworkEvent>,
tipset_receiver: flume::Receiver<Arc<FullTipset>>,
network: SyncNetworkContext<DB>,
mem_pool: Arc<MessagePool<MpoolRpcProvider<DB>>>,
Expand Down
9 changes: 5 additions & 4 deletions src/chain_sync/network_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::{
rpc::RequestResponseError,
},
utils::{
flume::SizeTrackingSender,
misc::{AdaptiveValueProvider, ExponentialAdaptiveValueProvider},
stats::Stats,
},
Expand Down Expand Up @@ -49,7 +50,7 @@ const MAX_CONCURRENT_CHAIN_EXCHANGE_REQUESTS: usize = 2;
/// required to make network requests.
pub struct SyncNetworkContext<DB> {
/// Channel for sending network messages through the P2P service
network_send: flume::Sender<NetworkMessage>,
network_send: SizeTrackingSender<NetworkMessage>,
/// Manages peers to send requests to and updates request stats for the
/// respective peers.
peer_manager: Arc<PeerManager>,
Expand Down Expand Up @@ -119,7 +120,7 @@ where
DB: Blockstore,
{
pub fn new(
network_send: flume::Sender<NetworkMessage>,
network_send: SizeTrackingSender<NetworkMessage>,
peer_manager: Arc<PeerManager>,
db: Arc<DB>,
) -> Self {
Expand All @@ -136,7 +137,7 @@ where
}

/// Returns a reference to the channel for sending network messages through the P2P service.
pub fn network_send(&self) -> &flume::Sender<NetworkMessage> {
pub fn network_send(&self) -> &SizeTrackingSender<NetworkMessage> {
&self.network_send
}

Expand Down Expand Up @@ -333,7 +334,7 @@ where
/// Send a `chain_exchange` request to the network and await response.
async fn chain_exchange_request(
peer_manager: Arc<PeerManager>,
network_send: flume::Sender<NetworkMessage>,
network_send: SizeTrackingSender<NetworkMessage>,
peer_id: PeerId,
request: ChainExchangeRequest,
) -> Result<ChainExchangeResponse, String> {
Expand Down
9 changes: 9 additions & 0 deletions src/libp2p/hello/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@

use crate::shim::bigint::BigInt;
use crate::shim::clock::ChainEpoch;
use crate::utils::get_size::nunny_vec_heap_size_with_fn_helper;
use cid::Cid;
use fvm_ipld_encoding::tuple::*;
use get_size2::GetSize;
use nunny::Vec as NonEmpty;

/// Hello message <https://filecoin-project.github.io/specs/#hello-spec>
Expand All @@ -16,6 +18,13 @@ pub struct HelloRequest {
pub genesis_cid: Cid,
}

impl GetSize for HelloRequest {
    /// Reports the heap size of a hello request as the sum of the values
    /// obtained for `heaviest_tip_set` and `heaviest_tipset_weight`.
    ///
    /// `genesis_cid` does not take part in the sum.
    fn get_heap_size(&self) -> usize {
        // The per-element callback handed to the helper always yields 0.
        let tip_set_heap = nunny_vec_heap_size_with_fn_helper(&self.heaviest_tip_set, |_| 0);
        let weight_heap = self.heaviest_tipset_weight.get_heap_size();
        tip_set_heap + weight_heap
    }
}

/// Response to a Hello message. This just handles latency of the peer.
#[derive(Clone, Debug, PartialEq, Eq, Serialize_tuple, Deserialize_tuple)]
pub struct HelloResponse {
Expand Down
83 changes: 50 additions & 33 deletions src/libp2p/service.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
// Copyright 2019-2025 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use std::{
sync::Arc,
time::{Duration, SystemTime, UNIX_EPOCH},
use super::{
ForestBehaviour, ForestBehaviourEvent, Libp2pConfig,
chain_exchange::{ChainExchangeRequest, ChainExchangeResponse, make_chain_exchange_response},
discovery::{DerivedDiscoveryBehaviourEvent, PeerInfo},
};

use crate::libp2p::{
PeerManager, PeerOperation,
chain_exchange::ChainExchangeBehaviour,
discovery::DiscoveryEvent,
hello::{HelloBehaviour, HelloRequest, HelloResponse},
rpc::RequestResponseError,
};
use crate::utils::flume::{SizeTrackingReceiver, SizeTrackingSender};
use crate::{blocks::GossipBlock, rpc::net::NetInfoResult};
use crate::{chain::ChainStore, utils::encoding::from_slice_with_fallback};
use crate::{
Expand All @@ -21,6 +29,7 @@ use cid::Cid;
use flume::Sender;
use futures::{select, stream::StreamExt as _};
use fvm_ipld_blockstore::Blockstore;
use get_size2::GetSize;
pub use libp2p::gossipsub::{IdentTopic, Topic};
use libp2p::{
PeerId, Swarm, SwarmBuilder,
Expand All @@ -35,22 +44,13 @@ use libp2p::{
swarm::{DialError, SwarmEvent},
tcp, yamux,
};
use std::{
sync::Arc,
time::{Duration, SystemTime, UNIX_EPOCH},
};
use tokio_stream::wrappers::IntervalStream;
use tracing::{debug, error, info, trace, warn};

use super::{
ForestBehaviour, ForestBehaviourEvent, Libp2pConfig,
chain_exchange::{ChainExchangeRequest, ChainExchangeResponse, make_chain_exchange_response},
discovery::{DerivedDiscoveryBehaviourEvent, PeerInfo},
};
use crate::libp2p::{
PeerManager, PeerOperation,
chain_exchange::ChainExchangeBehaviour,
discovery::DiscoveryEvent,
hello::{HelloBehaviour, HelloRequest, HelloResponse},
rpc::RequestResponseError,
};

pub(in crate::libp2p) mod metrics {
use prometheus_client::metrics::{family::Family, gauge::Gauge};
use std::sync::LazyLock;
Expand Down Expand Up @@ -112,6 +112,15 @@ pub enum NetworkEvent {
PeerDisconnected(PeerId),
}

impl GetSize for NetworkEvent {
    /// Reports the heap size of a network event.
    ///
    /// Only `HelloResponseOutbound` is measured, by delegating to its
    /// `request`; every other variant reports `0`.
    fn get_heap_size(&self) -> usize {
        if let Self::HelloResponseOutbound { request, .. } = self {
            request.get_heap_size()
        } else {
            // All remaining variants are currently counted as holding no heap data.
            0
        }
    }
}

Comment on lines +115 to +123
Copy link
Contributor

@coderabbitai coderabbitai bot Jul 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Consider expanding GetSize implementation for comprehensive metrics

The current implementation only tracks heap size for HelloResponseOutbound events and returns 0 for all other variants. This might lead to incomplete memory usage metrics.

Consider implementing proper size calculation for other message types that contain heap-allocated data (e.g., PubsubMessage, ChainExchangeRequest, etc.) to get more accurate memory usage metrics.

Also applies to: 160-165


🏁 Script executed:

#!/bin/bash
# Check if NetworkEvent and NetworkMessage variants contain heap-allocated fields that should be tracked
ast-grep --pattern 'enum NetworkEvent {
  $$$
}'

ast-grep --pattern 'enum NetworkMessage {
  $$$
}'

Length of output: 103


🏁 Script executed:

#!/bin/bash
# Locate definitions of NetworkEvent and NetworkMessage for inspection
rg -n "enum NetworkEvent" -A10 -B2
rg -n "enum NetworkMessage" -A10 -B2

Length of output: 1424


🏁 Script executed:

#!/bin/bash
# Inspect GetSize implementation for NetworkMessage
rg -n "impl GetSize for NetworkMessage" -A10 -B5 src/libp2p/service.rs

Length of output: 521


Expand GetSize implementations to cover all heap‐backed variants

To ensure accurate memory‐usage metrics, extend both get_heap_size() impls to account for every variant carrying heap data:

• In src/libp2p/service.rs (impl for NetworkEvent, lines 115–123):

  • Add
    Self::PubsubMessage { message } => message.get_heap_size(),
    before the existing HelloResponseOutbound arm.

• In the same file (impl for NetworkMessage, lines 160–165):

  • Replace the unconditional 0 with a match that handles:
    Self::PubsubMessage { message, .. }         => message.get_heap_size(),
    Self::ChainExchangeRequest { request, .. }  => request.get_heap_size(),
    Self::JSONRPCRequest { method }             => method.get_heap_size(),
    _                                           => 0,

Make sure that PubsubMessage, ChainExchangeRequest, and NetRPCMethods each implement GetSize (or provide custom logic) so their heap allocations are correctly tallied.

🤖 Prompt for AI Agents
In src/libp2p/service.rs around lines 115 to 123, extend the get_heap_size()
implementation for NetworkEvent by adding a match arm for Self::PubsubMessage {
message } that returns message.get_heap_size(), placing it before the existing
HelloResponseOutbound arm. Additionally, in the same file around lines 160 to
165, update the get_heap_size() implementation for NetworkMessage to replace the
unconditional 0 with a match that handles PubsubMessage, ChainExchangeRequest,
and JSONRPCRequest variants by returning their respective get_heap_size()
values, and defaulting to 0 for others. Ensure that PubsubMessage,
ChainExchangeRequest, and NetRPCMethods implement GetSize or have custom logic
to correctly report their heap sizes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, the GetSize impl is incomplete. Will fix in upcoming commits.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hanabi1224, sounds good! Thanks for acknowledging the incomplete GetSize implementations. Looking forward to seeing the comprehensive size tracking in the upcoming commits.

  (\   /)
 ( ._. )
o_("_")("_")
*thoughtful about completeness*

🧠 Learnings used
Learnt from: hanabi1224
PR: ChainSafe/forest#5841
File: src/utils/get_size/mod.rs:10-10
Timestamp: 2025-07-17T15:21:40.753Z
Learning: The get_size2 crate's GetSize trait provides default implementations: get_stack_size() uses std::mem::size_of, get_heap_size() returns 0, and get_size() returns their sum. An empty impl like `impl GetSize for MyType {}` is valid and uses these defaults, making it suitable for simple wrapper types that don't allocate heap memory.

/// Message types that can come over `GossipSub`
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -148,6 +157,12 @@ pub enum NetworkMessage {
},
}

impl GetSize for NetworkMessage {
/// Reports the heap size of a network message.
///
/// Always returns `0`: no variant's payload is measured here.
fn get_heap_size(&self) -> usize {
0
}
}

/// Network RPC API methods used to gather data from libp2p node.
#[derive(Debug)]
pub enum NetRPCMethods {
Expand All @@ -170,10 +185,10 @@ pub struct Libp2pService<DB> {
bootstrap_peers: HashMap<PeerId, Multiaddr>,
cs: Arc<ChainStore<DB>>,
peer_manager: Arc<PeerManager>,
network_receiver_in: flume::Receiver<NetworkMessage>,
network_sender_in: Sender<NetworkMessage>,
network_receiver_out: flume::Receiver<NetworkEvent>,
network_sender_out: Sender<NetworkEvent>,
network_receiver_in: SizeTrackingReceiver<NetworkMessage>,
network_sender_in: SizeTrackingSender<NetworkMessage>,
network_receiver_out: SizeTrackingReceiver<NetworkEvent>,
network_sender_out: SizeTrackingSender<NetworkEvent>,
network_name: String,
genesis_cid: Cid,
}
Expand Down Expand Up @@ -223,8 +238,10 @@ where
.with_context(|| format!("Failed to subscribe gossipsub topic {t}"))?;
}

let (network_sender_in, network_receiver_in) = flume::unbounded();
let (network_sender_out, network_receiver_out) = flume::unbounded();
let (network_sender_in, network_receiver_in) =
crate::utils::flume::unbounded_with_default_metrics_registry("network_messages".into());
let (network_sender_out, network_receiver_out) =
crate::utils::flume::unbounded_with_default_metrics_registry("network_events".into());

// Hint at the multihash which has to go in the `/p2p/<multihash>` part of the
// peer's multiaddress. Useful if others want to use this node to bootstrap
Expand Down Expand Up @@ -288,7 +305,7 @@ where

let bitswap_request_manager = self.swarm.behaviour().bitswap.request_manager();
let mut swarm_stream = self.swarm.fuse();
let mut network_stream = self.network_receiver_in.stream().fuse();
let mut network_stream = self.network_receiver_in.stream();
let mut interval =
IntervalStream::new(tokio::time::interval(Duration::from_secs(15))).fuse();
let pubsub_block_str = format!("{}/{}", PUBSUB_BLOCK_STR, self.network_name);
Expand Down Expand Up @@ -382,12 +399,12 @@ where
}

/// Returns a sender which allows sending messages to the libp2p service.
pub fn network_sender(&self) -> Sender<NetworkMessage> {
pub fn network_sender(&self) -> SizeTrackingSender<NetworkMessage> {
self.network_sender_in.clone()
}

/// Returns a receiver to listen to network events emitted from the service.
pub fn network_receiver(&self) -> flume::Receiver<NetworkEvent> {
pub fn network_receiver(&self) -> SizeTrackingReceiver<NetworkEvent> {
self.network_receiver_out.clone()
}

Expand Down Expand Up @@ -441,7 +458,7 @@ async fn handle_network_message(
store: Arc<impl BitswapStoreReadWrite>,
bitswap_request_manager: Arc<BitswapRequestManager>,
message: NetworkMessage,
network_sender_out: &Sender<NetworkEvent>,
network_sender_out: &SizeTrackingSender<NetworkEvent>,
peer_manager: &Arc<PeerManager>,
) {
match message {
Expand Down Expand Up @@ -580,7 +597,7 @@ async fn handle_network_message(
async fn handle_discovery_event(
peer_info_map: &HashMap<PeerId, PeerInfo>,
discovery_out: DiscoveryEvent,
network_sender_out: &Sender<NetworkEvent>,
network_sender_out: &SizeTrackingSender<NetworkEvent>,
peer_manager: &PeerManager,
) {
match discovery_out {
Expand Down Expand Up @@ -625,7 +642,7 @@ async fn handle_discovery_event(

async fn handle_gossip_event(
e: gossipsub::Event,
network_sender_out: &Sender<NetworkEvent>,
network_sender_out: &SizeTrackingSender<NetworkEvent>,
pubsub_block_str: &str,
pubsub_msg_str: &str,
) {
Expand Down Expand Up @@ -680,7 +697,7 @@ async fn handle_hello_event(
event: request_response::Event<HelloRequest, HelloResponse, HelloResponse>,
peer_manager: &PeerManager,
genesis_cid: &Cid,
network_sender_out: &Sender<NetworkEvent>,
network_sender_out: &SizeTrackingSender<NetworkEvent>,
) {
match event {
request_response::Event::Message { peer, message, .. } => match message {
Expand Down Expand Up @@ -790,7 +807,7 @@ async fn handle_chain_exchange_event<DB>(
chain_exchange: &mut ChainExchangeBehaviour,
ce_event: request_response::Event<ChainExchangeRequest, ChainExchangeResponse>,
db: &Arc<ChainStore<DB>>,
network_sender_out: &Sender<NetworkEvent>,
network_sender_out: &SizeTrackingSender<NetworkEvent>,
cx_response_tx: Sender<(
request_response::InboundRequestId,
request_response::ResponseChannel<ChainExchangeResponse>,
Expand Down Expand Up @@ -869,7 +886,7 @@ async fn handle_forest_behaviour_event<DB>(
event: ForestBehaviourEvent,
db: &Arc<ChainStore<DB>>,
genesis_cid: &Cid,
network_sender_out: &Sender<NetworkEvent>,
network_sender_out: &SizeTrackingSender<NetworkEvent>,
cx_response_tx: Sender<(
request_response::InboundRequestId,
request_response::ResponseChannel<ChainExchangeResponse>,
Expand Down Expand Up @@ -930,7 +947,7 @@ async fn handle_forest_behaviour_event<DB>(
}
}

async fn emit_event(sender: &Sender<NetworkEvent>, event: NetworkEvent) {
async fn emit_event(sender: &SizeTrackingSender<NetworkEvent>, event: NetworkEvent) {
if sender.send_async(event).await.is_err() {
error!("Failed to emit event: Network channel receiver has been dropped");
}
Expand Down
14 changes: 7 additions & 7 deletions src/message_pool/msgpool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ use crate::libp2p::{NetworkMessage, PUBSUB_MSG_STR, Topic};
use crate::message::{Message as MessageTrait, SignedMessage};
use crate::networks::ChainConfig;
use crate::shim::{address::Address, crypto::Signature};
use crate::utils::cache::SizeTrackingLruCache;
use crate::utils::get_size::CidWrapper;
use crate::utils::{cache::SizeTrackingLruCache, flume::SizeTrackingSender, get_size::CidWrapper};
use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
use cid::Cid;
use fvm_ipld_encoding::to_vec;
Expand Down Expand Up @@ -55,7 +54,7 @@ where
#[allow(clippy::too_many_arguments)]
async fn republish_pending_messages<T>(
api: &T,
network_sender: &flume::Sender<NetworkMessage>,
network_sender: &SizeTrackingSender<NetworkMessage>,
pending: &SyncRwLock<HashMap<Address, MsgSet>>,
cur_tipset: &SyncRwLock<Arc<Tipset>>,
republished: &SyncRwLock<HashSet<Cid>>,
Expand Down Expand Up @@ -335,6 +334,7 @@ pub mod tests {
econ::TokenAmount,
message::{Message, Message_v3},
};
use crate::utils::flume::bounded_with_default_metrics_registry;
use num_traits::Zero;
use test_provider::*;
use tokio::task::JoinSet;
Expand All @@ -354,7 +354,7 @@ pub mod tests {
let tma = TestApi::with_max_actor_pending_messages(200);
tma.set_state_sequence(&sender, 0);

let (tx, _rx) = flume::bounded(50);
let (tx, _rx) = bounded_with_default_metrics_registry(50, "network_messages".into());
let mut services = JoinSet::new();
let mpool =
MessagePool::new(tma, tx, Default::default(), Arc::default(), &mut services).unwrap();
Expand Down Expand Up @@ -434,7 +434,7 @@ pub mod tests {
let tma = TestApi::default();
tma.set_state_sequence(&sender, 0);

let (tx, _rx) = flume::bounded(50);
let (tx, _rx) = bounded_with_default_metrics_registry(50, "network_messages".into());
let mut services = JoinSet::new();
let mpool =
MessagePool::new(tma, tx, Default::default(), Arc::default(), &mut services).unwrap();
Expand Down Expand Up @@ -495,7 +495,7 @@ pub mod tests {
let msg = create_smsg(&target, &sender, wallet.borrow_mut(), i, 1000000, 1);
smsg_vec.push(msg);
}
let (tx, _rx) = flume::bounded(50);
let (tx, _rx) = bounded_with_default_metrics_registry(50, "network_messages".into());
let mut services = JoinSet::new();
let mpool =
MessagePool::new(tma, tx, Default::default(), Arc::default(), &mut services).unwrap();
Expand Down Expand Up @@ -588,7 +588,7 @@ pub mod tests {

let tma = TestApi::default();
tma.set_state_sequence(&sender, 0);
let (tx, _rx) = flume::bounded(50);
let (tx, _rx) = bounded_with_default_metrics_registry(50, "network_messages".into());
let mut services = JoinSet::new();
let mpool =
MessagePool::new(tma, tx, Default::default(), Arc::default(), &mut services).unwrap();
Expand Down
Loading
Loading