Skip to content

Wallet stuff #131

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub use nakamoto_p2p::fsm::{Command, CommandError, Hooks, Limits, Link, Peer};
pub use crate::error::Error;
pub use crate::event::{Event, Loading};
pub use crate::handle;
pub use crate::handle::Events;
pub use crate::service::Service;

use crate::event::Mapper;
Expand Down Expand Up @@ -189,14 +190,15 @@ impl<R: Reactor> ClientRunner<R> {
/// A light-client process.
pub struct Client<R: Reactor> {
handle: Handle<R::Waker>,
config: Config,
commands: chan::Receiver<Command>,
publisher: Publisher<fsm::Event>,
reactor: R,
}

impl<R: Reactor> Client<R> {
/// Create a new client.
pub fn new() -> Result<Self, Error> {
pub fn new(config: Config) -> Result<Self, Error> {
let (commands_tx, commands_rx) = chan::unbounded::<Command>();
let (event_pub, events) = event::broadcast(|e, p| p.emit(e));
let (blocks_pub, blocks) = event::broadcast(|e, p| {
Expand All @@ -221,7 +223,7 @@ impl<R: Reactor> Client<R> {
}
});
let (publisher, subscriber) = event::broadcast({
let mut mapper = Mapper::default();
let mut mapper = Mapper::new(config.network);
move |e, p| mapper.process(e, p)
});

Expand All @@ -247,6 +249,7 @@ impl<R: Reactor> Client<R> {
};

Ok(Self {
config,
handle,
commands: commands_rx,
publisher,
Expand All @@ -256,11 +259,8 @@ impl<R: Reactor> Client<R> {

/// Load the client configuration. Takes a loading handler that can optionally receive
/// loading events.
pub fn load(
self,
config: Config,
loading: impl Into<LoadingHandler>,
) -> Result<ClientRunner<R>, Error> {
pub fn load(self, loading: impl Into<LoadingHandler>) -> Result<ClientRunner<R>, Error> {
let config = self.config;
let loading = loading.into();
let home = config.root.join(".nakamoto");
let network = config.network;
Expand Down Expand Up @@ -397,8 +397,8 @@ impl<R: Reactor> Client<R> {
}

/// Start the client process. This function is meant to be run in its own thread.
pub fn run(self, config: Config) -> Result<(), Error> {
self.load(config, LoadingHandler::Ignore)?.run()
pub fn run(self) -> Result<(), Error> {
self.load(LoadingHandler::Ignore)?.run()
}

/// Start the client process, supplying the service manually.
Expand Down Expand Up @@ -552,8 +552,8 @@ impl<W: Waker> handle::Handle for Handle<W> {
self.filters.subscribe()
}

fn events(&self) -> chan::Receiver<Event> {
self.subscriber.subscribe()
fn events(&self) -> handle::Events {
handle::Events::from(self.subscriber.subscribe())
}

fn command(&self, cmd: Command) -> Result<(), handle::Error> {
Expand Down
51 changes: 29 additions & 22 deletions client/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{fmt, io, net};
use nakamoto_common::bitcoin::network::constants::ServiceFlags;
use nakamoto_common::bitcoin::{Transaction, Txid};
use nakamoto_common::block::{Block, BlockHash, BlockHeader, Height};
use nakamoto_common::network::Network;
use nakamoto_net::event::Emitter;
use nakamoto_net::Disconnect;
use nakamoto_p2p::fsm;
Expand Down Expand Up @@ -60,6 +61,8 @@ pub enum Event {
Ready {
/// The tip of the block header chain.
tip: Height,
/// The hash of the tip.
hash: BlockHash,
/// The tip of the filter header chain.
filter_tip: Height,
},
Expand Down Expand Up @@ -178,6 +181,8 @@ pub enum Event {
Synced {
/// Height up to which we are synced.
height: Height,
/// Block hash up to which we are synced.
hash: BlockHash,
/// Tip of our block header chain.
tip: Height,
},
Expand Down Expand Up @@ -321,28 +326,28 @@ pub(crate) struct Mapper {
sync_height: Height,
/// The height up to which we've processed filters.
/// This is usually going to be greater than `sync_height`.
filter_height: Height,
filter_tip: (Height, BlockHash),
/// The height up to which we've processed matching blocks.
/// This is always going to be lesser or equal to `filter_height`.
block_height: Height,
block_tip: (Height, BlockHash),
/// Filter heights that have been matched, and for which we are awaiting a block to process.
pending: HashSet<Height>,
}

impl Default for Mapper {
impl Mapper {
/// Create a new client event mapper.
fn default() -> Self {
pub fn new(network: Network) -> Self {
let tip = 0;
let sync_height = 0;
let filter_height = 0;
let block_height = 0;
let filter_tip = (0, network.genesis_hash());
let block_tip = (0, network.genesis_hash());
let pending = HashSet::new();

Self {
tip,
sync_height,
filter_height,
block_height,
filter_tip,
block_tip,
pending,
}
}
Expand All @@ -354,11 +359,13 @@ impl Mapper {
match event {
fsm::Event::Ready {
height,
hash,
filter_height,
..
} => {
emitter.emit(Event::Ready {
tip: height,
hash,
filter_tip: filter_height,
});
}
Expand Down Expand Up @@ -439,12 +446,8 @@ impl Mapper {
status: TxStatus::Acknowledged { peer },
});
}
fsm::Event::Filter(fsm::FilterEvent::RescanStarted { start, .. }) => {
fsm::Event::Filter(fsm::FilterEvent::RescanStarted { .. }) => {
self.pending.clear();

self.filter_height = start;
self.sync_height = start;
self.block_height = start;
}
fsm::Event::Filter(fsm::FilterEvent::FilterProcessed {
block,
Expand All @@ -458,28 +461,31 @@ impl Mapper {
_ => {}
}
assert!(
self.block_height <= self.filter_height,
self.block_tip <= self.filter_tip,
"Filters are processed before blocks"
);
assert!(
self.sync_height <= self.filter_height,
self.sync_height <= self.filter_tip.0,
"Filters are processed before we are done"
);

// If we have no blocks left to process, we are synced to the height of the last
// processed filter. Otherwise, we're synced up to the last processed block.
let height = if self.pending.is_empty() {
self.filter_height
let (height, hash) = if self.pending.is_empty() {
self.filter_tip.max(self.block_tip)
} else {
self.block_height
self.block_tip
};

// Ensure we only broadcast sync events when the sync height has changed.
if height > self.sync_height {
debug_assert!(height == self.sync_height + 1);

self.sync_height = height;

emitter.emit(Event::Synced {
height,
hash,
tip: self.tip,
});
}
Expand All @@ -502,9 +508,9 @@ impl Mapper {
}

log::debug!("Received block {} at height {}", hash, height);
debug_assert!(height >= self.block_height);
debug_assert!(height >= self.block_tip.0);

self.block_height = height;
self.block_tip = (height, block.block_hash());

emitter.emit(Event::BlockMatched {
height,
Expand All @@ -524,13 +530,13 @@ impl Mapper {
valid: bool,
emitter: &Emitter<Event>,
) {
debug_assert!(height >= self.filter_height);
debug_assert!(height >= self.filter_tip.0);

if matched {
log::debug!("Filter matched for block #{}", height);
self.pending.insert(height);
}
self.filter_height = height;
self.filter_tip = (height, block);

emitter.emit(Event::FilterProcessed {
height,
Expand Down Expand Up @@ -843,6 +849,7 @@ mod test {
Event::Synced {
height: sync_height,
tip,
..
} => {
assert_eq!(height, tip);

Expand Down
43 changes: 39 additions & 4 deletions client/src/handle.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
//! Node handles are created from nodes by users of the library, to communicate with the underlying
//! protocol instance.
use std::net;
use std::ops::{RangeBounds, RangeInclusive};
use std::ops::{Deref, RangeBounds, RangeInclusive};

use crossbeam_channel as chan;
use thiserror::Error;

use nakamoto_common::bitcoin::network::constants::ServiceFlags;
use nakamoto_common::bitcoin::network::message::NetworkMessage;
use nakamoto_common::bitcoin::network::Address;
use nakamoto_common::bitcoin::util::uint::Uint256;
use nakamoto_common::bitcoin::Script;

use nakamoto_common::bitcoin::network::message::NetworkMessage;
use nakamoto_common::block::filter::BlockFilter;
use nakamoto_common::block::tree::{BlockReader, ImportResult};
use nakamoto_common::block::{self, Block, BlockHash, BlockHeader, Height, Transaction};
use nakamoto_common::nonempty::NonEmpty;
use nakamoto_net::event;
use nakamoto_p2p::fsm::Link;
use nakamoto_p2p::fsm::{self, Command, CommandError, GetFiltersError, Peer};

Expand Down Expand Up @@ -62,6 +62,41 @@ impl<T> From<chan::SendError<T>> for Error {
}
}

/// [`Event`] receiver.
#[derive(Debug, Clone)]
pub struct Events {
receiver: chan::Receiver<Event>,
}

impl Events {
/// Wait for the readiness event.
pub fn wait_for_ready(&self) -> Result<(Height, BlockHash), Error> {
event::wait(
&self.receiver,
|event| match event {
Event::Ready { tip, hash, .. } => Some((tip, hash)),
_ => None,
},
std::time::Duration::MAX,
)
.map_err(Error::from)
}
}

impl From<chan::Receiver<Event>> for Events {
fn from(receiver: chan::Receiver<Event>) -> Self {
Self { receiver }
}
}

impl Deref for Events {
type Target = chan::Receiver<Event>;

fn deref(&self) -> &Self::Target {
&self.receiver
}
}

/// A handle for communicating with a node process.
pub trait Handle: Sized + Send + Sync + Clone {
/// Get the tip of the active chain. Returns the height of the chain, the header,
Expand Down Expand Up @@ -95,7 +130,7 @@ pub trait Handle: Sized + Send + Sync + Clone {
/// Subscribe to compact filters received.
fn filters(&self) -> chan::Receiver<(BlockFilter, BlockHash, Height)>;
/// Subscribe to client events.
fn events(&self) -> chan::Receiver<Event>;
fn events(&self) -> Events;

/// Send a command to the client.
fn command(&self, cmd: Command) -> Result<(), Error>;
Expand Down
4 changes: 1 addition & 3 deletions net/poll/src/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ impl<Id: PeerId> Reactor<net::TcpStream, Id> {

match self::dial(&socket_addr) {
Ok(stream) => {
trace!("{:#?}", stream);
trace!("Stream established with {}", socket_addr);

self.register_peer(addr.clone(), stream, Link::Outbound);
self.connecting.insert(addr.clone());
Expand Down Expand Up @@ -340,8 +340,6 @@ impl<Id: PeerId> Reactor<net::TcpStream, Id> {
self.timeouts.register((), local_time + timeout);
}
Io::Event(event) => {
trace!("Event: {:?}", event);

publisher.publish(event);
}
}
Expand Down
2 changes: 1 addition & 1 deletion node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,5 @@ pub fn run(
cfg.limits.max_outbound_peers = connect.len();
}

Client::<Reactor>::new()?.run(cfg)
Client::<Reactor>::new(cfg)?.run()
}
1 change: 1 addition & 0 deletions p2p/src/fsm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -782,6 +782,7 @@ impl<T: BlockTree, F: Filters, P: peer::Store, C: AdjustedClock<PeerId>> traits:
self.cbfmgr.initialize(&self.tree);
self.outbox.event(Event::Ready {
height: self.tree.height(),
hash: self.tree.tip().0,
filter_height: self.cbfmgr.filters.height(),
time,
});
Expand Down
11 changes: 7 additions & 4 deletions p2p/src/fsm/cbfmgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ impl std::fmt::Display for Event {
} => {
write!(
fmt,
"Filter processed at height {} (match = {}, valid = {})",
"Filter {} processed (match = {}, valid = {})",
height, matched, valid
)
}
Expand Down Expand Up @@ -415,7 +415,8 @@ impl<F: Filters, U: Wire<Event> + SetTimer + Disconnect, C: Clock> FilterManager
}

log::debug!(
"[spv] Rollback from {} to {}, start = {}, height = {}",
target: "spv",
"Rollback from {} to {}, start = {}, height = {}",
current,
self.rescan.current,
start,
Expand Down Expand Up @@ -536,7 +537,8 @@ impl<F: Filters, U: Wire<Event> + SetTimer + Disconnect, C: Clock> FilterManager
let timeout = self.config.request_timeout;

log::debug!(
"Requested filter(s) in range {} to {} from {} (stop = {})",
target: "spv",
"Requesting filter(s) in range {} to {} from {} (stop = {})",
range.start(),
range.end(),
peer,
Expand All @@ -563,7 +565,8 @@ impl<F: Filters, U: Wire<Event> + SetTimer + Disconnect, C: Clock> FilterManager
let stop_hash = msg.stop_hash;

log::debug!(
"[spv] Received {} filter header(s) from {}",
target: "spv",
"Received {} filter header(s) from {}",
msg.filter_hashes.len(),
from
);
Expand Down
Loading