Skip to content

Commit

Permalink
fix: tweak e2e test
Browse files Browse the repository at this point in the history
  • Loading branch information
quake committed Jun 11, 2024
1 parent 2bab107 commit 1bcfc96
Show file tree
Hide file tree
Showing 12 changed files with 138 additions and 47 deletions.
8 changes: 7 additions & 1 deletion src/ckb/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1061,6 +1061,11 @@ enum CommitmentSignedFlags {
}

#[derive(Copy, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(
rename_all = "SCREAMING_SNAKE_CASE",
tag = "state_name",
content = "state_flags"
)]
pub enum ChannelState {
/// We are negotiating the parameters required for the channel prior to funding it.
NegotiatingFunding(NegotiatingFundingFlags),
Expand Down Expand Up @@ -2825,7 +2830,8 @@ pub trait ChannelActorStateStore {
fn get_channel_actor_state(&self, id: &Hash256) -> Option<ChannelActorState>;
fn insert_channel_actor_state(&self, state: ChannelActorState);
fn delete_channel_actor_state(&self, id: &Hash256);
fn get_channels(&self, peer_id: &PeerId) -> Vec<Hash256>;
fn get_channel_ids_by_peer(&self, peer_id: &PeerId) -> Vec<Hash256>;
fn get_channel_states(&self, peer_id: Option<PeerId>) -> Vec<(PeerId, Hash256, ChannelState)>;
}

/// A wrapper on CommitmentTransaction that has a partial signature along with
Expand Down
2 changes: 1 addition & 1 deletion src/ckb/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,7 @@ impl NetworkActorState {
debug!("Peer connected: {:?}, session id: {}", &peer_id, session.id);
self.peer_session_map.insert(peer_id.clone(), session.id);

for channel_id in store.get_channels(&peer_id) {
for channel_id in store.get_channel_ids_by_peer(&peer_id) {
debug!("Reestablishing channel {:?}", &channel_id);
if let Ok((channel, _)) = Actor::spawn_linked(
None,
Expand Down
23 changes: 21 additions & 2 deletions src/ckb/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::{
};

use super::{
channel::{ChannelActorState, ChannelActorStateStore},
channel::{ChannelActorState, ChannelActorStateStore, ChannelState},
types::Hash256,
NetworkActor, NetworkActorCommand, NetworkActorMessage,
};
Expand Down Expand Up @@ -246,7 +246,7 @@ impl ChannelActorStateStore for MemoryStore {
self.channel_actor_state_map.write().unwrap().remove(id);
}

fn get_channels(&self, peer_id: &PeerId) -> Vec<Hash256> {
fn get_channel_ids_by_peer(&self, peer_id: &PeerId) -> Vec<Hash256> {
self.channel_actor_state_map
.read()
.unwrap()
Expand All @@ -260,6 +260,25 @@ impl ChannelActorStateStore for MemoryStore {
})
.collect()
}

fn get_channel_states(&self, peer_id: Option<PeerId>) -> Vec<(PeerId, Hash256, ChannelState)> {
let map = self.channel_actor_state_map.read().unwrap();
let values = map.values();
match peer_id {
Some(peer_id) => values
.filter_map(|state| {
if peer_id == state.peer_id {
Some((state.peer_id.clone(), state.id.clone(), state.state.clone()))
} else {
None
}
})
.collect(),
None => values
.map(|state| (state.peer_id.clone(), state.id.clone(), state.state.clone()))
.collect(),
}
}
}

#[cfg(test)]
Expand Down
61 changes: 55 additions & 6 deletions src/rpc/channel.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::ckb::{
channel::{
AddTlcCommand, ChannelCommand, ChannelCommandWithId, RemoveTlcCommand, ShutdownCommand,
AddTlcCommand, ChannelActorStateStore, ChannelCommand, ChannelCommandWithId, ChannelState,
RemoveTlcCommand, ShutdownCommand,
},
network::{AcceptChannelCommand, OpenChannelCommand},
serde_utils::{U128Hex, U32Hex, U64Hex},
Expand Down Expand Up @@ -51,6 +52,27 @@ pub struct CommitmentSignedParams {
pub channel_id: Hash256,
}

#[serde_as]
#[derive(Serialize, Deserialize)]
pub struct ListChannelsParams {
#[serde_as(as = "Option<DisplayFromStr>")]
pub peer_id: Option<PeerId>,
}

#[derive(Clone, Serialize)]
pub struct ListChannelsResult {
pub channels: Vec<Channel>,
}

#[serde_as]
#[derive(Clone, Serialize)]
pub struct Channel {
pub channel_id: Hash256,
#[serde_as(as = "DisplayFromStr")]
pub peer_id: PeerId,
pub state: ChannelState,
}

#[serde_as]
#[derive(Serialize, Deserialize)]
pub struct AddTlcParams {
Expand Down Expand Up @@ -113,6 +135,12 @@ pub trait ChannelRpc {
params: AcceptChannelParams,
) -> Result<AcceptChannelResult, ErrorObjectOwned>;

#[method(name = "list_channels")]
async fn list_channels(
&self,
params: ListChannelsParams,
) -> Result<ListChannelsResult, ErrorObjectOwned>;

#[method(name = "commitment_signed")]
async fn commitment_signed(
&self,
Expand All @@ -130,18 +158,22 @@ pub trait ChannelRpc {
-> Result<(), ErrorObjectOwned>;
}

pub struct ChannelRpcServerImpl {
pub struct ChannelRpcServerImpl<S> {
actor: ActorRef<NetworkActorMessage>,
store: S,
}

impl ChannelRpcServerImpl {
pub fn new(actor: ActorRef<NetworkActorMessage>) -> Self {
ChannelRpcServerImpl { actor }
impl<S> ChannelRpcServerImpl<S> {
pub fn new(actor: ActorRef<NetworkActorMessage>, store: S) -> Self {
ChannelRpcServerImpl { actor, store }
}
}

#[async_trait]
impl ChannelRpcServer for ChannelRpcServerImpl {
impl<S> ChannelRpcServer for ChannelRpcServerImpl<S>
where
S: ChannelActorStateStore + Send + Sync + 'static,
{
async fn open_channel(
&self,
params: OpenChannelParams,
Expand Down Expand Up @@ -192,6 +224,23 @@ impl ChannelRpcServer for ChannelRpcServerImpl {
}
}

async fn list_channels(
&self,
params: ListChannelsParams,
) -> Result<ListChannelsResult, ErrorObjectOwned> {
let channels = self.store.get_channel_states(params.peer_id);
Ok(ListChannelsResult {
channels: channels
.into_iter()
.map(|(peer_id, channel_id, state)| Channel {
channel_id,
peer_id,
state,
})
.collect(),
})
}

async fn commitment_signed(
&self,
params: CommitmentSignedParams,
Expand Down
8 changes: 4 additions & 4 deletions src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ mod peer;

use crate::{
cch::CchCommand,
ckb::NetworkActorMessage,
ckb::{channel::ChannelActorStateStore, NetworkActorMessage},
invoice::{InvoiceCommand, InvoiceStore},
};
use cch::{CchRpcServer, CchRpcServerImpl};
Expand All @@ -20,18 +20,18 @@ use tokio::sync::mpsc::Sender;

pub type InvoiceCommandWithReply = (InvoiceCommand, Sender<crate::Result<String>>);

pub async fn start_rpc<S: InvoiceStore + Send + Sync + 'static>(
pub async fn start_rpc<S: ChannelActorStateStore + InvoiceStore + Clone + Send + Sync + 'static>(
config: RpcConfig,
ckb_network_actor: Option<ActorRef<NetworkActorMessage>>,
cch_command_sender: Option<Sender<CchCommand>>,
store: S,
) -> ServerHandle {
let listening_addr = config.listening_addr.as_deref().unwrap_or("[::]:0");
let server = Server::builder().build(listening_addr).await.unwrap();
let mut methods = InvoiceRpcServerImpl::new(store).into_rpc();
let mut methods = InvoiceRpcServerImpl::new(store.clone()).into_rpc();
if let Some(ckb_network_actor) = ckb_network_actor {
let peer = PeerRpcServerImpl::new(ckb_network_actor.clone());
let channel = ChannelRpcServerImpl::new(ckb_network_actor.clone());
let channel = ChannelRpcServerImpl::new(ckb_network_actor.clone(), store);
methods.merge(peer.into_rpc()).unwrap();
methods.merge(channel.into_rpc()).unwrap();
}
Expand Down
22 changes: 21 additions & 1 deletion src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ impl ChannelActorStateStore for Store {
}
}

fn get_channels(&self, peer_id: &tentacle::secio::PeerId) -> Vec<Hash256> {
fn get_channel_ids_by_peer(&self, peer_id: &tentacle::secio::PeerId) -> Vec<Hash256> {
let prefix = [&[64], peer_id.as_bytes()].concat();
let iter = self.db.prefix_iterator(prefix.as_ref());
iter.map(|(key, _)| {
Expand All @@ -141,6 +141,26 @@ impl ChannelActorStateStore for Store {
})
.collect()
}

fn get_channel_states(&self, peer_id: Option<PeerId>) -> Vec<(PeerId, Hash256, ChannelState)> {
let prefix = match peer_id {
Some(peer_id) => [&[64], peer_id.as_bytes()].concat(),
None => vec![64],
};
let iter = self.db.prefix_iterator(prefix.as_ref());
iter.map(|(key, value)| {
let key_len = key.len();
let peer_id = PeerId::from_bytes(key[1..key_len - 32].into())
.expect("deserialize peer id should be OK");
let channel_id: [u8; 32] = key[key_len - 32..]
.try_into()
.expect("channel id should be 32 bytes");
let state = serde_json::from_slice(value.as_ref())
.expect("deserialize ChannelState should be OK");
(peer_id, channel_id.into(), state)
})
.collect()
}
}

impl InvoiceStore for Store {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ meta {
}

post {
url: {{NODE2_RPC_URL}}
url: {{NODE1_RPC_URL}}
body: json
auth: none
}
Expand All @@ -22,7 +22,7 @@ body:json {
"method": "open_channel",
"params": [
{
"peer_id": "{{NODE1_PEERID}}",
"peer_id": "{{NODE2_PEERID}}",
"funding_amount": "0x377aab54d000"
}
]
Expand All @@ -35,6 +35,5 @@ assert {
}

script:post-response {
await new Promise(r => setTimeout(r, 1000));
bru.setVar("N1N2_TEMP_CHANNEL_ID", res.body.result.temporary_channel_id);
await new Promise(r => setTimeout(r, 2000));
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
meta {
name: Node2 accept a channel from a peer
name: get auto accepted channel id from Node2
type: http
seq: 8
seq: 4
}

post {
Expand All @@ -19,23 +19,23 @@ body:json {
{
"id": "42",
"jsonrpc": "2.0",
"method": "accept_channel",
"method": "list_channels",
"params": [
{
"temporary_channel_id": "{{N2N3_TEMP_CHANNEL_ID}}",
"funding_amount": "0x377aab54d000"
"peer_id": "{{NODE1_PEERID}}"
}
]
}
}

assert {
res.body.error: isUndefined
res.body.result.channel_id: isDefined
res.body.result.channels: isDefined
}

script:post-response {
console.log(res.body.result);
// Sleep for sometime to make sure current operation finishes before next request starts.
await new Promise(r => setTimeout(r, 2000));
bru.setVar("N2N3_CHANNEL_ID", res.body.result.channel_id);
bru.setVar("N1N2_CHANNEL_ID", res.body.result.channels[0].channel_id);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ meta {
}

post {
url: {{NODE3_RPC_URL}}
url: {{NODE2_RPC_URL}}
body: json
auth: none
}
Expand All @@ -22,7 +22,7 @@ body:json {
"method": "open_channel",
"params": [
{
"peer_id": "{{NODE2_PEERID}}",
"peer_id": "{{NODE3_PEERID}}",
"funding_amount": "0x377aab54d000"
}
]
Expand All @@ -35,6 +35,5 @@ assert {
}

script:post-response {
await new Promise(r => setTimeout(r, 1000));
bru.setVar("N2N3_TEMP_CHANNEL_ID", res.body.result.temporary_channel_id);
await new Promise(r => setTimeout(r, 2000));
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
meta {
name: accept a channel from a peer
name: get auto accepted channel id from Node3
type: http
seq: 4
seq: 8
}

post {
url: {{NODE1_RPC_URL}}
url: {{NODE3_RPC_URL}}
body: json
auth: none
}
Expand All @@ -19,23 +19,23 @@ body:json {
{
"id": "42",
"jsonrpc": "2.0",
"method": "accept_channel",
"method": "list_channels",
"params": [
{
"temporary_channel_id": "{{N1N2_TEMP_CHANNEL_ID}}",
"funding_amount": "0x377aab54d000"
"peer_id": "{{NODE2_PEERID}}"
}
]
}
}

assert {
res.body.error: isUndefined
res.body.result.channel_id: isDefined
res.body.result.channels: isDefined
}

script:post-response {
console.log(res.body.result);
// Sleep for sometime to make sure current operation finishes before next request starts.
await new Promise(r => setTimeout(r, 2000));
bru.setVar("N1N2_CHANNEL_ID", res.body.result.channel_id);
bru.setVar("N2N3_CHANNEL_ID", res.body.result.channels[0].channel_id);
}
5 changes: 2 additions & 3 deletions tests/bruno/e2e/open-use-close-a-channel/02-open-channel.bru
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
meta {
name: open a channel to a peer
name: Node3 open a channel to Node1
type: http
seq: 2
}
Expand Down Expand Up @@ -35,6 +35,5 @@ assert {
}

script:post-response {
await new Promise(r => setTimeout(r, 1000));
bru.setVar("TEMP_CHANNEL_ID", res.body.result.temporary_channel_id);
await new Promise(r => setTimeout(r, 2000));
}
Loading

0 comments on commit 1bcfc96

Please sign in to comment.