feat: size tracking flume channel #5857
Walkthrough
This change replaces all uses of standard flume channels for network messages and events with custom size-tracking wrappers.
Sequence Diagram(s)
sequenceDiagram
participant Sender as SizeTrackingSender
participant Receiver as SizeTrackingReceiver
participant Metrics as PrometheusRegistry
Sender->>Receiver: send(msg: T)
Note right of Sender: Increments queued/total counters<br>by msg.get_heap_size()
Receiver->>Sender: recv()
Note left of Receiver: Decrements queued counters<br>by msg.get_heap_size()
Metrics-->>Sender: Collects channel metrics
Metrics-->>Receiver: Collects channel metrics
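The counter flow in the diagram can be sketched roughly as follows. This is a minimal illustration, not the PR's implementation: it wraps `std::sync::mpsc` instead of flume so that it stays self-contained, and `HeapSize`, `Counters`, `TrackingSender`, `TrackingReceiver`, and `tracked_channel` are hypothetical names standing in for `GetSize` and the `SizeTracking*` types.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc;
use std::sync::Arc;

/// Hypothetical stand-in for the `GetSize` trait used in the PR.
trait HeapSize {
    fn get_heap_size(&self) -> usize;
}

impl HeapSize for Vec<u8> {
    fn get_heap_size(&self) -> usize {
        self.capacity()
    }
}

/// Counters shared by both ends of one channel.
#[derive(Default)]
struct Counters {
    queued_bytes: AtomicUsize,
    total_bytes: AtomicUsize,
}

struct TrackingSender<T> {
    inner: mpsc::Sender<T>,
    counters: Arc<Counters>,
}

struct TrackingReceiver<T> {
    inner: mpsc::Receiver<T>,
    counters: Arc<Counters>,
}

fn tracked_channel<T: HeapSize>() -> (TrackingSender<T>, TrackingReceiver<T>) {
    let (tx, rx) = mpsc::channel();
    let counters = Arc::new(Counters::default());
    (
        TrackingSender { inner: tx, counters: counters.clone() },
        TrackingReceiver { inner: rx, counters },
    )
}

impl<T: HeapSize> TrackingSender<T> {
    fn send(&self, msg: T) -> Result<(), mpsc::SendError<T>> {
        let size = msg.get_heap_size();
        // Count before handing the message over, so the receiver's
        // decrement always follows the matching increment.
        self.counters.queued_bytes.fetch_add(size, Ordering::Relaxed);
        self.counters.total_bytes.fetch_add(size, Ordering::Relaxed);
        self.inner.send(msg).map_err(|err| {
            // The message was never queued, so undo the bookkeeping.
            self.counters.queued_bytes.fetch_sub(size, Ordering::Relaxed);
            self.counters.total_bytes.fetch_sub(size, Ordering::Relaxed);
            err
        })
    }
}

impl<T: HeapSize> TrackingReceiver<T> {
    fn recv(&self) -> Result<T, mpsc::RecvError> {
        let msg = self.inner.recv()?;
        // Only the queued counter shrinks; the total keeps growing.
        self.counters.queued_bytes.fetch_sub(msg.get_heap_size(), Ordering::Relaxed);
        Ok(msg)
    }
}
```

A metrics collector would then only need to read the two atomics, which is why the diagram shows it talking to both ends without taking part in `send` or `recv`.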
Estimated code review effort: 4 (~75–90 minutes)
Actionable comments posted: 1
🧹 Nitpick comments (1)
src/utils/flume/mod.rs (1)
233-237: Fix typo in metric help text
"Qeueue" should be "Queued".
 let size_metric_help = format!(
-    "Qeueue message size of flume channel {}_{} in bytes",
+    "Queued message size of flume channel {}_{} in bytes",
     self.name, self.id
 );
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (15)
src/chain_sync/chain_follower.rs (5 hunks)
src/chain_sync/network_context.rs (5 hunks)
src/libp2p/hello/message.rs (1 hunk)
src/libp2p/service.rs (16 hunks)
src/message_pool/msgpool/mod.rs (7 hunks)
src/message_pool/msgpool/msg_pool.rs (3 hunks)
src/message_pool/msgpool/selection.rs (2 hunks)
src/rpc/methods/sync.rs (1 hunk)
src/rpc/mod.rs (2 hunks)
src/shim/bigint.rs (1 hunk)
src/tool/offline_server/server.rs (2 hunks)
src/tool/subcommands/api_cmd/generate_test_snapshot.rs (2 hunks)
src/tool/subcommands/api_cmd/test_snapshot.rs (2 hunks)
src/utils/flume/mod.rs (2 hunks)
src/utils/get_size/mod.rs (1 hunk)
🧬 Code Graph Analysis (5)
src/libp2p/hello/message.rs (1)
  src/utils/get_size/mod.rs (1): vec_alike_get_size (8-13)
src/message_pool/msgpool/selection.rs (1)
  src/utils/flume/mod.rs (1): bounded_with_default_metrics_registry (52-57)
src/tool/subcommands/api_cmd/generate_test_snapshot.rs (2)
  src/utils/flume/mod.rs (1): bounded_with_default_metrics_registry (52-57)
  src/chain_sync/network_context.rs (1): network_send (140-142)
src/message_pool/msgpool/msg_pool.rs (1)
  src/libp2p/service.rs (1): network_sender (403-405)
src/utils/flume/mod.rs (3)
  src/message_pool/msgpool/msg_pool.rs (3): flume (486-486), new (71-76), new (461-581)
  src/metrics/mod.rs (1): default_registry (26-28)
  src/utils/cache/lru.rs (1): size_in_bytes (124-132)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
- GitHub Check: Build forest binaries on Linux AMD64
- GitHub Check: tests-release
- GitHub Check: tests
- GitHub Check: Build Ubuntu
- GitHub Check: cargo-publish-dry-run
- GitHub Check: All lint checks
🔇 Additional comments (26)
src/tool/offline_server/server.rs (1)
92-92: LGTM: Channel replaced with metrics-enabled variant
The replacement of flume::bounded(5) with bounded_with_default_metrics_registry(5, "network_message".into()) correctly integrates Prometheus metrics tracking for the network message channel. This aligns with the PR's objective of adding size and count tracking for flume channels.
src/utils/get_size/mod.rs (1)
8-13: LGTM: Well-designed generic utility function
The vec_alike_get_size function is elegantly implemented with appropriate generic bounds. Using AsRef<[T]> allows it to work with various slice-like types, and std::mem::size_of_val provides accurate byte size calculation.
src/message_pool/msgpool/selection.rs (1)
896-896: LGTM: Test updated to use metrics-enabled channels
The test function correctly adopts the new size-tracking channel creation pattern, replacing flume::bounded(50) with bounded_with_default_metrics_registry(50, "network_messages".into()). This ensures test consistency with the production code changes.
src/shim/bigint.rs (1)
13-17: LGTM: Accurate heap size calculation for BigInt
The GetSize implementation correctly calculates the heap size by determining the number of bits and converting to bytes using div_ceil(8). This provides accurate memory usage tracking for big integer values in the size-tracking system.
src/rpc/methods/sync.rs (1)
178-183: LGTM: Test helper updated to use size-tracking channels
The ctx() function correctly adopts the new size-tracking channel pattern. The return type is properly updated to SizeTrackingReceiver<NetworkMessage> and the channel creation uses bounded_with_default_metrics_registry, maintaining test consistency with production code changes.
src/libp2p/hello/message.rs (4)
6-8: LGTM! Clean imports for size tracking functionality.
The imports are well-organized and the alias vec_alike_get_heap_size clearly indicates the intent of measuring heap size for vector-like structures.
13-13: Excellent addition of size tracking capability.
Deriving GetSize for HelloRequest enables precise heap size measurement, which is crucial for the network message size tracking being implemented across the codebase.
15-16: Proper use of custom size function for vector-like field.
The vec_alike_get_heap_size function is correctly applied to the heaviest_tip_set field, ensuring accurate heap size measurement for the NonEmpty<Cid> collection.
19-20: Smart optimization by excluding fixed-size identifier.
Ignoring the genesis_cid field from size calculations is a reasonable optimization since Cid is a small, fixed-size identifier that doesn't contribute significantly to heap usage.
src/rpc/mod.rs (2)
16-16: Appropriate import for size-tracking functionality.
The addition of SizeTrackingSender import aligns with the systematic upgrade to size-tracking channels throughout the codebase.
480-482: Consistent upgrade to size-tracking sender.
The return type change to SizeTrackingSender<NetworkMessage> is part of the systematic refactor to enable network message size tracking in RPC operations, maintaining the same interface while adding metrics capabilities.
src/message_pool/msgpool/mod.rs (4)
19-19: Clean import addition for size tracking.
The addition of SizeTrackingSender to the existing utils import is appropriate and maintains clean import organization.
337-337: Appropriate test utility import.
The import of bounded_with_default_metrics_registry enables the test code to use the new size-tracking channel constructor consistently with the production code.
57-57: Appropriate parameter type upgrade.
The change from flume::Sender to SizeTrackingSender<NetworkMessage> enables size tracking for message republishing operations while maintaining the same functional interface.
357-357: Consistent test updates for size-tracking channels.
The replacement of flume::bounded with bounded_with_default_metrics_registry in tests ensures consistency between test and production code, maintaining the same capacity while adding size tracking capabilities.
Also applies to: 437-437, 498-498, 591-591
src/tool/subcommands/api_cmd/test_snapshot.rs (3)
5-5: Clean imports for size-tracking functionality.
The imports for SizeTrackingReceiver and bounded_with_default_metrics_registry are appropriate and necessary for the updated channel functionality.
133-133: Appropriate return type upgrade for size tracking.
The change to SizeTrackingReceiver<NetworkMessage> enables size tracking capabilities while maintaining the same functional interface.
136-137: Consistent channel creation with metrics tracking.
The use of bounded_with_default_metrics_registry with capacity 5 and label "network_messages" appropriately adds size tracking while maintaining the existing functionality.
src/tool/subcommands/api_cmd/generate_test_snapshot.rs (3)
6-6: Consistent imports matching refactoring pattern.
The imports for size-tracking functionality are consistent with the pattern established across the codebase, ensuring uniform adoption of the new channel types.
102-102: Consistent return type upgrade across tool subcommands.
The change to SizeTrackingReceiver<NetworkMessage> maintains consistency with similar functions across the codebase while adding size tracking capabilities.
105-106: Systematic completion of size-tracking channel adoption.
The channel creation using bounded_with_default_metrics_registry with consistent parameters completes the systematic upgrade to size-tracking channels across tool subcommands.
src/chain_sync/chain_follower.rs (1)
3-3: LGTM!
The migration from flume::Receiver<NetworkEvent> to SizeTrackingReceiver<NetworkEvent> is correctly implemented across all relevant locations.
Also applies to: 19-19, 26-26, 71-71, 94-94, 140-140
src/chain_sync/network_context.rs (1)
25-25: LGTM!
The migration from flume::Sender<NetworkMessage> to SizeTrackingSender<NetworkMessage> is correctly implemented throughout the module.
Also applies to: 53-53, 123-123, 140-141, 337-337
src/message_pool/msgpool/msg_pool.rs (1)
26-26: LGTM!
The migration to SizeTrackingSender<NetworkMessage> is properly implemented with correct import statements and type updates.
Also applies to: 183-183, 463-463
src/utils/flume/mod.rs (1)
1-314: Well-designed size-tracking wrapper implementation
The implementation provides comprehensive metrics tracking for flume channels with:
- Thread-safe atomic counters for message counts and sizes
- Proper Prometheus metrics integration with unique channel identification
- Preservation of flume's API compatibility
- Support for both bounded and unbounded channels
- Correct handling of sync/async operations and streaming
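The "unique channel identification" point can be illustrated with a small sketch. It assumes, based only on the `{}_{}` pattern with `self.name` and `self.id` in the help text quoted in the nitpick above, that each channel pairs a caller-supplied name with a per-process id. `ChannelLabel` and `NEXT_CHANNEL_ID` are hypothetical names, and the real module may assign ids differently.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Process-wide counter that hands out one id per channel (hypothetical).
static NEXT_CHANNEL_ID: AtomicUsize = AtomicUsize::new(0);

/// Name plus id, enough to tell apart two channels created with the same name.
struct ChannelLabel {
    name: String,
    id: usize,
}

impl ChannelLabel {
    fn new(name: impl Into<String>) -> Self {
        Self {
            name: name.into(),
            // fetch_add returns the previous value, so ids never repeat.
            id: NEXT_CHANNEL_ID.fetch_add(1, Ordering::Relaxed),
        }
    }

    /// Help text in the shape quoted in the review nitpick.
    fn size_metric_help(&self) -> String {
        format!(
            "Queued message size of flume channel {}_{} in bytes",
            self.name, self.id
        )
    }
}
```

Without the id, two channels labelled "network_messages" (for example one in production code and one in a test) would describe the same metric and could collide on registration.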
src/libp2p/service.rs (1)
4-16: LGTM! Comprehensive migration to size-tracking channels
All flume channel types have been consistently replaced with their size-tracking variants throughout the service, including:
- Struct fields
- Method signatures
- Channel creation with metrics registry integration
- Stream creation using the size-tracking receiver
Also applies to: 32-32, 188-191, 241-244, 309-309, 403-404, 408-409
impl GetSize for NetworkEvent {
    fn get_heap_size(&self) -> usize {
        match self {
            Self::HelloResponseOutbound { request, .. } => request.get_heap_size(),
            _ => 0,
        }
    }
}
💡 Verification agent
🧩 Analysis chain
Consider expanding GetSize implementation for comprehensive metrics
The current implementation only tracks heap size for HelloResponseOutbound events and returns 0 for all other variants. This might lead to incomplete memory usage metrics.
Consider implementing proper size calculation for other message types that contain heap-allocated data (e.g., PubsubMessage, ChainExchangeRequest, etc.) to get more accurate memory usage metrics.
Also applies to: 160-165
🏁 Script executed:
#!/bin/bash
# Check if NetworkEvent and NetworkMessage variants contain heap-allocated fields that should be tracked
ast-grep --pattern 'enum NetworkEvent {
$$$
}'
ast-grep --pattern 'enum NetworkMessage {
$$$
}'
Length of output: 103
🏁 Script executed:
#!/bin/bash
# Locate definitions of NetworkEvent and NetworkMessage for inspection
rg -n "enum NetworkEvent" -A10 -B2
rg -n "enum NetworkMessage" -A10 -B2
Length of output: 1424
🏁 Script executed:
#!/bin/bash
# Inspect GetSize implementation for NetworkMessage
rg -n "impl GetSize for NetworkMessage" -A10 -B5 src/libp2p/service.rs
Length of output: 521
Expand GetSize implementations to cover all heap-backed variants
To ensure accurate memory-usage metrics, extend both get_heap_size() impls to account for every variant carrying heap data:
• In src/libp2p/service.rs (impl for NetworkEvent, lines 115–123):
  - Add the following arm before the existing HelloResponseOutbound arm:
      Self::PubsubMessage { message } => message.get_heap_size(),
• In the same file (impl for NetworkMessage, lines 160–165):
  - Replace the unconditional 0 with a match that handles:
      Self::PubsubMessage { message, .. } => message.get_heap_size(),
      Self::ChainExchangeRequest { request, .. } => request.get_heap_size(),
      Self::JSONRPCRequest { method } => method.get_heap_size(),
      _ => 0,
Make sure that PubsubMessage, ChainExchangeRequest, and NetRPCMethods each implement GetSize (or provide custom logic) so their heap allocations are correctly tallied.
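As a rough, self-contained illustration of the per-variant pattern suggested above: the `Message` enum and the `HeapSize` trait below are hypothetical simplifications, not the real NetworkMessage or GetSize definitions, and the variant fields are invented for the example.

```rust
/// Hypothetical stand-in for the `GetSize` trait.
trait HeapSize {
    fn get_heap_size(&self) -> usize;
}

impl HeapSize for String {
    fn get_heap_size(&self) -> usize {
        self.capacity()
    }
}

impl HeapSize for Vec<u8> {
    fn get_heap_size(&self) -> usize {
        self.capacity()
    }
}

/// Simplified message enum used only for this sketch.
enum Message {
    Pubsub { topic: String, payload: Vec<u8> },
    Request { method: String },
    Shutdown,
}

impl HeapSize for Message {
    fn get_heap_size(&self) -> usize {
        match self {
            // Every heap-backed field of the variant contributes.
            Self::Pubsub { topic, payload } => {
                topic.get_heap_size() + payload.get_heap_size()
            }
            Self::Request { method } => method.get_heap_size(),
            // No heap data, so nothing to report.
            Self::Shutdown => 0,
        }
    }
}
```

One design choice worth weighing: listing every variant instead of ending with `_ => 0` makes the compiler reject the match when a new variant is added, so a heap-carrying variant cannot silently report zero.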
🤖 Prompt for AI Agents
In src/libp2p/service.rs around lines 115 to 123, extend the get_heap_size()
implementation for NetworkEvent by adding a match arm for Self::PubsubMessage {
message } that returns message.get_heap_size(), placing it before the existing
HelloResponseOutbound arm. Additionally, in the same file around lines 160 to
165, update the get_heap_size() implementation for NetworkMessage to replace the
unconditional 0 with a match that handles PubsubMessage, ChainExchangeRequest,
and JSONRPCRequest variants by returning their respective get_heap_size()
values, and defaulting to 0 for others. Ensure that PubsubMessage,
ChainExchangeRequest, and NetRPCMethods implement GetSize or have custom logic
to correctly report their heap sizes.
Right, GetSize impl is incomplete. Will fix in upcoming commits
@hanabi1224, sounds good! Thanks for acknowledging the incomplete GetSize implementations. Looking forward to seeing the comprehensive size tracking in the upcoming commits.
(\ /)
( ._. )
o_("_")("_")
*thoughtful about completeness*
🧠 Learnings used
Learnt from: hanabi1224
PR: ChainSafe/forest#5841
File: src/utils/get_size/mod.rs:10-10
Timestamp: 2025-07-17T15:21:40.753Z
Learning: The get_size2 crate's GetSize trait provides default implementations: get_stack_size() uses std::mem::size_of, get_heap_size() returns 0, and get_size() returns their sum. An empty impl like `impl GetSize for MyType {}` is valid and uses these defaults, making it suitable for simple wrapper types that don't allocate heap memory.
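The defaults described in this learning can be mirrored with a local trait. This is a sketch of the described behaviour only, not the get_size2 source: the trait is redefined here so the example stands alone, and `PeerCount` and `Payload` are hypothetical types.

```rust
/// Local stand-in that mirrors the defaults described above.
trait GetSize: Sized {
    /// Stack footprint of the type itself.
    fn get_stack_size() -> usize {
        std::mem::size_of::<Self>()
    }

    /// Heap footprint; zero unless the type overrides it.
    fn get_heap_size(&self) -> usize {
        0
    }

    /// Sum of the two.
    fn get_size(&self) -> usize {
        Self::get_stack_size() + self.get_heap_size()
    }
}

/// Plain wrapper with no heap allocation: the empty impl is enough.
struct PeerCount(u64);
impl GetSize for PeerCount {}

/// Wrapper that owns heap memory and therefore overrides the default.
struct Payload(Vec<u8>);
impl GetSize for Payload {
    fn get_heap_size(&self) -> usize {
        self.0.capacity()
    }
}
```

The flip side of the empty impl is that it is also valid for a type that does own heap data, in which case the type silently reports a heap size of 0. That is the same failure mode as the `_ => 0` arm discussed earlier in this thread.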
    )
}

#[derive(Debug)]
It'd be great to put some docs around this (even with doc examples), especially given its ubiquitous usage across the codebase. The same goes for the other SizeTracking* data structures introduced here.
I'd also consider putting them (flume, cache and I think there was one more) in a dedicated module, i.e., src/utils/size_tracking/*. This could be done in a separate PR if you decide it makes sense.
tests?
src/utils/get_size/mod.rs
Outdated
pub fn vec_alike_get_size<V, T>(slice: &V) -> usize
where
    V: AsRef<[T]>,
{
    std::mem::size_of_val(slice.as_ref())
}
Can you help me understand the correctness of this? This seems to work at most with shallow collections, but something like this will totally not work:
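The reviewer's own snippet is not preserved in this copy of the thread. As a hypothetical illustration of the concern, the sketch below applies the same helper to a collection of `String`s: `size_of_val` on the slice counts only the inline part of each element (pointer, length, capacity), not the bytes each `String` owns on the heap. `deep_size_of_strings` is an invented helper for comparison and is not part of the PR.

```rust
/// Same shape as the helper under review, copied here for illustration.
fn vec_alike_get_size<V, T>(slice: &V) -> usize
where
    V: AsRef<[T]>,
{
    // Size of the slice's elements themselves: len * size_of::<T>().
    std::mem::size_of_val(slice.as_ref())
}

/// Deep count for one specific nested shape (hypothetical helper).
fn deep_size_of_strings(items: &[String]) -> usize {
    // Inline part of every element plus the buffer each element owns.
    std::mem::size_of_val(items) + items.iter().map(String::capacity).sum::<usize>()
}
```

For element types with no heap data of their own the two results agree, which is the shallow case the helper handles. For nested collections a per-element heap-size call would be needed, as in `deep_size_of_strings`.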
@hanabi1224 Should we close this draft PR?