Skip to content

Commit 678c9c2

Browse files
committed
ckb_network: prevent EventHandler::handle_error removed user configured public_addrs
Signed-off-by: Eval EXEC <[email protected]>
1 parent 8444466 commit 678c9c2

File tree

1 file changed

+65
-12
lines changed

1 file changed

+65
-12
lines changed

network/src/network.rs

+65-12
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use ckb_systemtime::{Duration, Instant};
2828
use ckb_util::{Condvar, Mutex, RwLock};
2929
use futures::{channel::mpsc::Sender, Future};
3030
use ipnetwork::IpNetwork;
31+
use p2p::multiaddr::MultiAddr;
3132
use p2p::{
3233
async_trait,
3334
builder::ServiceBuilder,
@@ -68,6 +69,54 @@ const P2P_TRY_SEND_INTERVAL: Duration = Duration::from_millis(100);
6869
// After 5 minutes we consider this dial hang
6970
const DIAL_HANG_TIMEOUT: Duration = Duration::from_secs(300);
7071

72+
/// CKB node's public addresses:
73+
///
74+
/// This struct holds the public addresses of the CKB node, categorized by how they were obtained.
75+
pub struct PublicAddresses {
76+
/// Addresses explicitly configured by the user in the ckb.toml configuration file.
77+
/// These addresses are considered static and represent the node's intended public endpoints.
78+
configured: HashSet<MultiAddr>,
79+
80+
/// Addresses discovered dynamically at runtime through observing successful outbound connections.
81+
/// These addresses may change over time and are managed behind a `RwLock` to allow concurrent
82+
/// read access while providing exclusive write access for updates. Addresses that fail to connect
83+
/// are removed from this set.
84+
discovered: RwLock<HashSet<Multiaddr>>,
85+
}
86+
87+
impl PublicAddresses {
88+
fn new(configured: HashSet<MultiAddr>, discovered: HashSet<Multiaddr>) -> Self {
89+
Self {
90+
configured,
91+
discovered: RwLock::new(discovered),
92+
}
93+
}
94+
95+
fn all(&self) -> Vec<MultiAddr> {
96+
self.configured
97+
.iter()
98+
.chain(self.discovered.read().iter())
99+
.cloned()
100+
.collect()
101+
}
102+
103+
fn contains(&self, addr: &MultiAddr) -> bool {
104+
self.discovered.read().contains(addr) || self.configured.contains(addr)
105+
}
106+
107+
fn count(&self) -> usize {
108+
self.configured.len() + self.discovered.read().len()
109+
}
110+
111+
fn random_choose(&self) -> Option<MultiAddr> {
112+
let addrs = self.all();
113+
if addrs.is_empty() {
114+
return None;
115+
}
116+
addrs.into_iter().choose(&mut rand::thread_rng())
117+
}
118+
}
119+
71120
/// The global shared state of the network module
72121
pub struct NetworkState {
73122
pub(crate) peer_registry: RwLock<PeerRegistry>,
@@ -77,7 +126,7 @@ pub struct NetworkState {
77126
dialing_addrs: RwLock<HashMap<PeerId, Instant>>,
78127
/// Node public addresses,
79128
/// includes manually public addrs and remote peer observed addrs
80-
public_addrs: RwLock<HashSet<Multiaddr>>,
129+
public_addrs: PublicAddresses,
81130
pending_observed_addrs: RwLock<HashSet<Multiaddr>>,
82131
local_private_key: secio::SecioKeyPair,
83132
local_peer_id: PeerId,
@@ -100,7 +149,7 @@ impl NetworkState {
100149
let local_private_key = config.fetch_private_key()?;
101150
let local_peer_id = local_private_key.peer_id();
102151
// set max score to public addresses
103-
let public_addrs: HashSet<Multiaddr> = config
152+
let configured_public_addrs: HashSet<Multiaddr> = config
104153
.listen_addresses
105154
.iter()
106155
.chain(config.public_addresses.iter())
@@ -115,6 +164,9 @@ impl NetworkState {
115164
}
116165
})
117166
.collect();
167+
168+
let discovered_public_addrs = HashSet::new();
169+
let public_addrs = PublicAddresses::new(configured_public_addrs, discovered_public_addrs);
118170
info!("Loading the peer store. This process may take a few seconds to complete.");
119171

120172
let peer_store = Mutex::new(PeerStore::load_from_dir_or_default(
@@ -135,7 +187,7 @@ impl NetworkState {
135187
bootnodes,
136188
peer_registry: RwLock::new(peer_registry),
137189
dialing_addrs: RwLock::new(HashMap::default()),
138-
public_addrs: RwLock::new(public_addrs),
190+
public_addrs,
139191
listened_addrs: RwLock::new(Vec::new()),
140192
pending_observed_addrs: RwLock::new(HashSet::default()),
141193
local_private_key,
@@ -335,7 +387,7 @@ impl NetworkState {
335387

336388
pub(crate) fn public_addrs(&self, count: usize) -> Vec<Multiaddr> {
337389
self.public_addrs
338-
.read()
390+
.all()
339391
.iter()
340392
.take(count)
341393
.cloned()
@@ -388,7 +440,7 @@ impl NetworkState {
388440
trace!("Do not dial self: {:?}, {}", peer_id, addr);
389441
return false;
390442
}
391-
if self.public_addrs.read().contains(addr) {
443+
if self.public_addrs.contains(addr) {
392444
trace!(
393445
"Do not dial listened address(self): {:?}, {}",
394446
peer_id,
@@ -502,12 +554,12 @@ impl NetworkState {
502554
pub(crate) fn try_dial_observed_addrs(&self, p2p_control: &ServiceControl) {
503555
let mut pending_observed_addrs = self.pending_observed_addrs.write();
504556
if pending_observed_addrs.is_empty() {
505-
let addrs = self.public_addrs.read();
506-
if addrs.is_empty() {
557+
let addrs = &self.public_addrs;
558+
if addrs.count() == 0 {
507559
return;
508560
}
509561
// random get addr
510-
if let Some(addr) = addrs.iter().choose(&mut rand::thread_rng()) {
562+
if let Some(addr) = addrs.random_choose() {
511563
if let Err(err) = p2p_control.dial(
512564
addr.clone(),
513565
TargetProtocol::Single(SupportProtocols::Identify.protocol_id()),
@@ -611,7 +663,8 @@ impl ServiceHandle for EventHandler {
611663
async fn handle_error(&mut self, context: &mut ServiceContext, error: ServiceError) {
612664
match error {
613665
ServiceError::DialerError { address, error } => {
614-
let mut public_addrs = self.network_state.public_addrs.write();
666+
let mut discovered_public_addrs =
667+
self.network_state.public_addrs.discovered.write();
615668

616669
match error {
617670
DialerErrorKind::HandshakeError(HandshakeErrorKind::SecioError(
@@ -620,7 +673,7 @@ impl ServiceHandle for EventHandler {
620673
debug!("dial observed address success: {:?}", address);
621674
if let Some(ip) = multiaddr_to_socketaddr(&address) {
622675
if is_reachable(ip.ip()) {
623-
public_addrs.insert(address);
676+
discovered_public_addrs.insert(address);
624677
}
625678
}
626679
return;
@@ -634,9 +687,9 @@ impl ServiceHandle for EventHandler {
634687
debug!("DialerError({}) {}", address, error);
635688
}
636689
}
637-
if public_addrs.remove(&address) {
690+
if discovered_public_addrs.remove(&address) {
638691
info!(
639-
"Dial {} failed, remove it from network_state.public_addrs",
692+
"Dial {} failed, remove it from network_state.public_addrs.discovered",
640693
address
641694
);
642695
}

0 commit comments

Comments
 (0)