diff --git a/src/ckb/channel.rs b/src/ckb/channel.rs index e4cfed71..8fde4198 100644 --- a/src/ckb/channel.rs +++ b/src/ckb/channel.rs @@ -63,6 +63,8 @@ const FUNDING_CELL_WITNESS_LEN: usize = 8 + 36 + 32 + 64; // is funded or not. pub const INITIAL_COMMITMENT_NUMBER: u64 = 0; +const ASSUME_NETWORK_ACTOR_ALIVE: &'static str = "network actor must be alive"; + pub enum ChannelActorMessage { /// Command are the messages that are sent to the channel actor to perform some action. /// It is normally generated from a user request. @@ -205,7 +207,7 @@ impl ChannelActor { state.remote_ckb_amount, ), )) - .expect("network actor alive"); + .expect(ASSUME_NETWORK_ACTOR_ALIVE); Ok(()) } PCNMessage::TxUpdate(tx) => { @@ -240,7 +242,7 @@ impl ChannelActor { ), ), )) - .expect("myself alive"); + .expect(ASSUME_NETWORK_ACTOR_ALIVE); } } Ok(()) @@ -276,7 +278,7 @@ impl ChannelActor { state.get_id(), ), )) - .expect("network actor alive"); + .expect(ASSUME_NETWORK_ACTOR_ALIVE); state.state = ChannelState::AwaitingChannelReady(AwaitingChannelReadyFlags::empty()); @@ -550,7 +552,7 @@ impl ChannelActor { message: PCNMessage::CommitmentSigned(commitment_signed), }), )) - .expect("network actor alive"); + .expect(ASSUME_NETWORK_ACTOR_ALIVE); match flags { CommitmentSignedFlags::SigningCommitment(flags) => { @@ -599,7 +601,7 @@ impl ChannelActor { .send_message(NetworkActorMessage::new_command( NetworkActorCommand::SendPcnMessage(msg), )) - .expect("network actor alive"); + .expect(ASSUME_NETWORK_ACTOR_ALIVE); // Update the state for this tlc. state.pending_offered_tlcs.insert(tlc.id, tlc); @@ -655,7 +657,7 @@ impl ChannelActor { .send_message(NetworkActorMessage::new_command( NetworkActorCommand::SendPcnMessage(msg), )) - .expect("network actor alive"); + .expect(ASSUME_NETWORK_ACTOR_ALIVE); match command.reason { RemoveTlcReason::RemoveTlcFail(_) => { state.to_remote_amount += tlc.amount; @@ -711,7 +713,7 @@ impl ChannelActor { }), }), )) - .expect("network actor alive"); + .expect(ASSUME_NETWORK_ACTOR_ALIVE); state.local_shutdown_script = Some(command.close_script.clone()); state.local_shutdown_fee = Some(command.fee); let flags = flags | ShuttingDownFlags::OUR_SHUTDOWN_SENT; @@ -796,7 +798,7 @@ impl ChannelActor { pcn_msg, )), )) - .expect("network actor alive"); + .expect(ASSUME_NETWORK_ACTOR_ALIVE); state.update_state(ChannelState::CollaboratingFundingTx( CollaboratingFundingTxFlags::AWAITING_REMOTE_TX_COLLABORATION_MSG, @@ -819,7 +821,8 @@ impl ChannelActor { pcn_msg, )), )) - .expect("network actor alive"); + .expect(ASSUME_NETWORK_ACTOR_ALIVE); + state.update_state(ChannelState::CollaboratingFundingTx( flags | CollaboratingFundingTxFlags::OUR_TX_COMPLETE_SENT, )); @@ -868,6 +871,48 @@ impl ChannelActor { ChannelCommand::Shutdown(command) => self.handle_shutdown_command(state, command), } } + + pub fn handle_event( + &self, + myself: &ActorRef, + state: &mut ChannelActorState, + event: ChannelEvent, + ) -> Result<(), ProcessingChannelError> { + match event { + ChannelEvent::FundingTransactionConfirmed => { + let flags = match state.state { + ChannelState::AwaitingChannelReady(flags) => flags, + ChannelState::AwaitingTxSignatures(f) + if f.contains(AwaitingTxSignaturesFlags::TX_SIGNATURES_SENT) => + { + AwaitingChannelReadyFlags::empty() + } + _ => { + panic!("Expecting funding transaction confirmed event in state AwaitingChannelReady or after TX_SIGNATURES_SENT, but got state {:?}", &state.state); + } + }; + self.network + .send_message(NetworkActorMessage::new_command( + NetworkActorCommand::SendPcnMessage(PCNMessageWithPeerId { + peer_id: state.peer_id.clone(), + message: PCNMessage::ChannelReady(ChannelReady { + channel_id: state.get_id(), + }), + }), + )) + .expect(ASSUME_NETWORK_ACTOR_ALIVE); + let flags = flags | AwaitingChannelReadyFlags::OUR_CHANNEL_READY; + state.update_state(ChannelState::AwaitingChannelReady(flags)); + if flags.contains(AwaitingChannelReadyFlags::CHANNEL_READY) { + state.on_channel_ready(&self.network); + } + } + ChannelEvent::PeerDisconnected => { + myself.stop(Some("PeerDisconnected".to_string())); + } + } + Ok(()) + } } #[rasync_trait] @@ -964,7 +1009,7 @@ where .send_message(NetworkActorMessage::new_command( NetworkActorCommand::SendPcnMessage(command), )) - .expect("network actor alive"); + .expect(ASSUME_NETWORK_ACTOR_ALIVE); self.network .send_message(NetworkActorMessage::new_event( @@ -974,7 +1019,7 @@ where myself, ), )) - .expect("peer actor alive"); + .expect(ASSUME_NETWORK_ACTOR_ALIVE); state.update_state(ChannelState::NegotiatingFunding( NegotiatingFundingFlags::INIT_SENT, )); @@ -1050,7 +1095,7 @@ where message, }), )) - .expect("network actor alive"); + .expect(ASSUME_NETWORK_ACTOR_ALIVE); // TODO: note that we can't actually guarantee that this OpenChannel message is sent here. // It is even possible that the peer_id is bogus, and we can't send a message to it. // We need some book-keeping service to remove all the OUR_INIT_SENT channels. @@ -1079,7 +1124,7 @@ where myself, ), )) - .expect("network actor alive"); + .expect(ASSUME_NETWORK_ACTOR_ALIVE); tx.send(channel.get_id()).expect("Receive not dropped"); Ok(channel) @@ -1098,7 +1143,7 @@ where myself, ), )) - .expect("network actor alive"); + .expect(ASSUME_NETWORK_ACTOR_ALIVE); Ok(channel) } } @@ -1121,39 +1166,11 @@ where error!("Error while processing channel command: {:?}", err); } } - ChannelActorMessage::Event(e) => match e { - ChannelEvent::FundingTransactionConfirmed => { - let flags = match state.state { - ChannelState::AwaitingChannelReady(flags) => flags, - ChannelState::AwaitingTxSignatures(f) - if f.contains(AwaitingTxSignaturesFlags::TX_SIGNATURES_SENT) => - { - AwaitingChannelReadyFlags::empty() - } - _ => { - panic!("Expecting funding transaction confirmed event in state AwaitingChannelReady or after TX_SIGNATURES_SENT, but got state {:?}", &state.state); - } - }; - self.network - .send_message(NetworkActorMessage::new_command( - NetworkActorCommand::SendPcnMessage(PCNMessageWithPeerId { - peer_id: state.peer_id.clone(), - message: PCNMessage::ChannelReady(ChannelReady { - channel_id: state.get_id(), - }), - }), - )) - .expect("network actor alive"); - let flags = flags | AwaitingChannelReadyFlags::OUR_CHANNEL_READY; - state.update_state(ChannelState::AwaitingChannelReady(flags)); - if flags.contains(AwaitingChannelReadyFlags::CHANNEL_READY) { - state.on_channel_ready(&self.network); - } + ChannelActorMessage::Event(e) => { + if let Err(err) = self.handle_event(&myself, state, e) { + error!("Error while processing channel event: {:?}", err); } - ChannelEvent::PeerDisconnected => { - myself.stop(Some("PeerDisconnected".to_string())); - } - }, + } } match state.state { ChannelState::Closed => { @@ -1761,7 +1778,18 @@ impl ChannelActorState { pub fn check_state_for_tlc_update(&self) -> ProcessingChannelResult { match self.state { - ChannelState::ChannelReady(flags) if flags.is_empty() => Ok(()), + ChannelState::ChannelReady(flags) => { + // TODO: Even if we are awaiting remote revoke, we should still stash these tlc updates, + // and perform corresponding actions after we receive the revoke_and_ack message. + if flags.contains(ChannelReadyFlags::AWAITING_REMOTE_REVOKE) { + Err(ProcessingChannelError::InvalidState( + "Trying to update tlc while channel is awaiting remote revocation" + .to_string(), + )) + } else { + Ok(()) + } + } ChannelState::ShuttingDown(_) => Ok(()), _ => Err(ProcessingChannelError::InvalidState(format!( "Invalid state {:?} for adding tlc", @@ -1973,7 +2001,7 @@ impl ChannelActorState { }), }), )) - .expect("network actor alive"); + .expect(ASSUME_NETWORK_ACTOR_ALIVE); signature }); @@ -1990,7 +2018,7 @@ impl ChannelActorState { .send_message(NetworkActorMessage::new_event( NetworkActorEvent::ChannelClosed(self.get_id(), self.peer_id.clone(), tx), )) - .expect("network actor alive"); + .expect(ASSUME_NETWORK_ACTOR_ALIVE); } None => { @@ -2109,7 +2137,7 @@ impl ChannelActorState { self.get_funding_request(20000), ), )) - .expect("network alive"); + .expect(ASSUME_NETWORK_ACTOR_ALIVE); self.update_state(ChannelState::CollaboratingFundingTx( CollaboratingFundingTxFlags::PREPARING_LOCAL_TX_COLLABORATION_MSG, )); @@ -2131,7 +2159,7 @@ impl ChannelActorState { ), ), )) - .expect("myself alive"); + .expect(ASSUME_NETWORK_ACTOR_ALIVE); } } } @@ -2234,7 +2262,7 @@ impl ChannelActorState { ), ), )) - .expect("myself alive"); + .expect(ASSUME_NETWORK_ACTOR_ALIVE); debug!("Updating peer next local nonce"); self.remote_nonce = Some(commitment_signed.next_local_nonce); @@ -2264,7 +2292,7 @@ impl ChannelActorState { }), }), )) - .expect("network actor alive"); + .expect(ASSUME_NETWORK_ACTOR_ALIVE); match flags { CommitmentSignedFlags::ChannelReady(_) => { self.update_state(ChannelState::ChannelReady(ChannelReadyFlags::empty())); @@ -2369,7 +2397,7 @@ impl ChannelActorState { partial_witnesses, ), )) - .expect("network alive"); + .expect(ASSUME_NETWORK_ACTOR_ALIVE); let flags = flags | AwaitingTxSignaturesFlags::OUR_TX_SIGNATURES_SENT; self.update_state(ChannelState::AwaitingTxSignatures(flags)); @@ -2388,7 +2416,7 @@ impl ChannelActorState { .send_message(NetworkActorMessage::new_event( NetworkActorEvent::ChannelReady(self.get_id(), self.peer_id.clone()), )) - .expect("network actor alive"); + .expect(ASSUME_NETWORK_ACTOR_ALIVE); } pub fn append_remote_commitment_point(&mut self, commitment_point: Pubkey) { @@ -2497,7 +2525,7 @@ impl ChannelActorState { }), )), )) - .expect("network actor alive"); + .expect(ASSUME_NETWORK_ACTOR_ALIVE); let old_flags = match self.state { ChannelState::CollaboratingFundingTx(flags) => flags, _ => { diff --git a/src/ckb/network.rs b/src/ckb/network.rs index 5bf3f42a..8046934b 100644 --- a/src/ckb/network.rs +++ b/src/ckb/network.rs @@ -54,6 +54,15 @@ pub const PCN_PROTOCOL_ID: ProtocolId = ProtocolId::new(42); pub const DEFAULT_CHAIN_ACTOR_TIMEOUT: u64 = 60000; +// This is a temporary way to document that we assume the chain actor is always alive. +// We may later relax this assumption. At the moment, if the chain actor fails, we +// should panic with this message, and later we may find all references to this message +// to make sure that we handle the case where the chain actor is not alive. +const ASSUME_CHAIN_ACTOR_ALWAYS_ALIVE_FOR_NOW: &'static str = + "We currently assume that chain actor is always alive, but it failed. This is a known issue."; + +const ASSUME_NETWORK_MYSELF_ALIVE: &'static str = "network actor myself alive"; + #[derive(Debug)] pub struct OpenChannelResponse { pub channel_id: Hash256, @@ -283,19 +292,183 @@ where } } - _ => match state.channels.get(&message.get_channel_id()) { - None => { - error!("Channel not found for message: {:?}", &message); - } - Some(c) => { - c.send_message(ChannelActorMessage::PeerMessage(message)) - .expect("channel actor alive"); - } - }, + _ => state.send_message_to_channel_actor( + message.get_channel_id(), + ChannelActorMessage::PeerMessage(message), + ), }; Ok(()) } + pub async fn handle_event( + &self, + myself: ActorRef, + state: &mut NetworkActorState, + event: NetworkActorEvent, + ) -> crate::Result<()> { + match event { + NetworkActorEvent::NetworkServiceEvent(e) => { + self.on_service_event(e).await; + } + NetworkActorEvent::PeerConnected(id, session) => { + state + .on_peer_connected(&id, &session, self.store.clone()) + .await; + // Notify outside observers. + myself + .send_message(NetworkActorMessage::new_event( + NetworkActorEvent::NetworkServiceEvent(NetworkServiceEvent::PeerConnected( + id, + session.address, + )), + )) + .expect(ASSUME_NETWORK_MYSELF_ALIVE); + } + NetworkActorEvent::PeerDisconnected(id, session) => { + state.on_peer_disconnected(&id, &session); + // Notify outside observers. + myself + .send_message(NetworkActorMessage::new_event( + NetworkActorEvent::NetworkServiceEvent( + NetworkServiceEvent::PeerDisConnected(id, session.address), + ), + )) + .expect(ASSUME_NETWORK_MYSELF_ALIVE); + } + NetworkActorEvent::ChannelCreated(channel_id, peer_id, actor) => { + state.on_channel_created(channel_id, &peer_id, actor); + // Notify outside observers. + myself + .send_message(NetworkActorMessage::new_event( + NetworkActorEvent::NetworkServiceEvent( + NetworkServiceEvent::ChannelCreated(peer_id, channel_id), + ), + )) + .expect(ASSUME_NETWORK_MYSELF_ALIVE); + } + NetworkActorEvent::ChannelAccepted( + peer_id, + new, + old, + local, + remote, + script, + funding_script, + local_ckb_amount, + remote_ckb_amount, + ) => { + assert_ne!(new, old, "new and old channel id must be different"); + if let Some(session) = state.get_peer_session(&peer_id) { + if let Some(channel) = state.channels.remove(&old) { + debug!("Channel accepted: {:?} -> {:?}", old, new); + state.channels.insert(new, channel); + state.session_channels_map.get_mut(&session).map(|set| { + set.remove(&old); + set.insert(new); + }); + + debug!("Starting funding channel"); + // TODO: Here we implies the one who receives AcceptChannel message + // (i.e. the channel initiator) will send TxUpdate message first. + myself + .send_message(NetworkActorMessage::new_command( + NetworkActorCommand::UpdateChannelFunding( + new, + Default::default(), + FundingRequest { + udt_info: funding_script.as_ref().map(|type_script| { + FundingUdtInfo::new( + type_script, + local_ckb_amount, + remote_ckb_amount, + ) + }), + script, + local_amount: local as u64, + local_fee_rate: 0, + remote_amount: remote as u64, + }, + ), + )) + .expect(ASSUME_NETWORK_MYSELF_ALIVE); + } + } + } + NetworkActorEvent::ChannelReady(channel_id, peer_id) => { + info!( + "Channel ({:?}) to peer {:?} is now ready", + channel_id, peer_id + ); + // Notify outside observers. + myself + .send_message(NetworkActorMessage::new_event( + NetworkActorEvent::NetworkServiceEvent(NetworkServiceEvent::ChannelReady( + peer_id, channel_id, + )), + )) + .expect(ASSUME_NETWORK_MYSELF_ALIVE); + } + NetworkActorEvent::ChannelShutdown(channel_id, peer_id) => { + info!( + "Channel ({:?}) to peer {:?} is being shutdown.", + channel_id, peer_id + ); + // Notify outside observers. + myself + .send_message(NetworkActorMessage::new_event( + NetworkActorEvent::NetworkServiceEvent( + NetworkServiceEvent::ChannelShutDown(peer_id, channel_id), + ), + )) + .expect(ASSUME_NETWORK_MYSELF_ALIVE); + } + NetworkActorEvent::ChannelClosed(channel_id, peer_id, tx) => { + state.on_channel_closed(&channel_id, &peer_id); + info!( + "Channel ({:?}) to peer {:?} is already closed. Closing transaction {:?} can be broacasted now.", + channel_id, peer_id, tx + ); + call_t!( + self.chain_actor, + CkbChainMessage::SendTx, + DEFAULT_CHAIN_ACTOR_TIMEOUT, + tx.clone() + ) + .expect(ASSUME_CHAIN_ACTOR_ALWAYS_ALIVE_FOR_NOW) + .expect("valid closing tx"); + + // Notify outside observers. + myself + .send_message(NetworkActorMessage::new_event( + NetworkActorEvent::NetworkServiceEvent(NetworkServiceEvent::ChannelClosed( + peer_id, channel_id, tx, + )), + )) + .expect(ASSUME_NETWORK_MYSELF_ALIVE); + } + NetworkActorEvent::PeerMessage(peer_id, session, message) => { + self.handle_peer_message(state, peer_id, session, message) + .await? + } + NetworkActorEvent::FundingTransactionPending(transaction, outpoint, channel_id) => { + debug!( + "Funding transaction pending for channel {:?}: {:?}", + channel_id, outpoint + ); + state + .on_funding_transaction_pending(transaction, outpoint.clone(), channel_id) + .await; + } + NetworkActorEvent::FundingTransactionConfirmed(outpoint) => { + state.on_funding_transaction_confirmed(outpoint).await; + } + NetworkActorEvent::FundingTransactionFailed(_outpoint) => { + unimplemented!("handling funding transaction failed"); + } + } + Ok(()) + } + pub async fn handle_command( &self, myself: ActorRef, @@ -446,7 +619,7 @@ where DEFAULT_CHAIN_ACTOR_TIMEOUT, funding_tx.into() ) - .expect("chain alive") + .expect(ASSUME_CHAIN_ACTOR_ALWAYS_ALIVE_FOR_NOW) .expect("Signing succeeded"); debug!("Funding transaction signed: {:?}", &funding_tx); @@ -491,7 +664,7 @@ where DEFAULT_CHAIN_ACTOR_TIMEOUT, funding_tx.into() ) - .expect("chain alive")?; + .expect(ASSUME_CHAIN_ACTOR_ALWAYS_ALIVE_FOR_NOW)?; debug!("Funding transaction signed: {:?}", &funding_tx); let funding_tx = funding_tx.take().expect("take tx"); let witnesses = funding_tx.witnesses(); @@ -774,7 +947,7 @@ impl NetworkActorState { NetworkServiceEvent::ChannelPendingToBeAccepted(peer_id, id), ), )) - .expect("myself alive"); + .expect(ASSUME_NETWORK_MYSELF_ALIVE); Ok(()) } @@ -783,7 +956,8 @@ impl NetworkActorState { transaction: Transaction, outpoint: OutPoint, channel_id: Hash256, - ) -> Result<(), ActorProcessingErr> { + ) { + // Just a sanity check to ensure that no two channels are associated with the same outpoint. if let Some(old) = self.pending_channels.remove(&outpoint) { if old != channel_id { panic!("Trying to associate a new channel id {:?} with the same outpoint {:?} when old channel id is {:?}. Rejecting.", channel_id, outpoint, old); @@ -804,7 +978,7 @@ impl NetworkActorState { DEFAULT_CHAIN_ACTOR_TIMEOUT, transaction.clone() ) - .expect("chain alive") + .expect(ASSUME_CHAIN_ACTOR_ALWAYS_ALIVE_FOR_NOW) .expect("valid funding tx"); let hash = transaction.hash().into(); @@ -853,16 +1027,11 @@ impl NetworkActorState { // Notify outside observers. network .send_message(NetworkActorMessage::new_event(message)) - .expect("myself alive"); + .expect(ASSUME_NETWORK_MYSELF_ALIVE); }); - - Ok(()) } - async fn on_funding_transaction_confirmed( - &mut self, - outpoint: OutPoint, - ) -> Result<(), ActorProcessingErr> { + async fn on_funding_transaction_confirmed(&mut self, outpoint: OutPoint) { let channel_id = match self.pending_channels.remove(&outpoint) { Some(channel_id) => channel_id, None => { @@ -870,17 +1039,27 @@ impl NetworkActorState { "Funding transaction confirmed for outpoint {:?} but no channel found", &outpoint ); - return Ok(()); + return; } }; - if let Some(channel) = self.channels.get(&channel_id) { - channel - .send_message(ChannelActorMessage::Event( - ChannelEvent::FundingTransactionConfirmed, - )) - .expect("channel actor alive"); + self.send_message_to_channel_actor( + channel_id, + ChannelActorMessage::Event(ChannelEvent::FundingTransactionConfirmed), + ); + } + + fn send_message_to_channel_actor(&self, channel_id: Hash256, message: ChannelActorMessage) { + match self.channels.get(&channel_id) { + None => { + error!( + "Failed to send message to channel actor: channel {:?} not found", + &channel_id + ); + } + Some(actor) => { + actor.send_message(message).expect("channel actor alive"); + } } - Ok(()) } } @@ -942,7 +1121,7 @@ where listen_addr, )), )) - .expect("network actor myself alive"); + .expect(ASSUME_NETWORK_MYSELF_ALIVE); tracker.spawn(async move { service.run().await; @@ -974,169 +1153,13 @@ where debug!("Network actor processing message {:?}", message); match message { - NetworkActorMessage::Event(event) => match event { - NetworkActorEvent::NetworkServiceEvent(e) => { - self.on_service_event(e).await; - } - NetworkActorEvent::PeerConnected(id, session) => { - state - .on_peer_connected(&id, &session, self.store.clone()) - .await; - // Notify outside observers. - myself - .send_message(NetworkActorMessage::new_event( - NetworkActorEvent::NetworkServiceEvent( - NetworkServiceEvent::PeerConnected(id, session.address), - ), - )) - .expect("myself alive"); - } - NetworkActorEvent::PeerDisconnected(id, session) => { - state.on_peer_disconnected(&id, &session); - // Notify outside observers. - myself - .send_message(NetworkActorMessage::new_event( - NetworkActorEvent::NetworkServiceEvent( - NetworkServiceEvent::PeerDisConnected(id, session.address), - ), - )) - .expect("myself alive"); - } - NetworkActorEvent::ChannelCreated(channel_id, peer_id, actor) => { - state.on_channel_created(channel_id, &peer_id, actor); - // Notify outside observers. - myself - .send_message(NetworkActorMessage::new_event( - NetworkActorEvent::NetworkServiceEvent( - NetworkServiceEvent::ChannelCreated(peer_id, channel_id), - ), - )) - .expect("myself alive"); - } - NetworkActorEvent::ChannelAccepted( - peer_id, - new, - old, - local, - remote, - script, - funding_script, - local_ckb_amount, - remote_ckb_amount, - ) => { - assert_ne!(new, old, "new and old channel id must be different"); - if let Some(session) = state.get_peer_session(&peer_id) { - if let Some(channel) = state.channels.remove(&old) { - debug!("Channel accepted: {:?} -> {:?}", old, new); - state.channels.insert(new, channel); - state.session_channels_map.get_mut(&session).map(|set| { - set.remove(&old); - set.insert(new); - }); - - debug!("Starting funding channel"); - // TODO: Here we implies the one who receives AcceptChannel message - // (i.e. the channel initiator) will send TxUpdate message first. - dbg!(&script); - myself - .send_message(NetworkActorMessage::new_command( - NetworkActorCommand::UpdateChannelFunding( - new, - Default::default(), - FundingRequest { - udt_info: funding_script.as_ref().map(|type_script| { - FundingUdtInfo::new( - type_script, - local_ckb_amount, - remote_ckb_amount, - ) - }), - script, - local_amount: local as u64, - local_fee_rate: 0, - remote_amount: remote as u64, - }, - ), - )) - .expect("myself alive"); - } - } - } - NetworkActorEvent::ChannelReady(channel_id, peer_id) => { - info!( - "Channel ({:?}) to peer {:?} is now ready", - channel_id, peer_id - ); - // Notify outside observers. - myself - .send_message(NetworkActorMessage::new_event( - NetworkActorEvent::NetworkServiceEvent( - NetworkServiceEvent::ChannelReady(peer_id, channel_id), - ), - )) - .expect("myself alive"); + NetworkActorMessage::Event(event) => { + if let Err(err) = self.handle_event(myself, state, event).await { + error!("Failed to handle ckb network event: {}", err); } - NetworkActorEvent::ChannelShutdown(channel_id, peer_id) => { - info!( - "Channel ({:?}) to peer {:?} is being shutdown.", - channel_id, peer_id - ); - // Notify outside observers. - myself - .send_message(NetworkActorMessage::new_event( - NetworkActorEvent::NetworkServiceEvent( - NetworkServiceEvent::ChannelShutDown(peer_id, channel_id), - ), - )) - .expect("myself alive"); - } - NetworkActorEvent::ChannelClosed(channel_id, peer_id, tx) => { - state.on_channel_closed(&channel_id, &peer_id); - info!( - "Channel ({:?}) to peer {:?} is already closed. Closing transaction {:?} can be broacasted now.", - channel_id, peer_id, tx - ); - call_t!( - self.chain_actor, - CkbChainMessage::SendTx, - DEFAULT_CHAIN_ACTOR_TIMEOUT, - tx.clone() - ) - .expect("chain alive") - .expect("valid closing tx"); - - // Notify outside observers. - myself - .send_message(NetworkActorMessage::new_event( - NetworkActorEvent::NetworkServiceEvent( - NetworkServiceEvent::ChannelClosed(peer_id, channel_id, tx), - ), - )) - .expect("myself alive"); - } - NetworkActorEvent::PeerMessage(peer_id, session, message) => { - self.handle_peer_message(state, peer_id, session, message) - .await? - } - NetworkActorEvent::FundingTransactionPending(transaction, outpoint, channel_id) => { - debug!( - "Funding transaction pending for channel {:?}: {:?}", - channel_id, outpoint - ); - state - .on_funding_transaction_pending(transaction, outpoint.clone(), channel_id) - .await?; - } - NetworkActorEvent::FundingTransactionConfirmed(outpoint) => { - state.on_funding_transaction_confirmed(outpoint).await? - } - NetworkActorEvent::FundingTransactionFailed(_outpoint) => { - unimplemented!("handling funding transaction failed"); - } - }, + } NetworkActorMessage::Command(command) => { - let result = self.handle_command(myself, state, command).await; - if let Err(err) = result { + if let Err(err) = self.handle_command(myself, state, command).await { error!("Failed to handle ckb network command: {}", err); } }