diff --git a/src/ckb/channel.rs b/src/ckb/channel.rs index 116ed5f5..32f8e78e 100644 --- a/src/ckb/channel.rs +++ b/src/ckb/channel.rs @@ -236,7 +236,7 @@ impl ChannelActor { version: _, } = state.build_and_sign_commitment_tx()?; debug!( - "Build a funding tx ({:?}) with partial signature {:?}", + "Build a commitment tx ({:?}) with partial signature {:?}", &tx, &signature ); @@ -1823,7 +1823,7 @@ impl ChannelActorState { pub fn create_witness_for_funding_cell( &self, signature: CompactSignature, - version: Option, + version: u64, ) -> [u8; FUNDING_CELL_WITNESS_LEN] { create_witness_for_funding_cell( self.get_funding_lock_script_xonly(), @@ -1836,14 +1836,21 @@ impl ChannelActorState { pub fn aggregate_partial_signatures_to_consume_funding_cell( &self, partial_signatures: [PartialSignature; 2], - version: Option, + version: u64, tx: &TransactionView, ) -> Result { let funding_out_point = self.get_funding_transaction_outpoint(); + debug_assert_eq!( + tx.input_pts_iter().next().as_ref(), + Some(&funding_out_point), + "The first input of the tx must be the funding cell outpoint" + ); + let message = get_funding_cell_message_to_sign(version, funding_out_point, tx); debug!( - "Get message to sign for funding tx {:?}", - hex::encode(message.as_slice()) + "Message to sign to consume funding cell {:?} with version {:?}", + hex::encode(message.as_slice()), + version ); let verify_ctx = Musig2VerifyContext::from(self); @@ -1854,13 +1861,11 @@ impl ChannelActorState { partial_signatures, )?; - let witness = self.create_witness_for_funding_cell(signature, None); - let tx = self - .get_funding_transaction() + let witness = self.create_witness_for_funding_cell(signature, version); + Ok(tx .as_advanced_builder() .set_witnesses(vec![witness.pack()]) - .build(); - Ok(tx) + .build()) } pub fn sign_tx_to_consume_funding_cell( @@ -1868,15 +1873,16 @@ impl ChannelActorState { tx: &PartiallySignedCommitmentTransaction, ) -> Result { debug!( - "Signing and verifying commitment tx with message {:?}", - hex::encode(tx.msg.as_slice()) + "Signing and verifying commitment tx with message {:?} (version {})", + hex::encode(tx.msg.as_slice()), + tx.version ); let sign_ctx = Musig2SignContext::from(self); let signature2 = sign_ctx.sign(tx.msg.as_slice())?; self.aggregate_partial_signatures_to_consume_funding_cell( [tx.signature, signature2], - Some(tx.version), + tx.version, &tx.tx, ) } @@ -1939,7 +1945,7 @@ impl ChannelActorState { self.update_state(ChannelState::Closed); let tx = self.aggregate_partial_signatures_to_consume_funding_cell( [local_shutdown_signature, remote_shutdown_signature], - None, + u64::MAX, &shutdown_tx, )?; @@ -2539,8 +2545,11 @@ impl ChannelActorState { let tx_builder = tx_builder.set_outputs(outputs.to_vec()); let tx_builder = tx_builder.set_outputs_data(vec![Default::default(), Default::default()]); let tx = tx_builder.build(); - let message = - get_funding_cell_message_to_sign(None, self.get_funding_transaction_outpoint(), &tx); + let message = get_funding_cell_message_to_sign( + u64::MAX, + self.get_funding_transaction_outpoint(), + &tx, + ); debug!( "Building message to sign for shutdown transaction {:?}", hex::encode(message.as_slice()) @@ -2572,14 +2581,13 @@ impl ChannelActorState { let tx_builder = tx_builder.set_outputs(outputs); let tx_builder = tx_builder.set_outputs_data(outputs_data); let tx = tx_builder.build(); - let message = get_funding_cell_message_to_sign( - Some(self.get_current_commitment_number(local)), - funding_out_point, - &tx, - ); + let version = self.get_current_commitment_number(local); + let message = get_funding_cell_message_to_sign(version, funding_out_point, &tx); debug!( - "Building commitment transaction message to sign {:?}", - hex::encode(message.as_slice()) + "Building {} commitment transaction message to sign {:?} (version {})", + if local { "local" } else { "remote" }, + hex::encode(message.as_slice()), + version ); (tx, message) } @@ -2774,7 +2782,9 @@ impl ChannelActorState { let (tx, msg) = self.build_commitment_tx(false); debug!( "Verifying partial signature ({:?}) of commitment tx ({:?}) message {:?}", - &signature, &tx, &msg + &signature, + &tx, + hex::encode(&msg) ); verify_ctx.verify(signature, msg.as_slice())?; Ok(PartiallySignedCommitmentTransaction { @@ -2799,7 +2809,9 @@ impl ChannelActorState { let signature = sign_ctx.sign(msg.as_slice())?; debug!( "Signed commitment tx ({:?}) message {:?} with signature {:?}", - &tx, &msg, &signature, + &tx, + hex::encode(&msg), + &signature, ); Ok(PartiallySignedCommitmentTransaction { @@ -2817,6 +2829,10 @@ impl ChannelActorState { signature: PartialSignature, ) -> Result { let tx = self.build_and_verify_commitment_tx(signature)?; + debug!( + "Trying to complete tx with partial remote signature {:?}", + &tx + ); self.sign_tx_to_consume_funding_cell(&tx) } } @@ -2848,10 +2864,9 @@ pub fn create_witness_for_funding_cell( lock_key_xonly: [u8; 32], out_point: OutPoint, signature: CompactSignature, - version: Option, + version: u64, ) -> [u8; FUNDING_CELL_WITNESS_LEN] { let mut witness = Vec::with_capacity(FUNDING_CELL_WITNESS_LEN); - let version = version.unwrap_or(u64::MAX); for bytes in [ version.to_le_bytes().as_ref(), out_point.as_slice(), @@ -2867,7 +2882,7 @@ pub fn create_witness_for_funding_cell( } debug!( - "Building shutdown tx with witness: {:?}", + "Building witnesses for transaction to consume funding cell: {:?}", hex::encode(&witness) ); @@ -2965,12 +2980,12 @@ impl Musig2SignContext { } fn get_funding_cell_message_to_sign( - version: Option, + version: u64, funding_out_point: OutPoint, tx: &TransactionView, ) -> [u8; 32] { - let version = version.unwrap_or(u64::MAX).to_le_bytes(); - let version = version.as_slice(); + let version = version.to_le_bytes(); + let version = version.as_ref(); let funding_out_point = funding_out_point.as_slice(); let tx_hash = tx.hash(); let tx_hash = tx_hash.as_slice(); diff --git a/src/ckb/network.rs b/src/ckb/network.rs index 74fafe23..37efef2c 100644 --- a/src/ckb/network.rs +++ b/src/ckb/network.rs @@ -750,9 +750,15 @@ impl NetworkActorState { ); let transaction = transaction.into_view(); debug!("Trying to broadcast funding transaction {:?}", &transaction); - self.chain_actor - .send_message(CkbChainMessage::SendTx(transaction.clone())) - .expect("chain actor alive"); + + call_t!( + self.chain_actor, + CkbChainMessage::SendTx, + DEFAULT_CHAIN_ACTOR_TIMEOUT, + transaction.clone() + ) + .expect("chain alive") + .expect("valid funding tx"); let hash = transaction.hash().into(); @@ -1024,9 +1030,15 @@ where "Channel ({:?}) to peer {:?} is already closed. Closing transaction {:?} can be broacasted now.", channel_id, peer_id, tx ); - self.chain_actor - .send_message(CkbChainMessage::SendTx(tx.clone())) - .expect("chain actor alive"); + call_t!( + self.chain_actor, + CkbChainMessage::SendTx, + DEFAULT_CHAIN_ACTOR_TIMEOUT, + tx.clone() + ) + .expect("chain alive") + .expect("valid closing tx"); + // Notify outside observers. myself .send_message(NetworkActorMessage::new_event( diff --git a/src/ckb_chain/actor.rs b/src/ckb_chain/actor.rs index b8902405..9097d803 100644 --- a/src/ckb_chain/actor.rs +++ b/src/ckb_chain/actor.rs @@ -38,7 +38,7 @@ pub enum CkbChainMessage { RpcReplyPort>, ), Sign(FundingTx, RpcReplyPort>), - SendTx(TransactionView), + SendTx(TransactionView, RpcReplyPort>), TraceTx(TraceTxRequest, RpcReplyPort), } @@ -104,29 +104,37 @@ impl Actor for CkbChainActor { }); } } - SendTx(tx) => { + SendTx(tx, reply_port) => { let rpc_url = state.config.rpc_url.clone(); tokio::task::block_in_place(move || { let ckb_client = CkbRpcClient::new(&rpc_url); - if let Err(err) = ckb_client.send_transaction(tx.data().into(), None) { - //FIXME(yukang): RBF or duplicated transaction handling - match err { - RpcError::Rpc(e) - if (e.code.code() == -1107 || e.code.code() == -1111) => - { - log::warn!( - "[{}] transaction already in pool", - myself.get_name().unwrap_or_default() - ); - } - _ => { - log::error!( - "[{}] send transaction failed: {:?}", - myself.get_name().unwrap_or_default(), - err - ); + let result = match ckb_client.send_transaction(tx.data().into(), None) { + Ok(_) => Ok(()), + Err(err) => { + //FIXME(yukang): RBF or duplicated transaction handling + match err { + RpcError::Rpc(e) + if (e.code.code() == -1107 || e.code.code() == -1111) => + { + log::warn!( + "[{}] transaction already in pool", + myself.get_name().unwrap_or_default() + ); + Ok(()) + } + _ => { + log::error!( + "[{}] send transaction failed: {:?}", + myself.get_name().unwrap_or_default(), + err + ); + Err(err) + } } } + }; + if !reply_port.is_closed() { + reply_port.send(result).expect("reply ok"); } }); } @@ -223,9 +231,10 @@ pub use test_utils::{submit_tx, MockChainActor}; mod test_utils { use std::collections::HashMap; + use anyhow::anyhow; use ckb_types::{ core::TransactionView, - packed::CellOutput, + packed::{CellOutput, OutPoint}, prelude::{Builder, Entity, Pack, PackVec, Unpack}, }; @@ -238,16 +247,25 @@ mod test_utils { use log::{debug, error}; use ractor::{call_t, Actor, ActorProcessingErr, ActorRef}; + #[derive(Debug, Clone, Copy, PartialEq, Eq)] + pub enum CellStatus { + // This cell has been consumed. If any transaction + // tries to consume the same cell, it should be rejected. + Consumed, + } + pub struct MockChainActorState { ctx: MockContext, - committed_tx_status: HashMap, + tx_status: HashMap, + cell_status: HashMap, } impl MockChainActorState { pub fn new() -> Self { Self { ctx: MockContext::new(), - committed_tx_status: HashMap::new(), + tx_status: HashMap::new(), + cell_status: HashMap::new(), } } } @@ -366,41 +384,75 @@ mod test_utils { ); } } - SendTx(tx) => { + SendTx(tx, reply_port) => { const MAX_CYCLES: u64 = 100_000_000; let mut context = state.ctx.write(); - let status = match context.verify_tx(&tx, MAX_CYCLES) { - Ok(c) => { - debug!("Verified transaction: {:?} with {} CPU cycles", tx, c); - // Also save the outputs to the context, so that we can refer to - // these out points later. - for outpoint in tx.output_pts().into_iter() { - let index: u32 = outpoint.index().unpack(); - let index = index as usize; - let cell = tx.outputs().get(index).unwrap(); - let data = tx.outputs_data().get(index).unwrap(); - debug!( - "Creating cell with outpoint: {:?}, cell: {:?}, data: {:?}", - outpoint, cell, data - ); - context.create_cell_with_out_point( - outpoint.clone(), - cell, - data.as_bytes(), - ); + let mut f = || { + // Mark the inputs as consumed + for input in tx.input_pts_iter().into_iter() { + match state.cell_status.entry(input.clone()) { + std::collections::hash_map::Entry::Occupied(mut entry) => { + if *entry.get() == CellStatus::Consumed { + return ( + ckb_jsonrpc_types::Status::Rejected, + Err(ckb_sdk::RpcError::Other(anyhow!( + "Cell {:?} already consumed", + &input + ))), + ); + } + *entry.get_mut() = CellStatus::Consumed; + } + std::collections::hash_map::Entry::Vacant(entry) => { + entry.insert(CellStatus::Consumed); + } } - ckb_jsonrpc_types::Status::Committed } - Err(e) => { - error!("Failed to verify transaction: {:?}, error: {:?}", tx, e); - ckb_jsonrpc_types::Status::Rejected + match context.verify_tx(&tx, MAX_CYCLES) { + Ok(c) => { + debug!("Verified transaction: {:?} with {} CPU cycles", tx, c); + // Also save the outputs to the context, so that we can refer to + // these out points later. + for outpoint in tx.output_pts().into_iter() { + let index: u32 = outpoint.index().unpack(); + let index = index as usize; + let cell = tx.outputs().get(index).unwrap(); + let data = tx.outputs_data().get(index).unwrap(); + debug!( + "Creating cell with outpoint: {:?}, cell: {:?}, data: {:?}", + outpoint, cell, data + ); + context.create_cell_with_out_point( + outpoint.clone(), + cell, + data.as_bytes(), + ); + } + (ckb_jsonrpc_types::Status::Committed, Ok(())) + } + Err(e) => ( + ckb_jsonrpc_types::Status::Rejected, + Err(ckb_sdk::RpcError::Other(anyhow!( + "Failed to verify transaction: {:?}, error: {:?}", + tx, + e + ))), + ), } }; - state.committed_tx_status.insert(tx.hash(), status); + let (status, result) = f(); + state.tx_status.insert(tx.hash(), status); + if let Err(e) = reply_port.send(result.into()) { + error!( + "[{}] send reply failed: {:?}", + myself.get_name().unwrap_or_default(), + e + ); + } } TraceTx(tx, reply_port) => { let status = state - .committed_tx_status + .tx_status .get(&tx.tx_hash) .cloned() .unwrap_or(ckb_jsonrpc_types::Status::Unknown); @@ -408,7 +460,7 @@ mod test_utils { "Tracing transaction: {:?}, status: {:?}", &tx.tx_hash, &status ); - if let Err(e) = reply_port.send(status) { + if let Err(e) = reply_port.send(status.into()) { error!( "[{}] send reply failed: {:?}", myself.get_name().unwrap_or_default(), @@ -427,10 +479,12 @@ mod test_utils { ) -> ckb_jsonrpc_types::Status { pub const TIMEOUT: u64 = 1000; let tx_hash = tx.hash(); - - mock_actor - .send_message(CkbChainMessage::SendTx(tx)) - .expect("chain actor alive"); + if let Err(error) = + call_t!(mock_actor, CkbChainMessage::SendTx, TIMEOUT, tx).expect("chain actor alive") + { + error!("submit tx failed: {:?}", error); + return ckb_jsonrpc_types::Status::Rejected; + } let request = TraceTxRequest { tx_hash, confirmations: 1, @@ -534,6 +588,66 @@ mod test { assert_eq!(submit_tx(actor, tx).await, Status::Committed); } + #[tokio::test] + async fn test_repeatedly_consume_the_same_cell() { + let actor = create_mock_chain_actor().await; + let capacity = 100u64; + let output = CellOutput::new_builder() + .capacity(capacity.pack()) + .lock(get_script_by_contract( + Contract::AlwaysSuccess, + &b"whatever1"[..], + )) + .build(); + let tx = TransactionView::new_advanced_builder() + .output(output) + .output_data(Default::default()) + .build(); + assert_eq!( + submit_tx(actor.clone(), tx.clone()).await, + Status::Committed + ); + let out_point = tx.output_pts_iter().next().unwrap(); + let tx = TransactionView::new_advanced_builder() + .cell_deps(get_cell_deps_by_contracts(vec![Contract::AlwaysSuccess])) + .input( + CellInput::new_builder() + .previous_output(out_point.clone()) + .build(), + ) + .output( + CellOutput::new_builder() + .capacity(capacity.pack()) + .lock(get_script_by_contract( + Contract::FundingLock, + &b"whatever2"[..], + )) + .build(), + ) + .output_data(Default::default()) + .build(); + assert_eq!(submit_tx(actor.clone(), tx).await, Status::Committed); + let tx = TransactionView::new_advanced_builder() + .cell_deps(get_cell_deps_by_contracts(vec![Contract::AlwaysSuccess])) + .input( + CellInput::new_builder() + .previous_output(out_point.clone()) + .build(), + ) + .output( + CellOutput::new_builder() + .capacity(capacity.pack()) + .lock(get_script_by_contract( + Contract::FundingLock, + &b"whatever3"[..], + )) + .build(), + ) + .output_data(Default::default()) + .build(); + assert_eq!(submit_tx(actor, tx).await, Status::Rejected); + } + #[tokio::test] async fn test_submit_malformed_commitment_tx() { let actor = create_mock_chain_actor().await;