Skip to content
This repository has been archived by the owner on Apr 29, 2024. It is now read-only.

Error handling improvements #35

Merged
merged 6 commits into from
Oct 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 96 additions & 35 deletions radicle-node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,17 @@ pub enum Event {
},
}

/// General service error.
#[derive(thiserror::Error, Debug)]
pub enum Error {
/// Error originating in the storage layer.
#[error(transparent)]
Storage(#[from] storage::Error),
/// Error while fetching project data into storage.
#[error(transparent)]
Fetch(#[from] storage::FetchError),
/// Error reading or updating the routing table.
#[error(transparent)]
Routing(#[from] routing::Error),
}

/// Error returned by [`Command::Fetch`].
#[derive(thiserror::Error, Debug)]
pub enum FetchError {
Expand Down Expand Up @@ -314,11 +325,12 @@ where
&mut self.reactor
}

pub fn lookup(&self, id: Id) -> Result<Lookup, routing::Error> {
/// Lookup a project, both locally and in the routing table.
pub fn lookup(&self, id: Id) -> Result<Lookup, LookupError> {
let remote = self.routing.get(&id)?.iter().cloned().collect();

Ok(Lookup {
local: self.storage.get(&self.node_id(), id).unwrap(),
local: self.storage.get(&self.node_id(), id)?,
remote,
})
}
Expand Down Expand Up @@ -362,15 +374,19 @@ where
}
if now - self.last_announce >= ANNOUNCE_INTERVAL {
if self.out_of_sync {
self.announce_inventory().unwrap();
if let Err(err) = self.announce_inventory() {
error!("Error announcing inventory: {}", err);
}
}
self.reactor.wakeup(ANNOUNCE_INTERVAL);
self.last_announce = now;
}
if now - self.last_prune >= PRUNE_INTERVAL {
debug!("Running 'prune' task...");

self.prune_routing_entries();
if let Err(err) = self.prune_routing_entries() {
error!("Error pruning routing entries: {}", err);
}
self.reactor.wakeup(PRUNE_INTERVAL);
self.last_prune = now;
}
Expand Down Expand Up @@ -451,20 +467,9 @@ where
resp.send(self.untrack(id)).ok();
}
Command::AnnounceRefs(id) => {
let node = self.node_id();
let repo = self.storage.repository(id).unwrap();
let remote = repo.remote(&node).unwrap();
let peers = self.sessions.negotiated().map(|(_, p)| p);
let refs = remote.refs.into();
let timestamp = self.clock.timestamp();
let msg = AnnouncementMessage::from(RefsAnnouncement {
id,
refs,
timestamp,
});
let ann = msg.signed(&self.signer);

self.reactor.broadcast(ann, peers);
if let Err(err) = self.announce_refs(id) {
error!("Error announcing refs: {}", err);
}
}
Command::QueryState(query, sender) => {
sender.send(query(self)).ok();
Expand Down Expand Up @@ -524,7 +529,7 @@ where
pub fn disconnected(
&mut self,
addr: &std::net::SocketAddr,
reason: nakamoto::DisconnectReason<DisconnectReason>,
reason: &nakamoto::DisconnectReason<DisconnectReason>,
) {
let since = self.local_time();
let address = Address::from(*addr);
Expand Down Expand Up @@ -593,33 +598,47 @@ where
/// and `false` if it should not.
pub fn handle_announcement(
&mut self,
session: &NodeId,
git: &git::Url,
announcement: &Announcement,
) -> Result<bool, peer::SessionError> {
if !announcement.verify() {
return Err(SessionError::Misbehavior);
}
let Announcement { node, message, .. } = announcement;
let now = self.clock.local_time();

// Don't allow messages from too far in the future.
if message.timestamp().saturating_sub(now.as_secs()) > MAX_TIME_DELTA.as_secs() {
return Err(SessionError::InvalidTimestamp(message.timestamp()));
}

match message {
AnnouncementMessage::Inventory(message) => {
let now = self.clock.local_time();
let peer = self.peers.entry(*node).or_insert_with(Peer::default);
let relay = self.config.relay;

// Don't allow messages from too far in the future.
if message.timestamp.saturating_sub(now.as_secs()) > MAX_TIME_DELTA.as_secs() {
return Err(SessionError::InvalidTimestamp(message.timestamp));
}
// Discard inventory messages we've already seen, otherwise update
// out last seen time.
if message.timestamp > peer.last_message {
peer.last_message = message.timestamp;
} else {
return Ok(false);
}
self.process_inventory(&message.inventory, *node, git)
.unwrap();

if let Err(err) = self.process_inventory(&message.inventory, *node, git) {
error!("Error processing inventory from {}: {}", node, err);

if let Error::Fetch(storage::FetchError::Verify(err)) = err {
// Disconnect the peer if it is the signer of this message.
if node == session {
return Err(peer::SessionError::VerificationFailed(err));
}
}
// There's not much we can do if the peer sending us this message isn't the
// origin of it.
return Ok(false);
}

if relay {
return Ok(true);
Expand All @@ -632,6 +651,8 @@ where
// TODO: Check that we're tracking this user as well.
if self.config.is_tracking(&message.id) {
// TODO: Check refs to see if we should try to fetch or not.
// FIXME: This code is wrong: we shouldn't be fetching from the connected peer,
// we should fetch from the origin.
let updated = self.storage.fetch(message.id, git).unwrap();
let is_updated = !updated.is_empty();

Expand Down Expand Up @@ -715,11 +736,12 @@ where
return Err(SessionError::Misbehavior);
}
// Process a peer announcement.
(SessionState::Negotiated { git, .. }, Message::Announcement(ann)) => {
(SessionState::Negotiated { id, git, .. }, Message::Announcement(ann)) => {
let git = git.clone();
let id = *id;

// Returning true here means that the message should be relayed.
if self.handle_announcement(&git, &ann)? {
if self.handle_announcement(&id, &git, &ann)? {
self.gossip.received(ann.clone(), ann.message.timestamp());
return Ok(Some(ann));
}
Expand Down Expand Up @@ -753,20 +775,40 @@ where
inventory: &Inventory,
from: NodeId,
remote: &Url,
) -> Result<(), routing::Error> {
) -> Result<(), Error> {
for proj_id in inventory {
// TODO: Fire an event on routing update.
if self
.routing
.insert(*proj_id, from, self.clock.timestamp())?
&& self.config.is_tracking(proj_id)
{
self.storage.fetch(*proj_id, remote).unwrap();
self.storage.fetch(*proj_id, remote)?;
}
}
Ok(())
}

/// Announce local refs for given id.
fn announce_refs(&mut self, id: Id) -> Result<(), storage::Error> {
// Look up our own remote in the project's repository; its refs are
// what we announce to the network.
let node = self.node_id();
let remote = self.storage.repository(id)?.remote(&node)?;

// Assemble the refs announcement and sign it with our node key.
let message = AnnouncementMessage::from(RefsAnnouncement {
id,
refs: remote.refs.into(),
timestamp: self.clock.timestamp(),
});
let signed = message.signed(&self.signer);

// Relay the signed announcement to every fully-negotiated session.
let recipients = self.sessions.negotiated().map(|(_, session)| session);
self.reactor.broadcast(signed, recipients);

Ok(())
}

////////////////////////////////////////////////////////////////////////////
// Periodic tasks
////////////////////////////////////////////////////////////////////////////
Expand All @@ -785,8 +827,9 @@ where
Ok(())
}

fn prune_routing_entries(&mut self) {
fn prune_routing_entries(&mut self) -> Result<(), storage::Error> {
// TODO
Ok(())
}

fn maintain_connections(&mut self) {
Expand All @@ -808,7 +851,7 @@ pub trait ServiceState {
/// Get the current inventory.
fn inventory(&self) -> Result<Inventory, storage::Error>;
/// Get a project from storage, using the local node's key.
fn get(&self, proj: Id) -> Result<Option<Doc<Verified>>, storage::Error>;
fn get(&self, proj: Id) -> Result<Option<Doc<Verified>>, storage::ProjectError>;
/// Get the clock.
fn clock(&self) -> &RefClock;
/// Get service configuration.
Expand All @@ -831,7 +874,7 @@ where
self.storage.inventory()
}

fn get(&self, proj: Id) -> Result<Option<Doc<Verified>>, storage::Error> {
fn get(&self, proj: Id) -> Result<Option<Doc<Verified>>, storage::ProjectError> {
self.storage.get(&self.node_id(), proj)
}

Expand All @@ -848,7 +891,7 @@ where
}
}

#[derive(Debug, Clone)]
#[derive(Debug)]
pub enum DisconnectReason {
User,
Error(SessionError),
Expand Down Expand Up @@ -895,6 +938,16 @@ pub struct Lookup {
pub remote: Vec<NodeId>,
}

/// Error returned by a project lookup (see `lookup`).
#[derive(thiserror::Error, Debug)]
pub enum LookupError {
/// Error originating in the storage layer.
#[error(transparent)]
Storage(#[from] storage::Error),
/// Error reading the routing table.
#[error(transparent)]
Routing(#[from] routing::Error),
/// Error loading the project document from storage.
#[error(transparent)]
Project(#[from] storage::ProjectError),
}

/// Information on a peer, that we may or may not be connected to.
#[derive(Default, Debug)]
pub struct Peer {
Expand Down Expand Up @@ -979,7 +1032,15 @@ mod gossip {
config: &Config,
) -> [Message; 4] {
let git = config.git_url.clone();
let inventory = storage.inventory().unwrap();
let inventory = match storage.inventory() {
Ok(i) => i,
Err(e) => {
error!("Error getting local inventory for handshake: {}", e);
// Other than crashing the node completely, there's nothing we can do
// here besides returning an empty inventory and logging an error.
vec![]
}
};

[
Message::init(*signer.public_key(), config.listen.clone(), git),
Expand Down
4 changes: 3 additions & 1 deletion radicle-node/src/service/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub enum SessionState {
Disconnected { since: LocalTime },
}

#[derive(thiserror::Error, Debug, Clone)]
#[derive(thiserror::Error, Debug)]
pub enum SessionError {
#[error("wrong network constant in message: {0}")]
WrongMagic(u32),
Expand All @@ -33,6 +33,8 @@ pub enum SessionError {
InvalidTimestamp(u64),
#[error("session not found for address `{0}`")]
NotFound(net::IpAddr),
#[error("verification failed on fetch: {0}")]
VerificationFailed(#[from] storage::VerifyError),
#[error("peer misbehaved")]
Misbehavior,
}
Expand Down
15 changes: 8 additions & 7 deletions radicle-node/src/test/simulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub mod arbitrary;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut, Range};
use std::rc::Rc;
use std::{fmt, io, net};

use log::*;
Expand Down Expand Up @@ -56,7 +57,7 @@ pub enum Input {
/// Disconnected from peer.
Disconnected(
net::SocketAddr,
nakamoto::DisconnectReason<DisconnectReason>,
Rc<nakamoto::DisconnectReason<DisconnectReason>>,
),
/// Received a message from a remote peer.
Received(net::SocketAddr, Vec<Envelope>),
Expand Down Expand Up @@ -393,7 +394,7 @@ impl<S: WriteStorage + 'static> Simulation<S> {
assert!(!(attempt && connection));

if attempt || connection {
p.disconnected(&addr, reason);
p.disconnected(&addr, &reason);
}
}
Input::Wake => p.wake(),
Expand Down Expand Up @@ -501,9 +502,9 @@ impl<S: WriteStorage + 'static> Simulation<S> {
remote,
input: Input::Disconnected(
remote,
nakamoto::DisconnectReason::ConnectionError(
Rc::new(nakamoto::DisconnectReason::ConnectionError(
io::Error::from(io::ErrorKind::UnexpectedEof).into(),
),
)),
),
},
);
Expand Down Expand Up @@ -543,7 +544,7 @@ impl<S: WriteStorage + 'static> Simulation<S> {
self.priority.push_back(Scheduled {
remote,
node,
input: Input::Disconnected(remote, reason.into()),
input: Input::Disconnected(remote, Rc::new(reason.into())),
});

// Nb. It's possible for disconnects to happen simultaneously from both ends, hence
Expand All @@ -569,9 +570,9 @@ impl<S: WriteStorage + 'static> Simulation<S> {
remote: local_addr,
input: Input::Disconnected(
local_addr,
nakamoto::DisconnectReason::ConnectionError(
Rc::new(nakamoto::DisconnectReason::ConnectionError(
io::Error::from(io::ErrorKind::ConnectionReset).into(),
),
)),
),
},
);
Expand Down
6 changes: 3 additions & 3 deletions radicle-node/src/test/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -420,14 +420,14 @@ fn test_persistent_peer_reconnect() {
// a reconnection.
alice.disconnected(
&eve.addr(),
nakamoto::DisconnectReason::DialError(error.clone()),
&nakamoto::DisconnectReason::DialError(error.clone()),
);
assert_matches!(alice.outbox().next(), None);

for _ in 0..MAX_CONNECTION_ATTEMPTS {
alice.disconnected(
&bob.addr(),
nakamoto::DisconnectReason::ConnectionError(error.clone()),
&nakamoto::DisconnectReason::ConnectionError(error.clone()),
);
assert_matches!(alice.outbox().next(), Some(Io::Connect(a)) if a == bob.addr());
assert_matches!(alice.outbox().next(), None);
Expand All @@ -438,7 +438,7 @@ fn test_persistent_peer_reconnect() {
// After the max connection attempts, a disconnect doesn't trigger a reconnect.
alice.disconnected(
&bob.addr(),
nakamoto::DisconnectReason::ConnectionError(error),
&nakamoto::DisconnectReason::ConnectionError(error),
);
assert_matches!(alice.outbox().next(), None);
}
Expand Down
2 changes: 1 addition & 1 deletion radicle-node/src/wire.rs
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ where
reason: nakamoto::DisconnectReason<service::DisconnectReason>,
) {
self.inboxes.remove(&addr.ip());
self.inner.disconnected(addr, reason)
self.inner.disconnected(addr, &reason)
}

pub fn received_bytes(&mut self, addr: &std::net::SocketAddr, bytes: &[u8]) {
Expand Down
Loading