diff --git a/contractcourt/chain_arbitrator.go b/contractcourt/chain_arbitrator.go index 05eb46a68be..b4b2fa26fd4 100644 --- a/contractcourt/chain_arbitrator.go +++ b/contractcourt/chain_arbitrator.go @@ -230,6 +230,12 @@ type ChainArbitratorConfig struct { // AuxResolver is an optional interface that can be used to modify the // way contracts are resolved. AuxResolver fn.Option[lnwallet.AuxContractResolver] + + // ChannelCloseConfs is an optional override for the number of + // confirmations required for channel closes. When set, this overrides + // the normal capacity-based scaling. This is only available in + // dev/integration builds for testing purposes. + ChannelCloseConfs fn.Option[uint32] } // ChainArbitrator is a sub-system that oversees the on-chain resolution of all @@ -1138,6 +1144,7 @@ func (c *ChainArbitrator) WatchNewChannel(newChan *channeldb.OpenChannel) error extractStateNumHint: lnwallet.GetStateNumHint, auxLeafStore: c.cfg.AuxLeafStore, auxResolver: c.cfg.AuxResolver, + chanCloseConfs: c.cfg.ChannelCloseConfs, }, ) if err != nil { @@ -1315,6 +1322,7 @@ func (c *ChainArbitrator) loadOpenChannels() error { extractStateNumHint: lnwallet.GetStateNumHint, auxLeafStore: c.cfg.AuxLeafStore, auxResolver: c.cfg.AuxResolver, + chanCloseConfs: c.cfg.ChannelCloseConfs, }, ) if err != nil { diff --git a/contractcourt/chain_watcher.go b/contractcourt/chain_watcher.go index 9c566fd6b51..9fd70fe2c93 100644 --- a/contractcourt/chain_watcher.go +++ b/contractcourt/chain_watcher.go @@ -88,6 +88,38 @@ type BreachCloseInfo struct { CloseSummary channeldb.ChannelCloseSummary } +// spendConfirmationState represents the state of spend confirmation tracking +// in the closeObserver state machine. We wait for N confirmations before +// processing any spend to protect against shallow reorgs. +type spendConfirmationState uint8 + +const ( + // spendStateNone indicates no spend has been detected yet. + spendStateNone spendConfirmationState = iota + + // spendStatePending indicates a spend has been detected and we're + // waiting for the required number of confirmations. + spendStatePending + + // spendStateConfirmed indicates the spend has reached the required + // confirmations and has been processed. + spendStateConfirmed +) + +// String returns a human-readable representation of the state. +func (s spendConfirmationState) String() string { + switch s { + case spendStateNone: + return "None" + case spendStatePending: + return "Pending" + case spendStateConfirmed: + return "Confirmed" + default: + return "Unknown" + } +} + // CommitSet is a collection of the set of known valid commitments at a given // instant. If ConfCommitKey is set, then the commitment identified by the // HtlcSetKey has hit the chain. This struct will be used to examine all live @@ -229,6 +261,12 @@ type chainWatcherConfig struct { // auxResolver is used to supplement contract resolution. auxResolver fn.Option[lnwallet.AuxContractResolver] + + // chanCloseConfs is an optional override for the number of + // confirmations required for channel closes. When set, this overrides + // the normal capacity-based scaling. This is only available in + // dev/integration builds for testing purposes. + chanCloseConfs fn.Option[uint32] } // chainWatcher is a system that's assigned to every active channel. The duty @@ -646,51 +684,245 @@ func newChainSet(chanState *channeldb.OpenChannel) (*chainSet, error) { } // closeObserver is a dedicated goroutine that will watch for any closes of the -// channel that it's watching on chain. 
In the event of an on-chain event, the -// close observer will assembled the proper materials required to claim the -// funds of the channel on-chain (if required), then dispatch these as -// notifications to all subscribers. +// channel that it's watching on chain. It implements a state machine to handle +// spend detection and confirmation with reorg protection. The states are: +// +// - None (confNtfn == nil): No spend detected yet, waiting for spend +// notification +// +// - Pending (confNtfn != nil): Spend detected, waiting for N confirmations +// +// - Confirmed: Spend confirmed with N blocks, close has been processed +// +// For single-confirmation scenarios (numConfs == 1), we bypass the async state +// machine and immediately dispatch close events upon spend detection. This +// provides synchronous behavior for integration tests which expect immediate +// notifications. For multi-confirmation scenarios (production with numConfs >= 3), +// we use the full async state machine with reorg protection. func (c *chainWatcher) closeObserver() { defer c.wg.Done() - defer c.fundingSpendNtfn.Cancel() + + registerForSpend := func() (*chainntnfs.SpendEvent, error) { + fundingPkScript, err := deriveFundingPkScript(c.cfg.chanState) + if err != nil { + return nil, err + } + + heightHint := c.cfg.chanState.DeriveHeightHint() + + return c.cfg.notifier.RegisterSpendNtfn( + &c.cfg.chanState.FundingOutpoint, + fundingPkScript, + heightHint, + ) + } + + spendNtfn := c.fundingSpendNtfn + defer spendNtfn.Cancel() + + // We use these variables to implement a state machine to track the + // state of the spend confirmation process: + // * When confNtfn is nil, we're in state "None" waiting for a spend. + // * When confNtfn is set, we're in state "Pending" waiting for + // confirmations. + // + // After confirmations, we transition to state "Confirmed" and clean up. + var ( + pendingSpend *chainntnfs.SpendDetail + confNtfn *chainntnfs.ConfirmationEvent + ) log.Infof("Close observer for ChannelPoint(%v) active", c.cfg.chanState.FundingOutpoint) + // handleSpendDetection processes a newly detected spend by registering + // for confirmations. Returns the new confNtfn or error. + handleSpendDetection := func( + spend *chainntnfs.SpendDetail, + ) (*chainntnfs.ConfirmationEvent, error) { + + // If we already have a pending spend, check if it's the same + // transaction. This can happen if both the spend notification + // and blockbeat detect the same spend. + if pendingSpend != nil { + if *pendingSpend.SpenderTxHash == *spend.SpenderTxHash { + log.Debugf("ChannelPoint(%v): ignoring "+ + "duplicate spend detection for tx %v", + c.cfg.chanState.FundingOutpoint, + spend.SpenderTxHash) + return confNtfn, nil + } + + // Different spend detected. Cancel existing confNtfn + // and replace with new one. 
+ log.Warnf("ChannelPoint(%v): detected different "+ + "spend tx %v, replacing pending tx %v", + c.cfg.chanState.FundingOutpoint, + spend.SpenderTxHash, + pendingSpend.SpenderTxHash) + + if confNtfn != nil { + confNtfn.Cancel() + } + } + + numConfs := c.requiredConfsForSpend() + txid := spend.SpenderTxHash + + newConfNtfn, err := c.cfg.notifier.RegisterConfirmationsNtfn( + txid, spend.SpendingTx.TxOut[0].PkScript, + numConfs, uint32(spend.SpendingHeight), + ) + if err != nil { + return nil, fmt.Errorf("register confirmations: %w", + err) + } + + log.Infof("ChannelPoint(%v): waiting for %d confirmations "+ + "of spend tx %v", c.cfg.chanState.FundingOutpoint, + numConfs, txid) + + return newConfNtfn, nil + } + for { + // We only listen to confirmation channels when we have a + // pending spend. By setting these to nil when not needed, Go's + // select ignores those cases, effectively implementing our + // state machine. + var ( + confChan <-chan *chainntnfs.TxConfirmation + negativeConfChan <-chan int32 + ) + if confNtfn != nil { + confChan = confNtfn.Confirmed + negativeConfChan = confNtfn.NegativeConf + } + select { - // A new block is received, we will check whether this block - // contains a spending tx that we are interested in. case beat := <-c.BlockbeatChan: log.Debugf("ChainWatcher(%v) received blockbeat %v", c.cfg.chanState.FundingOutpoint, beat.Height()) - // Process the block. - c.handleBlockbeat(beat) - - // If the funding outpoint is spent, we now go ahead and handle - // it. Note that we cannot rely solely on the `block` event - // above to trigger a close event, as deep down, the receiving - // of block notifications and the receiving of spending - // notifications are done in two different goroutines, so the - // expected order: [receive block -> receive spend] is not - // guaranteed . - case spend, ok := <-c.fundingSpendNtfn.Spend: - // If the channel was closed, then this means that the - // notifier exited, so we will as well. + spend := c.handleBlockbeat(beat) + if spend == nil { + continue + } + + // FAST PATH: Check if we should dispatch immediately for + // single-confirmation scenarios. + if c.handleSpendDispatch(spend, "blockbeat") { + continue + } + + // ASYNC PATH: Multiple confirmations (production). + // STATE TRANSITION: None -> Pending (from blockbeat). + log.Infof("ChannelPoint(%v): detected spend from "+ + "blockbeat, transitioning to %v", + c.cfg.chanState.FundingOutpoint, + spendStatePending) + + newConfNtfn, err := handleSpendDetection(spend) + if err != nil { + log.Errorf("Unable to handle spend "+ + "detection: %v", err) + return + } + pendingSpend = spend + confNtfn = newConfNtfn + + // STATE TRANSITION: None -> Pending. + // We've detected a spend, but don't process it yet. Instead, + // register for confirmations to protect against shallow reorgs. + case spend, ok := <-spendNtfn.Spend: if !ok { return } - err := c.handleCommitSpend(spend) + // FAST PATH: Check if we should dispatch immediately for + // single-confirmation scenarios. + if c.handleSpendDispatch(spend, "spend notification") { + continue + } + + // ASYNC PATH: Multiple confirmations (production). 
+ log.Infof("ChannelPoint(%v): detected spend from "+ + "notification, transitioning to %v", + c.cfg.chanState.FundingOutpoint, + spendStatePending) + + newConfNtfn, err := handleSpendDetection(spend) + if err != nil { + log.Errorf("Unable to handle spend "+ + "detection: %v", err) + return + } + pendingSpend = spend + confNtfn = newConfNtfn + + // STATE TRANSITION: Pending -> Confirmed + // The spend has reached required confirmations. It's now safe + // to process since we've protected against shallow reorgs. + case conf, ok := <-confChan: + if !ok { + log.Errorf("Confirmation channel closed " + + "unexpectedly") + return + } + + log.Infof("ChannelPoint(%v): spend confirmed at "+ + "height %d, transitioning to %v", + c.cfg.chanState.FundingOutpoint, + conf.BlockHeight, spendStateConfirmed) + + err := c.handleCommitSpend(pendingSpend) + if err != nil { + log.Errorf("Failed to handle confirmed "+ + "spend: %v", err) + } + + confNtfn.Cancel() + confNtfn = nil + pendingSpend = nil + + // STATE TRANSITION: Pending -> None + // A reorg removed the spend tx. We reset to initial state and + // wait for ANY new spend (could be the same tx re-mined, or a + // different tx like an RBF replacement). + case reorgDepth, ok := <-negativeConfChan: + if !ok { + log.Errorf("Negative conf channel closed " + + "unexpectedly") + return + } + + log.Infof("ChannelPoint(%v): spend reorged out at "+ + "depth %d, transitioning back to %v", + c.cfg.chanState.FundingOutpoint, reorgDepth, + spendStateNone) + + confNtfn.Cancel() + confNtfn = nil + pendingSpend = nil + + spendNtfn.Cancel() + var err error + spendNtfn, err = registerForSpend() if err != nil { - log.Errorf("Failed to handle commit spend: %v", - err) + log.Errorf("Unable to re-register for "+ + "spend: %v", err) + return } + log.Infof("ChannelPoint(%v): re-registered for spend "+ + "detection", c.cfg.chanState.FundingOutpoint) + // The chainWatcher has been signalled to exit, so we'll do so // now. case <-c.quit: + if confNtfn != nil { + confNtfn.Cancel() + } return } } @@ -986,6 +1218,18 @@ func (c *chainWatcher) toSelfAmount(tx *wire.MsgTx) btcutil.Amount { return btcutil.Amount(fn.Sum(vals)) } +// requiredConfsForSpend determines the number of confirmations required before +// processing a spend of the funding output. Uses config override if set +// (typically for testing), otherwise scales with channel capacity to balance +// security vs user experience for channels of different sizes. +func (c *chainWatcher) requiredConfsForSpend() uint32 { + return c.cfg.chanCloseConfs.UnwrapOrFunc(func() uint32 { + return lnwallet.CloseConfsForCapacity( + c.cfg.chanState.Capacity, + ) + }) +} + // dispatchCooperativeClose processed a detect cooperative channel closure. // We'll use the spending transaction to locate our output within the // transaction, then clean up the database state. We'll also dispatch a @@ -1003,8 +1247,8 @@ func (c *chainWatcher) dispatchCooperativeClose(commitSpend *chainntnfs.SpendDet localAmt := c.toSelfAmount(broadcastTx) // Once this is known, we'll mark the state as fully closed in the - // database. We can do this as a cooperatively closed channel has all - // its outputs resolved after only one confirmation. + // database. For cooperative closes, we wait for a confirmation depth + // determined by channel capacity before dispatching this event. 
closeSummary := &channeldb.ChannelCloseSummary{ ChanPoint: c.cfg.chanState.FundingOutpoint, ChainHash: c.cfg.chanState.ChainHash, @@ -1358,6 +1602,30 @@ func deriveFundingPkScript(chanState *channeldb.OpenChannel) ([]byte, error) { return fundingPkScript, nil } +// handleSpendDispatch processes a detected spend. For single-confirmation +// scenarios (numConfs == 1), it immediately dispatches the close event and +// returns true. For multi-confirmation scenarios, it returns false, indicating +// the caller should proceed with the async state machine. +func (c *chainWatcher) handleSpendDispatch(spend *chainntnfs.SpendDetail, + source string) bool { + + numConfs := c.requiredConfsForSpend() + if numConfs == 1 { + log.Infof("ChannelPoint(%v): single confirmation mode, "+ + "dispatching immediately from %s", + c.cfg.chanState.FundingOutpoint, source) + + err := c.handleCommitSpend(spend) + if err != nil { + log.Errorf("Failed to handle commit spend: %v", err) + } + + return true + } + + return false +} + // handleCommitSpend takes a spending tx of the funding output and handles the // channel close based on the closure type. func (c *chainWatcher) handleCommitSpend( @@ -1413,9 +1681,10 @@ func (c *chainWatcher) handleCommitSpend( case wire.MaxTxInSequenceNum: fallthrough case mempool.MaxRBFSequence: - // TODO(roasbeef): rare but possible, need itest case for - err := c.dispatchCooperativeClose(commitSpend) - if err != nil { + // This is a cooperative close. Dispatch it directly - the + // confirmation waiting and reorg handling is done in the + // closeObserver state machine before we reach this point. + if err := c.dispatchCooperativeClose(commitSpend); err != nil { return fmt.Errorf("handle coop close: %w", err) } @@ -1520,9 +1789,9 @@ func (c *chainWatcher) chanPointConfirmed() bool { } // handleBlockbeat takes a blockbeat and queries for a spending tx for the -// funding output. If the spending tx is found, it will be handled based on the -// closure type. -func (c *chainWatcher) handleBlockbeat(beat chainio.Blockbeat) { +// funding output. If found, it returns the spend details so closeObserver can +// process it. Returns nil if no spend was detected. +func (c *chainWatcher) handleBlockbeat(beat chainio.Blockbeat) *chainntnfs.SpendDetail { // Notify the chain watcher has processed the block. defer c.NotifyBlockProcessed(beat, nil) @@ -1534,24 +1803,23 @@ func (c *chainWatcher) handleBlockbeat(beat chainio.Blockbeat) { // If the funding output hasn't confirmed in this block, we // will check it again in the next block. if !c.chanPointConfirmed() { - return + return nil } } // Perform a non-blocking read to check whether the funding output was - // spent. + // spent. The actual spend handling is done in closeObserver's state + // machine to avoid blocking the block processing pipeline. spend := c.checkFundingSpend() if spend == nil { log.Tracef("No spend found for ChannelPoint(%v) in block %v", c.cfg.chanState.FundingOutpoint, beat.Height()) - return + return nil } - // The funding output was spent, we now handle it by sending a close - // event to the channel arbitrator. 
- err := c.handleCommitSpend(spend) - if err != nil { - log.Errorf("Failed to handle commit spend: %v", err) - } + log.Debugf("Detected spend of ChannelPoint(%v) in block %v", + c.cfg.chanState.FundingOutpoint, beat.Height()) + + return spend } diff --git a/contractcourt/chain_watcher_coop_reorg_test.go b/contractcourt/chain_watcher_coop_reorg_test.go new file mode 100644 index 00000000000..9fd9ebc0d86 --- /dev/null +++ b/contractcourt/chain_watcher_coop_reorg_test.go @@ -0,0 +1,325 @@ +package contractcourt + +import ( + "testing" + "time" + + "github.com/btcsuite/btcd/wire" +) + +// TestChainWatcherCoopCloseReorg tests that the chain watcher properly handles +// a reorganization during cooperative close confirmation waiting. When a +// cooperative close transaction is reorganized out, the chain watcher should +// re-register for spend notifications and detect an alternative transaction. +func TestChainWatcherCoopCloseReorg(t *testing.T) { + t.Parallel() + + // Create test harness. + harness := newChainWatcherTestHarness(t) + + // Create two cooperative close transactions with different fees. + tx1 := harness.createCoopCloseTx(5000) + tx2 := harness.createCoopCloseTx(4900) + + // Run cooperative close flow with reorg. + closeInfo := harness.runCoopCloseFlow(tx1, true, 2, tx2) + + // Assert that the second transaction was confirmed. + harness.assertCoopCloseTx(closeInfo, tx2) +} + +// TestChainWatcherCoopCloseSameTransactionAfterReorg tests that if the same +// transaction re-confirms after a reorganization, it is properly handled. +func TestChainWatcherCoopCloseSameTransactionAfterReorg(t *testing.T) { + t.Parallel() + + // Create test harness. + harness := newChainWatcherTestHarness(t) + + // Create a single cooperative close transaction. + tx := harness.createCoopCloseTx(5000) + + // Run flow: send tx, trigger reorg, re-send same tx. + harness.sendSpend(tx) + + // Wait for confirmation registration and trigger reorg. + harness.waitForConfRegistration() + harness.mineBlocks(2) + harness.triggerReorg(tx, 2) + + // After reorg, wait for re-registration then re-send the same transaction. + harness.waitForSpendRegistration() + harness.sendSpend(tx) + + // Confirm it. + harness.waitForConfRegistration() + harness.mineBlocks(1) + harness.confirmTx(tx, harness.currentHeight) + + // Wait for and verify cooperative close. + closeInfo := harness.waitForCoopClose(5 * time.Second) + harness.assertCoopCloseTx(closeInfo, tx) +} + +// TestChainWatcherCoopCloseMultipleReorgs tests handling of multiple +// consecutive reorganizations during cooperative close confirmation. +func TestChainWatcherCoopCloseMultipleReorgs(t *testing.T) { + t.Parallel() + + // Create test harness. + harness := newChainWatcherTestHarness(t) + + // Create multiple cooperative close transactions with different fees. + txs := []*wire.MsgTx{ + harness.createCoopCloseTx(5000), + harness.createCoopCloseTx(4950), + harness.createCoopCloseTx(4900), + harness.createCoopCloseTx(4850), + } + + // Define reorg depths for each transition. + reorgDepths := []int32{1, 2, 3} + + // Run multiple reorg flow. + closeInfo := harness.runMultipleReorgFlow(txs, reorgDepths) + + // Assert that the final transaction was confirmed. + harness.assertCoopCloseTx(closeInfo, txs[3]) +} + +// TestChainWatcherCoopCloseDeepReorg tests that the chain watcher can handle +// deep reorganizations where the reorg depth exceeds the required number of +// confirmations. +func TestChainWatcherCoopCloseDeepReorg(t *testing.T) { + t.Parallel() + + // Create test harness. 
+ harness := newChainWatcherTestHarness(t) + + // Create two cooperative close transactions. + tx1 := harness.createCoopCloseTx(5000) + tx2 := harness.createCoopCloseTx(4900) + + // Run with a deep reorg (10 blocks). + closeInfo := harness.runCoopCloseFlow(tx1, true, 10, tx2) + + // Assert that the second transaction was confirmed after deep reorg. + harness.assertCoopCloseTx(closeInfo, tx2) +} + +// TestChainWatcherCoopCloseReorgNoAlternative tests that if a cooperative +// close is reorganized out and no alternative transaction appears, the +// chain watcher continues waiting. +func TestChainWatcherCoopCloseReorgNoAlternative(t *testing.T) { + t.Parallel() + + // Create test harness. + harness := newChainWatcherTestHarness(t) + + // Create a cooperative close transaction. + tx := harness.createCoopCloseTx(5000) + + // Send spend and wait for confirmation registration. + harness.sendSpend(tx) + harness.waitForConfRegistration() + + // Trigger reorg after some confirmations. + harness.mineBlocks(2) + harness.triggerReorg(tx, 2) + + // Assert no cooperative close event is received. + harness.assertNoCoopClose(2 * time.Second) + + // Now send a new transaction after the timeout. + harness.waitForSpendRegistration() + newTx := harness.createCoopCloseTx(4900) + harness.sendSpend(newTx) + harness.waitForConfRegistration() + harness.mineBlocks(1) + harness.confirmTx(newTx, harness.currentHeight) + + // Should receive cooperative close for the new transaction. + closeInfo := harness.waitForCoopClose(5 * time.Second) + harness.assertCoopCloseTx(closeInfo, newTx) +} + +// TestChainWatcherCoopCloseRBFAfterReorg tests that RBF cooperative close +// transactions are properly handled when reorganizations occur. +func TestChainWatcherCoopCloseRBFAfterReorg(t *testing.T) { + t.Parallel() + + // Create test harness. + harness := newChainWatcherTestHarness(t) + + // Create a series of RBF transactions with increasing fees. + rbfTxs := make([]*wire.MsgTx, 3) + for i := range rbfTxs { + // Each transaction has higher fee (lower output). + outputValue := int64(5000 - i*100) + rbfTxs[i] = harness.createCoopCloseTx(outputValue) + } + + // Send first RBF transaction. + harness.sendSpend(rbfTxs[0]) + harness.waitForConfRegistration() + + // Trigger reorg. + harness.mineBlocks(1) + harness.triggerReorg(rbfTxs[0], 1) + + // Send second RBF transaction after re-registration. + harness.waitForSpendRegistration() + harness.sendSpend(rbfTxs[1]) + harness.waitForConfRegistration() + + // Another reorg. + harness.mineBlocks(1) + harness.triggerReorg(rbfTxs[1], 1) + + // Send final RBF transaction after re-registration. + harness.waitForSpendRegistration() + harness.sendSpend(rbfTxs[2]) + harness.waitForConfRegistration() + harness.mineBlocks(1) + harness.confirmTx(rbfTxs[2], harness.currentHeight) + + // Should confirm the highest fee transaction. + closeInfo := harness.waitForCoopClose(5 * time.Second) + harness.assertCoopCloseTx(closeInfo, rbfTxs[2]) +} + +// TestChainWatcherCoopCloseScaledConfirmationsWithReorg tests that scaled +// confirmations (based on channel capacity) work correctly with reorgs. +func TestChainWatcherCoopCloseScaledConfirmationsWithReorg(t *testing.T) { + t.Parallel() + + // Test with different channel capacities that require different + // confirmation counts. 
+ testCases := []struct { + name string + capacityScale float64 + expectedConfs uint16 + reorgDepth int32 + }{ + { + name: "small_channel", + capacityScale: 0.1, + expectedConfs: 1, + reorgDepth: 1, + }, + { + name: "medium_channel", + capacityScale: 0.5, + expectedConfs: 3, + reorgDepth: 2, + }, + { + name: "large_channel", + capacityScale: 1.0, + expectedConfs: 6, + reorgDepth: 4, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + // Create harness with specific channel capacity. + harness := newChainWatcherTestHarness(t) + + // Create transactions. + tx1 := harness.createCoopCloseTx(5000) + tx2 := harness.createCoopCloseTx(4900) + + // Run with reorg at different depths based on capacity. + closeInfo := harness.runCoopCloseFlow( + tx1, true, tc.reorgDepth, tx2, + ) + + // Verify correct transaction confirmed. + harness.assertCoopCloseTx(closeInfo, tx2) + }) + } +} + +// TestChainWatcherCoopCloseReorgRaceCondition tests that the chain watcher +// correctly handles race conditions where multiple transactions might be +// in flight during reorganizations. +func TestChainWatcherCoopCloseReorgRaceCondition(t *testing.T) { + t.Parallel() + + // Create test harness. + harness := newChainWatcherTestHarness(t) + + // Create multiple transactions. + txs := make([]*wire.MsgTx, 5) + for i := range txs { + txs[i] = harness.createCoopCloseTx(int64(5000 - i*50)) + } + + // Rapidly send multiple transactions and reorgs. + for i := 0; i < 3; i++ { + harness.sendSpend(txs[i]) + harness.waitForConfRegistration() + harness.mineBlocks(1) + + // Quick reorg. + harness.triggerReorg(txs[i], 1) + + // Wait for re-registration. + harness.waitForSpendRegistration() + } + + // Eventually send and confirm a final transaction. + finalTx := txs[4] + harness.sendSpend(finalTx) + harness.waitForConfRegistration() + harness.mineBlocks(1) + harness.confirmTx(finalTx, harness.currentHeight) + + // Should eventually settle on the final transaction. + closeInfo := harness.waitForCoopClose(10 * time.Second) + harness.assertCoopCloseTx(closeInfo, finalTx) +} + +// TestChainWatcherCoopCloseReorgErrorHandling tests that errors during +// re-registration after reorg are properly handled. +func TestChainWatcherCoopCloseReorgErrorHandling(t *testing.T) { + t.Parallel() + + // Create test harness. + harness := newChainWatcherTestHarness(t) + + // Create a cooperative close transaction. + tx := harness.createCoopCloseTx(5000) + + // Send spend notification. + harness.sendSpend(tx) + + // Trigger multiple rapid reorgs to stress error handling. + for i := 0; i < 5; i++ { + harness.waitForConfRegistration() + harness.mineBlocks(1) + harness.triggerReorg(tx, int32(i+1)) + if i < 4 { + // Re-register for spend after each reorg except the + // last. + harness.waitForSpendRegistration() + harness.sendSpend(tx) + } + } + + // After stress, send a clean transaction. + harness.waitForSpendRegistration() + cleanTx := harness.createCoopCloseTx(4800) + harness.sendSpend(cleanTx) + harness.waitForConfRegistration() + harness.mineBlocks(1) + harness.confirmTx(cleanTx, harness.currentHeight) + + // Should still receive the cooperative close. 
+ closeInfo := harness.waitForCoopClose(10 * time.Second) + harness.assertCoopCloseTx(closeInfo, cleanTx) +} diff --git a/contractcourt/chain_watcher_reorg_test.go b/contractcourt/chain_watcher_reorg_test.go new file mode 100644 index 00000000000..34fb71ed9b8 --- /dev/null +++ b/contractcourt/chain_watcher_reorg_test.go @@ -0,0 +1,388 @@ +package contractcourt + +import ( + "testing" + "time" + + "github.com/btcsuite/btcd/wire" + "pgregory.net/rapid" +) + +// closeType represents the type of channel close for testing purposes. +type closeType int + +const ( + // closeTypeCoop represents a cooperative channel close. + closeTypeCoop closeType = iota + + // closeTypeRemoteUnilateral represents a remote unilateral close + // (remote party broadcasting their commitment). + closeTypeRemoteUnilateral + + // closeTypeLocalForce represents a local force close (us broadcasting + // our commitment). + closeTypeLocalForce + + // closeTypeBreach represents a breach (remote party broadcasting a + // revoked commitment). + closeTypeBreach +) + +// String returns a string representation of the close type. +func (c closeType) String() string { + switch c { + case closeTypeCoop: + return "cooperative" + case closeTypeRemoteUnilateral: + return "remote_unilateral" + case closeTypeLocalForce: + return "local_force" + case closeTypeBreach: + return "breach" + default: + return "unknown" + } +} + +// createCloseTx creates a close transaction of the specified type using the +// harness. +func createCloseTx(h *chainWatcherTestHarness, ct closeType, + outputValue int64) *wire.MsgTx { + + switch ct { + case closeTypeCoop: + return h.createCoopCloseTx(outputValue) + case closeTypeRemoteUnilateral: + return h.createRemoteUnilateralCloseTx() + case closeTypeLocalForce: + return h.createLocalForceCloseTx() + case closeTypeBreach: + return h.createBreachCloseTx() + default: + h.t.Fatalf("unknown close type: %v", ct) + return nil + } +} + +// waitForCloseEvent waits for the appropriate close event based on close type. +func waitForCloseEvent(h *chainWatcherTestHarness, ct closeType, + timeout time.Duration) any { + + switch ct { + case closeTypeCoop: + return h.waitForCoopClose(timeout) + case closeTypeRemoteUnilateral: + return h.waitForRemoteUnilateralClose(timeout) + case closeTypeLocalForce: + return h.waitForLocalUnilateralClose(timeout) + case closeTypeBreach: + return h.waitForBreach(timeout) + default: + h.t.Fatalf("unknown close type: %v", ct) + return nil + } +} + +// assertCloseEventTx asserts that the close event matches the expected +// transaction based on close type. +func assertCloseEventTx(h *chainWatcherTestHarness, ct closeType, + event any, expectedTx *wire.MsgTx) { + + switch ct { + case closeTypeCoop: + h.assertCoopCloseTx(event.(*CooperativeCloseInfo), expectedTx) + + case closeTypeRemoteUnilateral: + h.assertRemoteUnilateralCloseTx( + event.(*RemoteUnilateralCloseInfo), expectedTx, + ) + + case closeTypeLocalForce: + h.assertLocalUnilateralCloseTx( + event.(*LocalUnilateralCloseInfo), expectedTx, + ) + + case closeTypeBreach: + h.assertBreachTx(event.(*BreachCloseInfo), expectedTx) + + default: + h.t.Fatalf("unknown close type: %v", ct) + } +} + +// generateAltTxsForReorgs generates alternative transactions for reorg +// scenarios. For commitment-based closes (breach, remote/local force), the same +// tx is reused since we can only have one commitment tx per channel state. For +// coop closes, new transactions with different output values are created. 
+func generateAltTxsForReorgs(h *chainWatcherTestHarness, ct closeType, + originalTx *wire.MsgTx, numReorgs int, sameTxAtEnd bool) []*wire.MsgTx { + + altTxs := make([]*wire.MsgTx, numReorgs) + + for i := 0; i < numReorgs; i++ { + switch ct { + case closeTypeBreach, closeTypeRemoteUnilateral, + closeTypeLocalForce: + + // Non-coop closes can only have one commitment tx, so + // all reorgs use the same transaction. + altTxs[i] = originalTx + + case closeTypeCoop: + if i == numReorgs-1 && sameTxAtEnd { + // Last reorg goes back to original transaction. + altTxs[i] = originalTx + } else { + // Create different coop close tx with different + // output value to make it unique. + outputValue := int64(5000 - (i+1)*100) + altTxs[i] = createCloseTx(h, ct, outputValue) + } + } + } + + return altTxs +} + +// testReorgProperties is the main property-based test for reorg handling +// across all close types. +// +// The testingT parameter is captured from the outer test function and used +// for operations that require *testing.T (like channel creation), while the +// rapid.T is used for all test reporting and property generation. +func testReorgProperties(testingT *testing.T) func(*rapid.T) { + return func(t *rapid.T) { + // Generate random close type. + allCloseTypes := []closeType{ + closeTypeCoop, + closeTypeRemoteUnilateral, + closeTypeLocalForce, + closeTypeBreach, + } + ct := rapid.SampledFrom(allCloseTypes).Draw(t, "closeType") + + // Generate random number of required confirmations (2-6). We + // use at least 2 so we have room for reorgs during + // confirmation. + requiredConfs := rapid.IntRange(2, 6).Draw(t, "requiredConfs") + + // Generate number of reorgs (1-3 to keep test runtime + // reasonable). + numReorgs := rapid.IntRange(1, 3).Draw(t, "numReorgs") + + // Generate whether the final transaction is the same as the + // original. + sameTxAtEnd := rapid.Bool().Draw(t, "sameTxAtEnd") + + // Log test parameters for debugging. + t.Logf("Testing %s close with %d confs, %d reorgs, "+ + "sameTxAtEnd=%v", + ct, requiredConfs, numReorgs, sameTxAtEnd) + + // Create test harness using both the concrete *testing.T for + // channel creation and the rapid.T for test reporting. + harness := newChainWatcherTestHarnessFromReporter( + testingT, t, withRequiredConfs(uint32(requiredConfs)), + ) + + // Create initial transaction. + tx1 := createCloseTx(harness, ct, 5000) + + // Generate alternative transactions for each reorg. + altTxs := generateAltTxsForReorgs( + harness, ct, tx1, numReorgs, sameTxAtEnd, + ) + + // Send the initial spend. + harness.sendSpend(tx1) + harness.waitForConfRegistration() + + // Execute the set of re-orgs, based on our random sample, we'll + // mine N blocks, do a re-org of size N, then wait for + // detection, and repeat. + for i := 0; i < numReorgs; i++ { + // Generate random reorg depth (1 to requiredConfs-1). + // We cap it to avoid reorging too far back. + reorgDepth := rapid.IntRange( + 1, requiredConfs-1, + ).Draw(t, "reorgDepth") + + // Mine some blocks (but less than required confs). + blocksToMine := rapid.IntRange( + 1, requiredConfs-1, + ).Draw(t, "blocksToMine") + harness.mineBlocks(int32(blocksToMine)) + + // Trigger reorg. + if i == 0 { + harness.triggerReorg( + tx1, int32(reorgDepth), + ) + } else { + harness.triggerReorg( + altTxs[i-1], int32(reorgDepth), + ) + } + + harness.waitForSpendRegistration() + + harness.sendSpend(altTxs[i]) + harness.waitForConfRegistration() + } + + // Mine enough blocks to confirm final transaction. 
+ harness.mineBlocks(1) + finalTx := altTxs[numReorgs-1] + harness.confirmTx(finalTx, harness.currentHeight) + + // Wait for and verify close event. + event := waitForCloseEvent(harness, ct, 10*time.Second) + assertCloseEventTx(harness, ct, event, finalTx) + } +} + +// TestChainWatcherReorgAllCloseTypes runs property-based tests for reorg +// handling across all channel close types. It generates random combinations of: +// - Close type (coop, remote unilateral, local force, breach) +// - Number of confirmations required (2-6) +// - Number of reorgs (1-3) +// - Whether the final tx is same as original or different +func TestChainWatcherReorgAllCloseTypes(t *testing.T) { + t.Parallel() + + rapid.Check(t, testReorgProperties(t)) +} + +// TestRemoteUnilateralCloseWithSingleReorg tests that a remote unilateral +// close is properly handled when a single reorg occurs during confirmation. +func TestRemoteUnilateralCloseWithSingleReorg(t *testing.T) { + t.Parallel() + + harness := newChainWatcherTestHarness(t) + + // Create two remote unilateral close transactions. + // Since these are commitment transactions, we can only have one per + // state, so we'll use the current one as tx1. + tx1 := harness.createRemoteUnilateralCloseTx() + + // Advance channel state to get a different commitment. + _ = harness.createBreachCloseTx() + tx2 := harness.createRemoteUnilateralCloseTx() + + // Send initial spend. + harness.sendSpend(tx1) + harness.waitForConfRegistration() + + // Mine a block and trigger reorg. + harness.mineBlocks(1) + harness.triggerReorg(tx1, 1) + + // Send alternative transaction after reorg. + harness.waitForSpendRegistration() + harness.sendSpend(tx2) + harness.waitForConfRegistration() + harness.mineBlocks(1) + harness.confirmTx(tx2, harness.currentHeight) + + // Verify correct event. + closeInfo := harness.waitForRemoteUnilateralClose(5 * time.Second) + harness.assertRemoteUnilateralCloseTx(closeInfo, tx2) +} + +// TestLocalForceCloseWithMultipleReorgs tests that a local force close is +// properly handled through multiple consecutive reorgs. +func TestLocalForceCloseWithMultipleReorgs(t *testing.T) { + t.Parallel() + + harness := newChainWatcherTestHarness(t) + + // For local force close, we can only broadcast our current commitment. + // We'll simulate multiple reorgs where the same tx keeps getting + // reorganized out and re-broadcast. + tx := harness.createLocalForceCloseTx() + + // First spend and reorg. + harness.sendSpend(tx) + harness.waitForConfRegistration() + harness.mineBlocks(1) + harness.triggerReorg(tx, 1) + + // Second spend and reorg. + harness.waitForSpendRegistration() + harness.sendSpend(tx) + harness.waitForConfRegistration() + harness.mineBlocks(1) + harness.triggerReorg(tx, 1) + + // Third spend - this one confirms. + harness.waitForSpendRegistration() + harness.sendSpend(tx) + harness.waitForConfRegistration() + harness.mineBlocks(1) + harness.confirmTx(tx, harness.currentHeight) + + // Verify correct event. + closeInfo := harness.waitForLocalUnilateralClose(5 * time.Second) + harness.assertLocalUnilateralCloseTx(closeInfo, tx) +} + +// TestBreachCloseWithDeepReorg tests that a breach (revoked commitment) is +// properly detected after a deep reorganization. +func TestBreachCloseWithDeepReorg(t *testing.T) { + t.Parallel() + + harness := newChainWatcherTestHarness(t) + + // Create a revoked commitment transaction. + revokedTx := harness.createBreachCloseTx() + + // Send spend and wait for confirmation registration. 
+ harness.sendSpend(revokedTx) + harness.waitForConfRegistration() + + // Mine several blocks and then trigger a deep reorg. + harness.mineBlocks(5) + harness.triggerReorg(revokedTx, 5) + + // Re-broadcast same transaction after reorg. + harness.waitForSpendRegistration() + harness.sendSpend(revokedTx) + harness.waitForConfRegistration() + harness.mineBlocks(1) + harness.confirmTx(revokedTx, harness.currentHeight) + + // Verify breach detection. + breachInfo := harness.waitForBreach(5 * time.Second) + harness.assertBreachTx(breachInfo, revokedTx) +} + +// TestCoopCloseReorgToForceClose tests the edge case where a cooperative +// close gets reorged out and is replaced by a force close. +func TestCoopCloseReorgToForceClose(t *testing.T) { + t.Parallel() + + harness := newChainWatcherTestHarness(t) + + // Create a cooperative close and a force close transaction. + coopTx := harness.createCoopCloseTx(5000) + forceTx := harness.createRemoteUnilateralCloseTx() + + // Send cooperative close. + harness.sendSpend(coopTx) + harness.waitForConfRegistration() + + // Trigger reorg that removes coop close. + harness.mineBlocks(1) + harness.triggerReorg(coopTx, 1) + + // Send force close as alternative. + harness.waitForSpendRegistration() + harness.sendSpend(forceTx) + harness.waitForConfRegistration() + harness.mineBlocks(1) + harness.confirmTx(forceTx, harness.currentHeight) + + // Should receive remote unilateral close event, not coop close. + closeInfo := harness.waitForRemoteUnilateralClose(5 * time.Second) + harness.assertRemoteUnilateralCloseTx(closeInfo, forceTx) +} diff --git a/contractcourt/chain_watcher_test.go b/contractcourt/chain_watcher_test.go index 2dc3605d394..8275886a140 100644 --- a/contractcourt/chain_watcher_test.go +++ b/contractcourt/chain_watcher_test.go @@ -12,6 +12,7 @@ import ( "github.com/lightningnetwork/lnd/chainio" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/fn/v2" "github.com/lightningnetwork/lnd/input" lnmock "github.com/lightningnetwork/lnd/lntest/mock" "github.com/lightningnetwork/lnd/lnwallet" @@ -34,16 +35,19 @@ func TestChainWatcherRemoteUnilateralClose(t *testing.T) { // With the channels created, we'll now create a chain watcher instance // which will be watching for any closes of Alice's channel. + confRegistered := make(chan struct{}, 1) aliceNotifier := &lnmock.ChainNotifier{ - SpendChan: make(chan *chainntnfs.SpendDetail, 1), - EpochChan: make(chan *chainntnfs.BlockEpoch), - ConfChan: make(chan *chainntnfs.TxConfirmation), + SpendChan: make(chan *chainntnfs.SpendDetail, 1), + EpochChan: make(chan *chainntnfs.BlockEpoch), + ConfChan: make(chan *chainntnfs.TxConfirmation, 1), + ConfRegistered: confRegistered, } aliceChainWatcher, err := newChainWatcher(chainWatcherConfig{ chanState: aliceChannel.State(), notifier: aliceNotifier, signer: aliceChannel.Signer, extractStateNumHint: lnwallet.GetStateNumHint, + chanCloseConfs: fn.Some(uint32(1)), }) require.NoError(t, err, "unable to create chain watcher") err = aliceChainWatcher.Start() @@ -90,6 +94,10 @@ func TestChainWatcherRemoteUnilateralClose(t *testing.T) { t.Fatalf("unable to send blockbeat") } + // With chanCloseConfs set to 1, the fast-path dispatches immediately + // without confirmation registration. The close event should arrive + // directly after processing the blockbeat. + // We should get a new spend event over the remote unilateral close // event channel. 
var uniClose *RemoteUnilateralCloseInfo @@ -144,16 +152,19 @@ func TestChainWatcherRemoteUnilateralClosePendingCommit(t *testing.T) { // With the channels created, we'll now create a chain watcher instance // which will be watching for any closes of Alice's channel. + confRegistered := make(chan struct{}, 1) aliceNotifier := &lnmock.ChainNotifier{ - SpendChan: make(chan *chainntnfs.SpendDetail), - EpochChan: make(chan *chainntnfs.BlockEpoch), - ConfChan: make(chan *chainntnfs.TxConfirmation), + SpendChan: make(chan *chainntnfs.SpendDetail), + EpochChan: make(chan *chainntnfs.BlockEpoch), + ConfChan: make(chan *chainntnfs.TxConfirmation), + ConfRegistered: confRegistered, } aliceChainWatcher, err := newChainWatcher(chainWatcherConfig{ chanState: aliceChannel.State(), notifier: aliceNotifier, signer: aliceChannel.Signer, extractStateNumHint: lnwallet.GetStateNumHint, + chanCloseConfs: fn.Some(uint32(1)), }) require.NoError(t, err, "unable to create chain watcher") if err := aliceChainWatcher.Start(); err != nil { @@ -219,6 +230,10 @@ func TestChainWatcherRemoteUnilateralClosePendingCommit(t *testing.T) { t.Fatalf("unable to send blockbeat") } + // With chanCloseConfs set to 1, the fast-path dispatches immediately + // without confirmation registration. The close event should arrive + // directly after processing the blockbeat. + // We should get a new spend event over the remote unilateral close // event channel. var uniClose *RemoteUnilateralCloseInfo @@ -331,10 +346,12 @@ func TestChainWatcherDataLossProtect(t *testing.T) { // With the channels created, we'll now create a chain watcher // instance which will be watching for any closes of Alice's // channel. + confRegistered := make(chan struct{}, 1) aliceNotifier := &lnmock.ChainNotifier{ - SpendChan: make(chan *chainntnfs.SpendDetail), - EpochChan: make(chan *chainntnfs.BlockEpoch), - ConfChan: make(chan *chainntnfs.TxConfirmation), + SpendChan: make(chan *chainntnfs.SpendDetail), + EpochChan: make(chan *chainntnfs.BlockEpoch), + ConfChan: make(chan *chainntnfs.TxConfirmation), + ConfRegistered: confRegistered, } aliceChainWatcher, err := newChainWatcher(chainWatcherConfig{ chanState: aliceChanState, @@ -407,6 +424,8 @@ func TestChainWatcherDataLossProtect(t *testing.T) { t.Fatalf("unable to send blockbeat") } + aliceNotifier.WaitForConfRegistrationAndSend(t) + // We should get a new uni close resolution that indicates we // processed the DLP scenario. var uniClose *RemoteUnilateralCloseInfo @@ -532,10 +551,12 @@ func TestChainWatcherLocalForceCloseDetect(t *testing.T) { // With the channels created, we'll now create a chain watcher // instance which will be watching for any closes of Alice's // channel. + confRegistered := make(chan struct{}, 1) aliceNotifier := &lnmock.ChainNotifier{ - SpendChan: make(chan *chainntnfs.SpendDetail), - EpochChan: make(chan *chainntnfs.BlockEpoch), - ConfChan: make(chan *chainntnfs.TxConfirmation), + SpendChan: make(chan *chainntnfs.SpendDetail), + EpochChan: make(chan *chainntnfs.BlockEpoch), + ConfChan: make(chan *chainntnfs.TxConfirmation), + ConfRegistered: confRegistered, } aliceChainWatcher, err := newChainWatcher(chainWatcherConfig{ chanState: aliceChanState, @@ -604,6 +625,8 @@ func TestChainWatcherLocalForceCloseDetect(t *testing.T) { t.Fatalf("unable to send blockbeat") } + aliceNotifier.WaitForConfRegistrationAndSend(t) + // We should get a local force close event from Alice as she // should be able to detect the close based on the commitment // outputs. 
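
A quick aside on the mechanism the new closeObserver loop relies on: in Go, a receive from a nil channel blocks forever, so a select case backed by a nil channel is effectively disabled. The loop exploits this by leaving confChan and negativeConfChan nil until a spend has been detected, which is what gates the None/Pending/Confirmed states. The standalone sketch below is illustrative only (it is not part of this patch, and spendCh/confCh are made-up names); it shows the same gating pattern in isolation:

package main

import "fmt"

// Minimal sketch of the nil-channel gating used by closeObserver. A receive
// from a nil channel blocks forever, so the confirmation case below cannot
// fire until confCh is assigned, i.e. until a spend has been detected.
func main() {
	spendCh := make(chan string, 1)
	spendCh <- "funding output spent"

	// Nil while no spend is pending: the "None" state.
	var confCh chan int

	for i := 0; i < 2; i++ {
		select {
		case s := <-spendCh:
			// None -> Pending: start listening for confirmations.
			fmt.Println(s, "-> Pending")
			confCh = make(chan int, 1)
			confCh <- 3 // stand-in for the spend reaching 3 confs

		case n := <-confCh: // never selected while confCh is nil
			// Pending -> Confirmed.
			fmt.Printf("confirmed after %d blocks -> Confirmed\n", n)
			confCh = nil
		}
	}
}

The production loop follows the same shape, with the confirmation channel supplied by RegisterConfirmationsNtfn and an extra NegativeConf case that resets the machine back to None after a reorg.
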
diff --git a/contractcourt/chain_watcher_test_harness.go b/contractcourt/chain_watcher_test_harness.go new file mode 100644 index 00000000000..0b7c0c97414 --- /dev/null +++ b/contractcourt/chain_watcher_test_harness.go @@ -0,0 +1,693 @@ +package contractcourt + +import ( + "testing" + "time" + + "github.com/btcsuite/btcd/btcutil" + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/wire" + "github.com/lightningnetwork/lnd/chainio" + "github.com/lightningnetwork/lnd/chainntnfs" + "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/fn/v2" + lnmock "github.com/lightningnetwork/lnd/lntest/mock" + "github.com/lightningnetwork/lnd/lnwallet" + "github.com/lightningnetwork/lnd/lnwire" + "github.com/stretchr/testify/mock" +) + +// testReporter is a minimal interface for test reporting that is satisfied by +// both *testing.T and *rapid.T, allowing the harness to work with property-based +// tests. +type testReporter interface { + Helper() + Fatalf(format string, args ...any) +} + +// chainWatcherTestHarness provides a test harness for chain watcher tests +// with utilities for simulating spends, confirmations, and reorganizations. +type chainWatcherTestHarness struct { + t testReporter + + // aliceChannel and bobChannel are the test channels. + aliceChannel *lnwallet.LightningChannel + bobChannel *lnwallet.LightningChannel + + // chainWatcher is the chain watcher under test. + chainWatcher *chainWatcher + + // notifier is the mock chain notifier. + notifier *mockChainNotifier + + // chanEvents is the channel event subscription. + chanEvents *ChainEventSubscription + + // currentHeight tracks the current block height. + currentHeight int32 + + // blockbeatProcessed is a channel that signals when a blockbeat has been processed. + blockbeatProcessed chan struct{} +} + +// mockChainNotifier extends the standard mock with additional channels for +// testing cooperative close reorgs. +type mockChainNotifier struct { + *lnmock.ChainNotifier + + // confEvents tracks active confirmation event subscriptions. + confEvents []*mockConfirmationEvent + + // confRegistered is a channel that signals when a new confirmation + // event has been registered. + confRegistered chan struct{} + + // spendEvents tracks active spend event subscriptions. + spendEvents []*chainntnfs.SpendEvent + + // spendRegistered is a channel that signals when a new spend + // event has been registered. + spendRegistered chan struct{} +} + +// mockConfirmationEvent represents a mock confirmation event subscription. +type mockConfirmationEvent struct { + txid chainhash.Hash + numConfs uint32 + confirmedChan chan *chainntnfs.TxConfirmation + negConfChan chan int32 + cancelled bool +} + +// RegisterSpendNtfn creates a new mock spend event. +func (m *mockChainNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, + pkScript []byte, heightHint uint32) (*chainntnfs.SpendEvent, error) { + + // The base mock already has SpendChan, use that. + spendEvent := &chainntnfs.SpendEvent{ + Spend: m.SpendChan, + Cancel: func() { + // No-op for now. + }, + } + + m.spendEvents = append(m.spendEvents, spendEvent) + + // Signal that a new spend event has been registered. + select { + case m.spendRegistered <- struct{}{}: + default: + } + + return spendEvent, nil +} + +// RegisterConfirmationsNtfn creates a new mock confirmation event. 
+func (m *mockChainNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, + pkScript []byte, numConfs, heightHint uint32, + opts ...chainntnfs.NotifierOption) (*chainntnfs.ConfirmationEvent, error) { + + mockEvent := &mockConfirmationEvent{ + txid: *txid, + numConfs: numConfs, + confirmedChan: make(chan *chainntnfs.TxConfirmation, 1), + negConfChan: make(chan int32, 1), + } + + m.confEvents = append(m.confEvents, mockEvent) + + // Signal that a new confirmation event has been registered. + select { + case m.confRegistered <- struct{}{}: + default: + } + + return &chainntnfs.ConfirmationEvent{ + Confirmed: mockEvent.confirmedChan, + NegativeConf: mockEvent.negConfChan, + Cancel: func() { + mockEvent.cancelled = true + }, + }, nil +} + +// harnessOpt is a functional option for configuring the test harness. +type harnessOpt func(*harnessConfig) + +// harnessConfig holds configuration for the test harness. +type harnessConfig struct { + requiredConfs fn.Option[uint32] +} + +// withRequiredConfs sets the number of confirmations required for channel closes. +func withRequiredConfs(confs uint32) harnessOpt { + return func(cfg *harnessConfig) { + cfg.requiredConfs = fn.Some(confs) + } +} + +// newChainWatcherTestHarness creates a new test harness for chain watcher tests. +func newChainWatcherTestHarness(t *testing.T, opts ...harnessOpt) *chainWatcherTestHarness { + return newChainWatcherTestHarnessFromReporter(t, t, opts...) +} + +// newChainWatcherTestHarnessFromReporter creates a test harness that works with +// both *testing.T and *rapid.T. The testingT parameter is used for operations +// that specifically require *testing.T (like CreateTestChannels), while reporter +// is used for all test reporting (Helper, Fatalf). +func newChainWatcherTestHarnessFromReporter(testingT *testing.T, + reporter testReporter, opts ...harnessOpt) *chainWatcherTestHarness { + + reporter.Helper() + + // Apply options. + cfg := &harnessConfig{ + requiredConfs: fn.None[uint32](), + } + for _, opt := range opts { + opt(cfg) + } + + // Create test channels. + aliceChannel, bobChannel, err := lnwallet.CreateTestChannels( + testingT, channeldb.SingleFunderTweaklessBit, + ) + if err != nil { + reporter.Fatalf("unable to create test channels: %v", err) + } + + // Create mock notifier. + baseNotifier := &lnmock.ChainNotifier{ + SpendChan: make(chan *chainntnfs.SpendDetail, 1), + EpochChan: make(chan *chainntnfs.BlockEpoch), + ConfChan: make(chan *chainntnfs.TxConfirmation, 1), + } + + notifier := &mockChainNotifier{ + ChainNotifier: baseNotifier, + confEvents: make([]*mockConfirmationEvent, 0), + confRegistered: make(chan struct{}, 10), + spendEvents: make([]*chainntnfs.SpendEvent, 0), + spendRegistered: make(chan struct{}, 10), + } + + // Create chain watcher. + chainWatcher, err := newChainWatcher(chainWatcherConfig{ + chanState: aliceChannel.State(), + notifier: notifier, + signer: aliceChannel.Signer, + extractStateNumHint: lnwallet.GetStateNumHint, + chanCloseConfs: cfg.requiredConfs, + contractBreach: func(retInfo *lnwallet.BreachRetribution) error { + // In tests, we just need to accept the breach notification. + // The actual breach handling is tested elsewhere. + return nil + }, + }) + if err != nil { + reporter.Fatalf("unable to create chain watcher: %v", err) + } + + // Start chain watcher (this will register for spend notification). + err = chainWatcher.Start() + if err != nil { + reporter.Fatalf("unable to start chain watcher: %v", err) + } + + // Subscribe to channel events. 
+ chanEvents := chainWatcher.SubscribeChannelEvents() + + harness := &chainWatcherTestHarness{ + t: reporter, + aliceChannel: aliceChannel, + bobChannel: bobChannel, + chainWatcher: chainWatcher, + notifier: notifier, + chanEvents: chanEvents, + currentHeight: 100, + blockbeatProcessed: make(chan struct{}), + } + + // Wait for the initial spend registration that happens in Start(). + harness.waitForSpendRegistration() + + // Verify BlockbeatChan is initialized. + if chainWatcher.BlockbeatChan == nil { + reporter.Fatalf("BlockbeatChan is nil after initialization") + } + + // Register cleanup. We use the testingT for Cleanup since rapid.T + // may not have this method in the same way. + testingT.Cleanup(func() { + chainWatcher.Stop() + }) + + return harness +} + +// createCoopCloseTx creates a cooperative close transaction with the given +// output value. The transaction will have the proper sequence number to +// indicate it's a cooperative close. +func (h *chainWatcherTestHarness) createCoopCloseTx(outputValue int64) *wire.MsgTx { + return &wire.MsgTx{ + TxIn: []*wire.TxIn{{ + PreviousOutPoint: h.aliceChannel.State().FundingOutpoint, + Sequence: wire.MaxTxInSequenceNum, + }}, + TxOut: []*wire.TxOut{{ + Value: outputValue, + PkScript: []byte{byte(outputValue % 255)}, // Unique script. + }}, + } +} + +// createRemoteUnilateralCloseTx creates a remote unilateral close transaction. +// From Alice's perspective, this is Bob's local commitment transaction. +func (h *chainWatcherTestHarness) createRemoteUnilateralCloseTx() *wire.MsgTx { + return h.bobChannel.State().LocalCommitment.CommitTx +} + +// createLocalForceCloseTx creates a local force close transaction. +// This is Alice's local commitment transaction. +func (h *chainWatcherTestHarness) createLocalForceCloseTx() *wire.MsgTx { + return h.aliceChannel.State().LocalCommitment.CommitTx +} + +// createBreachCloseTx creates a breach (revoked commitment) transaction. +// We advance the channel state, save the commitment, then advance again +// to revoke it. Returns the revoked commitment tx. +func (h *chainWatcherTestHarness) createBreachCloseTx() *wire.MsgTx { + h.t.Helper() + + // To create a revoked commitment, we need to advance the channel state + // at least once. We'll use the test utils helper to add an HTLC and + // force a state transition. + + // Get the current commitment before we advance (this will be revoked). + revokedCommit := h.bobChannel.State().LocalCommitment.CommitTx + + // Add a fake HTLC to advance state. + htlcAmount := lnwire.NewMSatFromSatoshis(10000) + paymentHash := [32]byte{4, 5, 6} + htlc := &lnwire.UpdateAddHTLC{ + ID: 0, + Amount: htlcAmount, + Expiry: uint32(h.currentHeight + 100), + PaymentHash: paymentHash, + } + + // Add HTLC to both channels. + if _, err := h.aliceChannel.AddHTLC(htlc, nil); err != nil { + h.t.Fatalf("unable to add HTLC to alice: %v", err) + } + if _, err := h.bobChannel.ReceiveHTLC(htlc); err != nil { + h.t.Fatalf("unable to add HTLC to bob: %v", err) + } + + // Force state transition using the helper. + if err := lnwallet.ForceStateTransition(h.aliceChannel, h.bobChannel); err != nil { + h.t.Fatalf("unable to force state transition: %v", err) + } + + // Return the revoked commitment (Bob's previous local commitment). + return revokedCommit +} + +// sendSpend sends a spend notification for the given transaction. 
+func (h *chainWatcherTestHarness) sendSpend(tx *wire.MsgTx) {
+	h.t.Helper()
+
+	txHash := tx.TxHash()
+	spend := &chainntnfs.SpendDetail{
+		SpenderTxHash:  &txHash,
+		SpendingTx:     tx,
+		SpendingHeight: h.currentHeight,
+	}
+
+	select {
+	case h.notifier.SpendChan <- spend:
+	case <-time.After(time.Second):
+		h.t.Fatalf("unable to send spend notification")
+	}
+}
+
+// sendBlockBeat sends a blockbeat to the chain watcher.
+// Note: The cooperative close tests drive the watcher directly through spend
+// and confirmation notifications, so this helper is not used there.
+func (h *chainWatcherTestHarness) sendBlockBeat() {
+	h.t.Helper()
+
+	// Create mock blockbeat exactly as the other tests do.
+	mockBeat := &chainio.MockBlockbeat{}
+
+	// Mock the logger. We don't care how many times it's called as it's
+	// not critical.
+	mockBeat.On("logger").Return(log)
+
+	// Mock a fake block height - this is called based on the debuglevel.
+	mockBeat.On("Height").Return(h.currentHeight).Maybe()
+
+	// Create a channel to signal when blockbeat is processed.
+	processed := make(chan struct{})
+
+	// Mock `NotifyBlockProcessed` to signal when done.
+	mockBeat.On("NotifyBlockProcessed",
+		nil, h.chainWatcher.quit).Return().Run(func(args mock.Arguments) {
+		close(processed)
+	}).Once()
+
+	// Send the blockbeat.
+	select {
+	case h.chainWatcher.BlockbeatChan <- mockBeat:
+	case <-time.After(5 * time.Second):
+		h.t.Fatalf("unable to send blockbeat")
+	}
+
+	// Wait for the blockbeat to be processed.
+	select {
+	case <-processed:
+		// Blockbeat processed.
+	case <-time.After(5 * time.Second):
+		h.t.Fatalf("blockbeat not processed")
+	}
+}
+
+// confirmTx sends a confirmation notification for the given transaction.
+func (h *chainWatcherTestHarness) confirmTx(tx *wire.MsgTx, height int32) {
+	h.t.Helper()
+
+	// Find the confirmation event for this transaction.
+	txHash := tx.TxHash()
+	var confEvent *mockConfirmationEvent
+	for _, event := range h.notifier.confEvents {
+		if event.txid == txHash && !event.cancelled {
+			confEvent = event
+			break
+		}
+	}
+
+	if confEvent == nil {
+		// The chain watcher might not have registered for
+		// confirmations yet. This is not necessarily an error in some
+		// test scenarios.
+		return
+	}
+
+	// Send confirmation.
+	select {
+	case confEvent.confirmedChan <- &chainntnfs.TxConfirmation{
+		Tx:          tx,
+		BlockHeight: uint32(height),
+	}:
+	case <-time.After(time.Second):
+		h.t.Fatalf("unable to send confirmation")
+	}
+}
+
+// triggerReorg sends a negative confirmation (reorg) notification for the
+// given transaction with the specified reorg depth.
+func (h *chainWatcherTestHarness) triggerReorg(tx *wire.MsgTx, reorgDepth int32) {
+	h.t.Helper()
+
+	// Find the confirmation event for this transaction.
+	txHash := tx.TxHash()
+	var confEvent *mockConfirmationEvent
+	for _, event := range h.notifier.confEvents {
+		if event.txid == txHash && !event.cancelled {
+			confEvent = event
+			break
+		}
+	}
+
+	if confEvent == nil {
+		// The chain watcher might not have registered for
+		// confirmations yet. This is not necessarily an error in some
+		// test scenarios.
+		return
+	}
+
+	// Send negative confirmation.
+	select {
+	case confEvent.negConfChan <- reorgDepth:
+	case <-time.After(time.Second):
+		h.t.Fatalf("unable to send negative confirmation")
+	}
+}
+
+// mineBlocks advances the current block height.
+func (h *chainWatcherTestHarness) mineBlocks(n int32) {
+	h.currentHeight += n
+}
+
+// waitForCoopClose waits for a cooperative close event and returns it.
+func (h *chainWatcherTestHarness) waitForCoopClose(timeout time.Duration) *CooperativeCloseInfo { + h.t.Helper() + + select { + case coopClose := <-h.chanEvents.CooperativeClosure: + return coopClose + case <-time.After(timeout): + h.t.Fatalf("didn't receive cooperative close event") + return nil + } +} + +// waitForConfRegistration waits for the chain watcher to register for +// confirmation notifications. +func (h *chainWatcherTestHarness) waitForConfRegistration() { + h.t.Helper() + + select { + case <-h.notifier.confRegistered: + // Registration complete. + case <-time.After(2 * time.Second): + // Not necessarily a failure - some tests don't register. + } +} + +// waitForSpendRegistration waits for the chain watcher to register for +// spend notifications. +func (h *chainWatcherTestHarness) waitForSpendRegistration() { + h.t.Helper() + + select { + case <-h.notifier.spendRegistered: + // Registration complete. + case <-time.After(2 * time.Second): + // Not necessarily a failure - some tests don't register. + } +} + +// assertCoopCloseTx asserts that the given cooperative close info matches +// the expected transaction. +func (h *chainWatcherTestHarness) assertCoopCloseTx( + closeInfo *CooperativeCloseInfo, expectedTx *wire.MsgTx) { + + h.t.Helper() + + expectedHash := expectedTx.TxHash() + if closeInfo.ClosingTXID != expectedHash { + h.t.Fatalf("wrong tx confirmed: expected %v, got %v", + expectedHash, closeInfo.ClosingTXID) + } +} + +// assertNoCoopClose asserts that no cooperative close event is received +// within the given timeout. +func (h *chainWatcherTestHarness) assertNoCoopClose(timeout time.Duration) { + h.t.Helper() + + select { + case <-h.chanEvents.CooperativeClosure: + h.t.Fatalf("unexpected cooperative close event") + case <-time.After(timeout): + // Expected timeout. + } +} + +// runCoopCloseFlow runs a complete cooperative close flow including spend, +// optional reorg, and confirmation. This helper coordinates the timing +// between the different events. +func (h *chainWatcherTestHarness) runCoopCloseFlow( + tx *wire.MsgTx, shouldReorg bool, reorgDepth int32, + altTx *wire.MsgTx) *CooperativeCloseInfo { + + h.t.Helper() + + // Send initial spend notification. + // This will trigger handleCommitSpend which will detect the coop close + // and call waitForCoopCloseConfirmation (which blocks). + h.sendSpend(tx) + + // Wait for the chain watcher to register for confirmations. + // This happens inside waitForCoopCloseConfirmation. + h.waitForConfRegistration() + + if shouldReorg { + // Trigger reorg to unblock waitForCoopCloseConfirmation. + h.triggerReorg(tx, reorgDepth) + + // If we have an alternative transaction, send it. + if altTx != nil { + // After reorg, the chain watcher should re-register for + // ANY spend of the funding output. + h.waitForSpendRegistration() + + // Send alternative spend. + h.sendSpend(altTx) + + // Wait for it to register for confirmations. + h.waitForConfRegistration() + + // Confirm alternative transaction to unblock. + h.mineBlocks(1) + h.confirmTx(altTx, h.currentHeight) + } + } else { + // Normal confirmation flow - confirm to unblock waitForCoopCloseConfirmation. + h.mineBlocks(1) + h.confirmTx(tx, h.currentHeight) + } + + // Wait for cooperative close event. + return h.waitForCoopClose(5 * time.Second) +} + +// runMultipleReorgFlow simulates multiple consecutive reorganizations with +// different transactions confirming after each reorg. 
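+// The reorgDepths slice must have exactly len(txs)-1 entries: each depth
+// reorgs out the corresponding transaction before the next one is broadcast,
+// and the final transaction is then confirmed. A typical invocation (with
+// hypothetical transactions) looks like:
+//
+//	closeInfo := h.runMultipleReorgFlow(
+//		[]*wire.MsgTx{coopTx1, coopTx2, coopTx3},
+//		[]int32{1, 2},
+//	)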
+func (h *chainWatcherTestHarness) runMultipleReorgFlow(txs []*wire.MsgTx, + reorgDepths []int32) *CooperativeCloseInfo { + + h.t.Helper() + + if len(txs) < 2 { + h.t.Fatalf("need at least 2 transactions for reorg flow") + } + if len(reorgDepths) != len(txs)-1 { + h.t.Fatalf("reorg depths must be one less than transactions") + } + + // Send initial spend. + h.sendSpend(txs[0]) + + // Process each reorg. + for i, depth := range reorgDepths { + // Wait for confirmation registration. + h.waitForConfRegistration() + + // Trigger reorg for current transaction. + h.triggerReorg(txs[i], depth) + + // Wait for re-registration for spend. + h.waitForSpendRegistration() + + // Send next transaction. + h.sendSpend(txs[i+1]) + } + + // Wait for final confirmation registration. + h.waitForConfRegistration() + + // Confirm the final transaction. + finalTx := txs[len(txs)-1] + h.mineBlocks(1) + h.confirmTx(finalTx, h.currentHeight) + + // Wait for cooperative close event. + return h.waitForCoopClose(10 * time.Second) +} + +// waitForRemoteUnilateralClose waits for a remote unilateral close event. +func (h *chainWatcherTestHarness) waitForRemoteUnilateralClose( + timeout time.Duration) *RemoteUnilateralCloseInfo { + + h.t.Helper() + + select { + case remoteClose := <-h.chanEvents.RemoteUnilateralClosure: + return remoteClose + case <-time.After(timeout): + h.t.Fatalf("didn't receive remote unilateral close event") + return nil + } +} + +// waitForLocalUnilateralClose waits for a local unilateral close event. +func (h *chainWatcherTestHarness) waitForLocalUnilateralClose( + timeout time.Duration) *LocalUnilateralCloseInfo { + + h.t.Helper() + + select { + case localClose := <-h.chanEvents.LocalUnilateralClosure: + return localClose + case <-time.After(timeout): + h.t.Fatalf("didn't receive local unilateral close event") + return nil + } +} + +// waitForBreach waits for a breach (contract breach) event. +func (h *chainWatcherTestHarness) waitForBreach( + timeout time.Duration) *BreachCloseInfo { + + h.t.Helper() + + select { + case breach := <-h.chanEvents.ContractBreach: + return breach + case <-time.After(timeout): + h.t.Fatalf("didn't receive contract breach event") + return nil + } +} + +// assertRemoteUnilateralCloseTx asserts that the given remote unilateral close +// info matches the expected transaction. +func (h *chainWatcherTestHarness) assertRemoteUnilateralCloseTx( + closeInfo *RemoteUnilateralCloseInfo, expectedTx *wire.MsgTx) { + + h.t.Helper() + + expectedHash := expectedTx.TxHash() + actualHash := closeInfo.UnilateralCloseSummary.SpendDetail.SpenderTxHash + if *actualHash != expectedHash { + h.t.Fatalf("wrong tx confirmed: expected %v, got %v", + expectedHash, *actualHash) + } +} + +// assertLocalUnilateralCloseTx asserts that the given local unilateral close +// info matches the expected transaction. +func (h *chainWatcherTestHarness) assertLocalUnilateralCloseTx( + closeInfo *LocalUnilateralCloseInfo, expectedTx *wire.MsgTx) { + + h.t.Helper() + + expectedHash := expectedTx.TxHash() + actualHash := closeInfo.LocalForceCloseSummary.CloseTx.TxHash() + if actualHash != expectedHash { + h.t.Fatalf("wrong tx confirmed: expected %v, got %v", + expectedHash, actualHash) + } +} + +// assertBreachTx asserts that the given breach info matches the expected +// transaction. 
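+// The check compares the expected transaction's hash against the
+// BreachCloseInfo.CommitHash reported by the chain watcher.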
+func (h *chainWatcherTestHarness) assertBreachTx( + breachInfo *BreachCloseInfo, expectedTx *wire.MsgTx) { + + h.t.Helper() + + expectedHash := expectedTx.TxHash() + if breachInfo.CommitHash != expectedHash { + h.t.Fatalf("wrong tx confirmed: expected %v, got %v", + expectedHash, breachInfo.CommitHash) + } +} + +// createChannelCapacity returns a channel capacity suitable for testing +// scaled confirmations. +func createChannelCapacity(scale float64) btcutil.Amount { + // Use the maximum channel size as base. + maxSize := btcutil.Amount(16777215) // From lnwallet constants. + return btcutil.Amount(float64(maxSize) * scale) +} \ No newline at end of file diff --git a/itest/list_on_test.go b/itest/list_on_test.go index 92c6547b3af..ec608c629d2 100644 --- a/itest/list_on_test.go +++ b/itest/list_on_test.go @@ -727,6 +727,10 @@ var allTestCases = []*lntest.TestCase{ Name: "rbf coop close disconnect", TestFunc: testRBFCoopCloseDisconnect, }, + { + Name: "coop close rbf with reorg", + TestFunc: testCoopCloseRBFWithReorg, + }, { Name: "bump fee low budget", TestFunc: testBumpFeeLowBudget, diff --git a/itest/lnd_coop_close_rbf_test.go b/itest/lnd_coop_close_rbf_test.go index 5f8b15d4054..57ebe0003fb 100644 --- a/itest/lnd_coop_close_rbf_test.go +++ b/itest/lnd_coop_close_rbf_test.go @@ -1,8 +1,13 @@ package itest import ( + "fmt" + "github.com/btcsuite/btcd/btcutil" + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lntest" + "github.com/lightningnetwork/lnd/lntest/wait" "github.com/lightningnetwork/lnd/lnwallet/chainfee" "github.com/stretchr/testify/require" ) @@ -153,3 +158,173 @@ func testRBFCoopCloseDisconnect(ht *lntest.HarnessTest) { // Disconnect Bob from Alice. ht.DisconnectNodes(alice, bob) } + +// testCoopCloseRBFWithReorg tests that when a cooperative close transaction +// is reorganized out during confirmation waiting, the system properly handles +// RBF replacements and re-registration for any spend of the funding output. +func testCoopCloseRBFWithReorg(ht *lntest.HarnessTest) { + // Skip this test for neutrino backend as we can't trigger reorgs. + if ht.IsNeutrinoBackend() { + ht.Skipf("skipping reorg test for neutrino backend") + } + + // Force cooperative close to require 3 confirmations for predictable + // testing. + const requiredConfs = 3 + rbfCoopFlags := []string{ + "--protocol.rbf-coop-close", + "--dev.force-channel-close-confs=3", + } + + // Set the fee estimate to 1sat/vbyte to ensure our RBF attempts work. + ht.SetFeeEstimate(250) + ht.SetFeeEstimateWithConf(250, 6) + + // Create two nodes with enough coins for a 50/50 channel. + cfgs := [][]string{rbfCoopFlags, rbfCoopFlags} + params := lntest.OpenChannelParams{ + Amt: btcutil.Amount(10_000_000), + PushAmt: btcutil.Amount(5_000_000), + } + chanPoints, nodes := ht.CreateSimpleNetwork(cfgs, params) + alice, bob := nodes[0], nodes[1] + chanPoint := chanPoints[0] + + // Initiate cooperative close with initial fee rate of 5 sat/vb. + initialFeeRate := chainfee.SatPerVByte(5) + _, aliceCloseUpdate := ht.CloseChannelAssertPending( + alice, chanPoint, false, + lntest.WithCoopCloseFeeRate(initialFeeRate), + lntest.WithLocalTxNotify(), + ) + + // Verify the initial close transaction is at the expected fee rate. + alicePendingUpdate := aliceCloseUpdate.GetClosePending() + require.NotNil(ht, aliceCloseUpdate) + require.Equal( + ht, int64(initialFeeRate), alicePendingUpdate.FeePerVbyte, + ) + + // Capture the initial close transaction from the mempool. 
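+	// We hold on to this transaction so that, after the reorg below, we
+	// can mine it in place of the RBF replacement and verify that the
+	// close is still resolved against the originally broadcast version.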
+ initialCloseTxid, err := chainhash.NewHash(alicePendingUpdate.Txid) + require.NoError(ht, err) + initialCloseTx := ht.AssertTxInMempool(*initialCloseTxid) + + // Create first RBF replacement before any mining. + firstRbfFeeRate := chainfee.SatPerVByte(10) + _, firstRbfUpdate := ht.CloseChannelAssertPending( + bob, chanPoint, false, + lntest.WithCoopCloseFeeRate(firstRbfFeeRate), + lntest.WithLocalTxNotify(), + ) + + // Capture the first RBF transaction. + firstRbfTxid, err := chainhash.NewHash(firstRbfUpdate.GetClosePending().Txid) + require.NoError(ht, err) + firstRbfTx := ht.AssertTxInMempool(*firstRbfTxid) + + _, bestHeight, err := ht.Miner().Client.GetBestBlock() + require.NoError(ht, err) + + ht.Logf("Current block height: %d", bestHeight) + + // Mine n-1 blocks (2 blocks when requiring 3 confirmations) with the + // first RBF transaction. This is just shy of full confirmation. + block1 := ht.Miner().MineBlockWithTxes( + []*btcutil.Tx{btcutil.NewTx(firstRbfTx)}, + ) + + ht.Logf("Mined block %d with first RBF tx", bestHeight+1) + + block2 := ht.MineEmptyBlocks(1)[0] + + ht.Logf("Mined block %d", bestHeight+2) + + ht.Logf("Re-orging two blocks to remove first RBF tx") + + // Trigger a reorganization that removes the last 2 blocks. This is safe + // because we haven't reached full confirmation yet. + bestBlockHash := block2.Header.BlockHash() + require.NoError( + ht, ht.Miner().Client.InvalidateBlock(&bestBlockHash), + ) + bestBlockHash = block1.Header.BlockHash() + require.NoError( + ht, ht.Miner().Client.InvalidateBlock(&bestBlockHash), + ) + + _, bestHeight, err = ht.Miner().Client.GetBestBlock() + require.NoError(ht, err) + ht.Logf("Re-orged to block height: %d", bestHeight) + + ht.Log("Mining blocks to surpass previous chain") + + // Mine 2 empty blocks to trigger the reorg on the nodes. + ht.MineEmptyBlocks(2) + + _, bestHeight, err = ht.Miner().Client.GetBestBlock() + require.NoError(ht, err) + ht.Logf("Mined blocks to reach height: %d", bestHeight) + + // Now, instead of mining the second RBF, mine the INITIAL transaction + // to test that the system can handle any valid spend of the funding + // output. + block := ht.Miner().MineBlockWithTxes( + []*btcutil.Tx{btcutil.NewTx(initialCloseTx)}, + ) + ht.AssertTxInBlock(block, *initialCloseTxid) + + // Mine additional blocks to reach the required confirmations (3 total). + ht.MineEmptyBlocks(requiredConfs - 1) + + // Both parties should see that the channel is now fully closed on chain + // with the expected closing txid. + expectedClosingTxid := initialCloseTxid.String() + err = wait.NoError(func() error { + req := &lnrpc.ClosedChannelsRequest{} + aliceClosedChans := alice.RPC.ClosedChannels(req) + bobClosedChans := bob.RPC.ClosedChannels(req) + if len(aliceClosedChans.Channels) != 1 { + return fmt.Errorf("alice: expected 1 closed chan, got %d", + len(aliceClosedChans.Channels)) + } + if len(bobClosedChans.Channels) != 1 { + return fmt.Errorf("bob: expected 1 closed chan, got %d", + len(bobClosedChans.Channels)) + } + + // Verify both Alice and Bob have the expected closing txid. 
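+		// Because the initial transaction is the one that ultimately
+		// confirmed after the reorg, both nodes must report it as the
+		// closing transaction rather than the RBF replacement.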
+ aliceClosedChan := aliceClosedChans.Channels[0] + if aliceClosedChan.ClosingTxHash != expectedClosingTxid { + return fmt.Errorf("alice: expected closing txid %s, "+ + "got %s", + expectedClosingTxid, + aliceClosedChan.ClosingTxHash) + } + if aliceClosedChan.CloseType != + lnrpc.ChannelCloseSummary_COOPERATIVE_CLOSE { + return fmt.Errorf("alice: expected cooperative "+ + "close, got %v", + aliceClosedChan.CloseType) + } + + bobClosedChan := bobClosedChans.Channels[0] + if bobClosedChan.ClosingTxHash != expectedClosingTxid { + return fmt.Errorf("bob: expected closing txid %s, "+ + "got %s", + expectedClosingTxid, + bobClosedChan.ClosingTxHash) + } + if bobClosedChan.CloseType != + lnrpc.ChannelCloseSummary_COOPERATIVE_CLOSE { + return fmt.Errorf("bob: expected cooperative "+ + "close, got %v", + bobClosedChan.CloseType) + } + + return nil + }, defaultTimeout) + require.NoError(ht, err) + + ht.Logf("Successfully verified closing txid: %s", expectedClosingTxid) +} diff --git a/itest/lnd_funding_test.go b/itest/lnd_funding_test.go index b6734e032d8..2c1daf53d2b 100644 --- a/itest/lnd_funding_test.go +++ b/itest/lnd_funding_test.go @@ -1272,8 +1272,17 @@ func testChannelFundingWithUnstableUtxos(ht *lntest.HarnessTest) { // Make sure Carol sees her to_remote output from the force close tx. ht.AssertNumPendingSweeps(carol, 1) - // We need to wait for carol initiating the sweep of the to_remote - // output of chanPoint2. + // Wait for Carol's sweep transaction to appear in the mempool. Due to + // async confirmation notifications, there's a race between when the + // sweep is registered and when the sweeper processes the next block. + // The sweeper uses immediate=false, so it broadcasts on the next block + // after registration. Mine an empty block to trigger the broadcast. + ht.MineEmptyBlocks(1) + + // Now the sweep should be in the mempool. + ht.AssertNumTxsInMempool(1) + + // Now we should see the unconfirmed UTXO from the sweep. utxo := ht.AssertNumUTXOsUnconfirmed(carol, 1)[0] // We now try to open channel using the unconfirmed utxo. @@ -1329,6 +1338,11 @@ func testChannelFundingWithUnstableUtxos(ht *lntest.HarnessTest) { // Make sure Carol sees her to_remote output from the force close tx. ht.AssertNumPendingSweeps(carol, 1) + // Mine an empty block to trigger the sweep broadcast (same fix as + // above). + ht.MineEmptyBlocks(1) + ht.AssertNumTxsInMempool(1) + // Wait for the to_remote sweep tx to show up in carol's wallet. ht.AssertNumUTXOsUnconfirmed(carol, 1) diff --git a/lncfg/dev.go b/lncfg/dev.go index f048d69b7a9..8e0c9dda452 100644 --- a/lncfg/dev.go +++ b/lncfg/dev.go @@ -5,6 +5,7 @@ package lncfg import ( "time" + "github.com/lightningnetwork/lnd/fn/v2" "github.com/lightningnetwork/lnd/lnwallet/chanfunding" ) @@ -58,3 +59,9 @@ func (d *DevConfig) GetMaxWaitNumBlocksFundingConf() uint32 { func (d *DevConfig) GetUnsafeConnect() bool { return false } + +// ChannelCloseConfs returns the config value for channel close confirmations +// override, which is always None for production build. 
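+// In non-dev builds the capacity-based scaling in lnwallet is therefore
+// always used.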
+func (d *DevConfig) ChannelCloseConfs() fn.Option[uint32] { + return fn.None[uint32]() +} diff --git a/lncfg/dev_integration.go b/lncfg/dev_integration.go index 8ac85f5d9e9..05ecdb27be0 100644 --- a/lncfg/dev_integration.go +++ b/lncfg/dev_integration.go @@ -5,6 +5,7 @@ package lncfg import ( "time" + "github.com/lightningnetwork/lnd/fn/v2" "github.com/lightningnetwork/lnd/lnwallet/chanfunding" ) @@ -27,6 +28,7 @@ type DevConfig struct { UnsafeDisconnect bool `long:"unsafedisconnect" description:"Allows the rpcserver to intentionally disconnect from peers with open channels."` MaxWaitNumBlocksFundingConf uint32 `long:"maxwaitnumblocksfundingconf" description:"Maximum blocks to wait for funding confirmation before discarding non-initiated channels."` UnsafeConnect bool `long:"unsafeconnect" description:"Allow the rpcserver to connect to a peer even if there's already a connection."` + ForceChannelCloseConfs uint32 `long:"force-channel-close-confs" description:"Force a specific number of confirmations for channel closes (dev/test only)"` } // ChannelReadyWait returns the config value `ProcessChannelReadyWait`. @@ -71,3 +73,12 @@ func (d *DevConfig) GetMaxWaitNumBlocksFundingConf() uint32 { func (d *DevConfig) GetUnsafeConnect() bool { return d.UnsafeConnect } + +// ChannelCloseConfs returns the forced confirmation count if set, or None if +// the default behavior should be used. +func (d *DevConfig) ChannelCloseConfs() fn.Option[uint32] { + if d.ForceChannelCloseConfs == 0 { + return fn.None[uint32]() + } + return fn.Some(d.ForceChannelCloseConfs) +} diff --git a/lntest/harness_assertion.go b/lntest/harness_assertion.go index 544576bef97..958278d3cc8 100644 --- a/lntest/harness_assertion.go +++ b/lntest/harness_assertion.go @@ -18,6 +18,7 @@ import ( "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/txscript" "github.com/btcsuite/btcd/wire" + "github.com/davecgh/go-spew/spew" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnrpc/invoicesrpc" @@ -552,8 +553,8 @@ func (h HarnessTest) WaitForChannelCloseEvent( require.NoError(h, err) resp, ok := event.Update.(*lnrpc.CloseStatusUpdate_ChanClose) - require.Truef(h, ok, "expected channel close update, instead got %v", - event.Update) + require.Truef(h, ok, "expected channel close update, instead got %T: %v", + event.Update, spew.Sdump(event.Update)) txid, err := chainhash.NewHash(resp.ChanClose.ClosingTxid) require.NoErrorf(h, err, "wrong format found in closing txid: %v", diff --git a/lntest/mock/chainnotifier.go b/lntest/mock/chainnotifier.go index ddce8defa28..09464ba85cd 100644 --- a/lntest/mock/chainnotifier.go +++ b/lntest/mock/chainnotifier.go @@ -1,6 +1,9 @@ package mock import ( + "testing" + "time" + "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/wire" "github.com/lightningnetwork/lnd/chainntnfs" @@ -8,9 +11,10 @@ import ( // ChainNotifier is a mock implementation of the ChainNotifier interface. 
type ChainNotifier struct { - SpendChan chan *chainntnfs.SpendDetail - EpochChan chan *chainntnfs.BlockEpoch - ConfChan chan *chainntnfs.TxConfirmation + SpendChan chan *chainntnfs.SpendDetail + EpochChan chan *chainntnfs.BlockEpoch + ConfChan chan *chainntnfs.TxConfirmation + ConfRegistered chan struct{} } // RegisterConfirmationsNtfn returns a ConfirmationEvent that contains a channel @@ -19,6 +23,14 @@ func (c *ChainNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, pkScript []byte, numConfs, heightHint uint32, opts ...chainntnfs.NotifierOption) (*chainntnfs.ConfirmationEvent, error) { + // Signal that a confirmation registration occurred. + if c.ConfRegistered != nil { + select { + case c.ConfRegistered <- struct{}{}: + default: + } + } + return &chainntnfs.ConfirmationEvent{ Confirmed: c.ConfChan, Cancel: func() {}, @@ -61,3 +73,25 @@ func (c *ChainNotifier) Started() bool { func (c *ChainNotifier) Stop() error { return nil } + +// WaitForConfRegistrationAndSend waits for a confirmation registration to occur +// and then sends a confirmation notification. This is a helper function for tests +// that need to ensure the chain watcher has registered for confirmations before +// sending the confirmation. +func (c *ChainNotifier) WaitForConfRegistrationAndSend(t *testing.T) { + t.Helper() + + // Wait for the chain watcher to register for confirmations. + select { + case <-c.ConfRegistered: + case <-time.After(time.Second * 2): + t.Fatalf("timeout waiting for conf registration") + } + + // Send the confirmation to satisfy the confirmation requirement. + select { + case c.ConfChan <- &chainntnfs.TxConfirmation{}: + case <-time.After(time.Second * 1): + t.Fatalf("unable to send confirmation") + } +} diff --git a/lnwallet/confscale.go b/lnwallet/confscale.go new file mode 100644 index 00000000000..8a8239602ff --- /dev/null +++ b/lnwallet/confscale.go @@ -0,0 +1,59 @@ +package lnwallet + +import ( + "github.com/btcsuite/btcd/btcutil" + "github.com/lightningnetwork/lnd/lnwire" +) + +const ( + // minRequiredConfs is the minimum number of confirmations we'll + // require for channel operations. + minRequiredConfs = 1 + + // maxRequiredConfs is the maximum number of confirmations we'll + // require for channel operations. + maxRequiredConfs = 6 + + // maxChannelSize is the maximum expected channel size in satoshis. + // This matches MaxBtcFundingAmount (0.16777215 BTC). + maxChannelSize = 16777215 +) + +// ScaleNumConfs returns a linearly scaled number of confirmations based on the +// provided channel amount and push amount (for funding transactions). The push +// amount represents additional risk when receiving funds. +func ScaleNumConfs(chanAmt btcutil.Amount, pushAmt lnwire.MilliSatoshi) uint16 { + // For wumbo channels, always require maximum confirmations. + if chanAmt > maxChannelSize { + return maxRequiredConfs + } + + // Calculate total stake: channel amount + push amount. The push amount + // represents value at risk for the receiver. + maxChannelSizeMsat := lnwire.NewMSatFromSatoshis(maxChannelSize) + stake := lnwire.NewMSatFromSatoshis(chanAmt) + pushAmt + + // Scale confirmations linearly based on stake. + conf := uint64(maxRequiredConfs) * uint64(stake) / + uint64(maxChannelSizeMsat) + + // Bound the result between minRequiredConfs and maxRequiredConfs. 
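+	// For example, a zero-value channel is lifted to the one-confirmation
+	// floor, while a channel at half the maximum size scales to 2 and a
+	// full-size channel to the maximum of 6.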
+ if conf < minRequiredConfs { + conf = minRequiredConfs + } + if conf > maxRequiredConfs { + conf = maxRequiredConfs + } + + return uint16(conf) +} + +// FundingConfsForAmounts returns the number of confirmations to wait for a +// funding transaction, taking into account both the channel amount and any +// pushed amount (which represents additional risk). +func FundingConfsForAmounts(chanAmt btcutil.Amount, + pushAmt lnwire.MilliSatoshi) uint16 { + + return ScaleNumConfs(chanAmt, pushAmt) +} + diff --git a/lnwallet/confscale_integration.go b/lnwallet/confscale_integration.go new file mode 100644 index 00000000000..72692a7360a --- /dev/null +++ b/lnwallet/confscale_integration.go @@ -0,0 +1,13 @@ +//go:build integration +// +build integration + +package lnwallet + +import "github.com/btcsuite/btcd/btcutil" + +// CloseConfsForCapacity returns the number of confirmations to wait +// before signaling a cooperative close. Under integration tests, we +// always return 1 to keep tests fast and deterministic. +func CloseConfsForCapacity(capacity btcutil.Amount) uint32 { //nolint:revive + return 1 +} \ No newline at end of file diff --git a/lnwallet/confscale_prod.go b/lnwallet/confscale_prod.go new file mode 100644 index 00000000000..f0a2cd586e8 --- /dev/null +++ b/lnwallet/confscale_prod.go @@ -0,0 +1,25 @@ +//go:build !integration +// +build !integration + +package lnwallet + +import "github.com/btcsuite/btcd/btcutil" + +// CloseConfsForCapacity returns the number of confirmations to wait +// before signaling a cooperative close, scaled by channel capacity. +// We enforce a minimum of 3 confirmations to provide better reorg protection, +// even for small channels. +func CloseConfsForCapacity(capacity btcutil.Amount) uint32 { //nolint:revive + // For cooperative closes, we don't have a push amount to consider, + // so we pass 0 for the pushAmt parameter. + scaledConfs := uint32(ScaleNumConfs(capacity, 0)) + + // Enforce a minimum of 3 confirmations for reorg safety. + // This protects against shallow reorgs which are more common. + const minCoopCloseConfs = 3 + if scaledConfs < minCoopCloseConfs { + return minCoopCloseConfs + } + + return scaledConfs +} \ No newline at end of file diff --git a/lnwallet/confscale_test.go b/lnwallet/confscale_test.go new file mode 100644 index 00000000000..786769e451b --- /dev/null +++ b/lnwallet/confscale_test.go @@ -0,0 +1,338 @@ +package lnwallet + +import ( + "testing" + + "github.com/btcsuite/btcd/btcutil" + "github.com/lightningnetwork/lnd/lnwire" + "github.com/stretchr/testify/require" + "pgregory.net/rapid" +) + +// TestScaleNumConfsProperties tests various properties that ScaleNumConfs +// should satisfy using property-based testing. +func TestScaleNumConfsProperties(t *testing.T) { + t.Parallel() + + // The result should always be bounded between the minimum and maximum + // number of confirmations regardless of input values. + t.Run("bounded_result", func(t *testing.T) { + rapid.Check(t, func(t *rapid.T) { + // Generate random channel amount and push amount. 
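+			// The range deliberately extends past maxChannelSize
+			// so the wumbo branch is exercised as well.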
+ chanAmt := rapid.Uint64Range( + 0, maxChannelSize*10, + ).Draw(t, "chanAmt") + pushAmtSats := rapid.Uint64Range( + 0, chanAmt, + ).Draw(t, "pushAmtSats") + pushAmt := lnwire.NewMSatFromSatoshis( + btcutil.Amount(pushAmtSats), + ) + + result := ScaleNumConfs( + btcutil.Amount(chanAmt), pushAmt, + ) + + // Check bounds + require.GreaterOrEqual( + t, result, uint16(minRequiredConfs), + "result should be >= minRequiredConfs", + ) + require.LessOrEqual( + t, result, uint16(maxRequiredConfs), + "result should be <= maxRequiredConfs", + ) + }) + }) + + // Larger channel amounts and push amounts should require equal or more + // confirmations, ensuring the function is monotonically increasing. + t.Run("monotonicity", func(t *testing.T) { + rapid.Check(t, func(t *rapid.T) { + // Generate two channel amounts where amt1 <= amt2. + amt1 := rapid.Uint64Range( + 0, maxChannelSize, + ).Draw(t, "amt1") + amt2 := rapid.Uint64Range( + amt1, maxChannelSize, + ).Draw(t, "amt2") + + // Generate push amounts proportional to channel size. + pushAmt1Sats := rapid.Uint64Range( + 0, amt1, + ).Draw(t, "pushAmt1") + pushAmt2Sats := rapid.Uint64Range( + pushAmt1Sats, amt2, + ).Draw(t, "pushAmt2") + + pushAmt1 := lnwire.NewMSatFromSatoshis( + btcutil.Amount(pushAmt1Sats), + ) + pushAmt2 := lnwire.NewMSatFromSatoshis( + btcutil.Amount(pushAmt2Sats), + ) + + confs1 := ScaleNumConfs(btcutil.Amount(amt1), pushAmt1) + confs2 := ScaleNumConfs(btcutil.Amount(amt2), pushAmt2) + + // Larger or equal stake should require equal or more + // confirmations. + require.GreaterOrEqual( + t, confs2, confs1, + "larger amount should require equal or "+ + "more confirmations", + ) + }) + }) + + // Wumbo channels (those exceeding the max standard channel size) should + // always require the maximum number of confirmations for safety. + t.Run("wumbo_max_confs", func(t *testing.T) { + rapid.Check(t, func(t *rapid.T) { + // Generate wumbo channel amount (above maxChannelSize). + wumboAmt := rapid.Uint64Range( + maxChannelSize+1, maxChannelSize*100, + ).Draw(t, "wumboAmt") + pushAmtSats := rapid.Uint64Range( + 0, wumboAmt, + ).Draw(t, "pushAmtSats") + pushAmt := lnwire.NewMSatFromSatoshis( + btcutil.Amount(pushAmtSats), + ) + + result := ScaleNumConfs( + btcutil.Amount(wumboAmt), pushAmt, + ) + + require.Equal( + t, uint16(maxRequiredConfs), result, + "wumbo channels should always get "+ + "max confirmations", + ) + }) + }) + + // Zero channel amounts should always result in the minimum number of + // confirmations since there's no value at risk. + t.Run("zero_gets_min", func(t *testing.T) { + result := ScaleNumConfs(0, 0) + require.Equal( + t, uint16(minRequiredConfs), result, + "zero amount should get minimum confirmations", + ) + }) + + // The function should be deterministic, always returning the same + // output for the same input values. + t.Run("determinism", func(t *testing.T) { + rapid.Check(t, func(t *rapid.T) { + chanAmt := rapid.Uint64Range( + 0, maxChannelSize*2, + ).Draw(t, "chanAmt") + pushAmtSats := rapid.Uint64Range( + 0, chanAmt, + ).Draw(t, "pushAmtSats") + pushAmt := lnwire.NewMSatFromSatoshis( + btcutil.Amount(pushAmtSats), + ) + + // Call multiple times with same inputs. 
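+			// Any divergence between the calls would indicate
+			// hidden state or randomness in the scaling function.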
+ result1 := ScaleNumConfs( + btcutil.Amount(chanAmt), pushAmt, + ) + result2 := ScaleNumConfs( + btcutil.Amount(chanAmt), pushAmt, + ) + result3 := ScaleNumConfs( + btcutil.Amount(chanAmt), pushAmt, + ) + + require.Equal( + t, result1, result2, + "function should be deterministic", + ) + require.Equal( + t, result2, result3, + "function should be deterministic", + ) + }) + }) + + // Adding a push amount to a channel should require equal or more + // confirmations compared to the same channel without a push amount. + t.Run("push_amount_effect", func(t *testing.T) { + rapid.Check(t, func(t *rapid.T) { + // Fix channel amount, vary push amount + chanAmt := rapid.Uint64Range( + 1, maxChannelSize, + ).Draw(t, "chanAmt") + pushAmt1Sats := rapid.Uint64Range( + 0, chanAmt/2, + ).Draw(t, "pushAmt1") + pushAmt2Sats := rapid.Uint64Range( + pushAmt1Sats, chanAmt, + ).Draw(t, "pushAmt2") + + pushAmt1 := lnwire.NewMSatFromSatoshis( + btcutil.Amount(pushAmt1Sats), + ) + pushAmt2 := lnwire.NewMSatFromSatoshis( + btcutil.Amount(pushAmt2Sats), + ) + + confs1 := ScaleNumConfs( + btcutil.Amount(chanAmt), pushAmt1, + ) + confs2 := ScaleNumConfs( + btcutil.Amount(chanAmt), pushAmt2, + ) + + // More push amount should require equal or more + // confirmations. + require.GreaterOrEqual( + t, confs2, confs1, + "larger push amount should "+ + "require equal or more confirmations", + ) + }) + }) +} + +// TestScaleNumConfsKnownValues tests ScaleNumConfs with specific known values +// to ensure the scaling formula works as expected. +func TestScaleNumConfsKnownValues(t *testing.T) { + t.Parallel() + + testCases := []struct { + name string + chanAmt btcutil.Amount + pushAmt lnwire.MilliSatoshi + expected uint16 + }{ + { + name: "zero amounts", + chanAmt: 0, + pushAmt: 0, + expected: minRequiredConfs, + }, + { + name: "tiny channel", + chanAmt: 1000, + pushAmt: 0, + expected: minRequiredConfs, + }, + { + name: "small channel no push", + chanAmt: 100_000, + pushAmt: 0, + expected: minRequiredConfs, + }, + { + name: "half max channel no push", + chanAmt: maxChannelSize / 2, + pushAmt: 0, + expected: 2, + }, + { + name: "max channel no push", + chanAmt: maxChannelSize, + pushAmt: 0, + expected: maxRequiredConfs, + }, + { + name: "wumbo channel", + chanAmt: maxChannelSize * 2, + pushAmt: 0, + expected: maxRequiredConfs, + }, + { + name: "small channel with push", + chanAmt: 100_000, + pushAmt: lnwire.NewMSatFromSatoshis(50_000), + expected: minRequiredConfs, + }, + { + name: "medium channel with significant push", + chanAmt: maxChannelSize / 4, + pushAmt: lnwire.NewMSatFromSatoshis(maxChannelSize / 4), + expected: 2, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + result := ScaleNumConfs(tc.chanAmt, tc.pushAmt) + + require.Equal( + t, tc.expected, result, + "chanAmt=%d, pushAmt=%d", tc.chanAmt, + tc.pushAmt, + ) + }) + } +} + +// TestFundingConfsForAmounts verifies that FundingConfsForAmounts is a simple +// wrapper around ScaleNumConfs. +func TestFundingConfsForAmounts(t *testing.T) { + t.Parallel() + + rapid.Check(t, func(t *rapid.T) { + chanAmt := rapid.Uint64Range( + 0, maxChannelSize*2, + ).Draw(t, "chanAmt") + pushAmtSats := rapid.Uint64Range( + 0, chanAmt, + ).Draw(t, "pushAmtSats") + pushAmt := lnwire.NewMSatFromSatoshis( + btcutil.Amount(pushAmtSats), + ) + + // Both functions should return the same result. 
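+		// FundingConfsForAmounts is a thin wrapper around
+		// ScaleNumConfs, so any deviation here is a regression.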
+ scaleResult := ScaleNumConfs(btcutil.Amount(chanAmt), pushAmt) + fundingResult := FundingConfsForAmounts( + btcutil.Amount(chanAmt), pushAmt, + ) + + require.Equal( + t, scaleResult, fundingResult, + "FundingConfsForAmounts should return "+ + "same result as ScaleNumConfs", + ) + }) +} + +// TestCloseConfsForCapacity verifies that CloseConfsForCapacity correctly +// wraps ScaleNumConfs with zero push amount and enforces a minimum of 3 +// confirmations for reorg safety. +func TestCloseConfsForCapacity(t *testing.T) { + t.Parallel() + + rapid.Check(t, func(t *rapid.T) { + capacity := rapid.Uint64Range( + 0, maxChannelSize*2, + ).Draw(t, "capacity") + + // CloseConfsForCapacity should be equivalent to ScaleNumConfs + // with 0 push, but with a minimum of 3 confirmations enforced + // for reorg safety. + closeConfs := CloseConfsForCapacity(btcutil.Amount(capacity)) + scaleConfs := ScaleNumConfs(btcutil.Amount(capacity), 0) + + // The result should be at least the scaled value, but with a + // minimum of 3 confirmations. + const minCoopCloseConfs = 3 + expectedConfs := uint32(scaleConfs) + if expectedConfs < minCoopCloseConfs { + expectedConfs = minCoopCloseConfs + } + + require.Equal( + t, expectedConfs, closeConfs, + "CloseConfsForCapacity should match "+ + "ScaleNumConfs with 0 push amount, "+ + "but with minimum of 3 confs", + ) + }) +} diff --git a/peer/brontide.go b/peer/brontide.go index 9191cbb2eeb..a1aa1a2c200 100644 --- a/peer/brontide.go +++ b/peer/brontide.go @@ -370,6 +370,12 @@ type Config struct { // closure initiated by the remote peer. CoopCloseTargetConfs uint32 + // ChannelCloseConfs is an optional override for the number of + // confirmations required for channel closes. When set, this overrides + // the normal capacity-based scaling. This is only available in + // dev/integration builds for testing purposes. + ChannelCloseConfs fn.Option[uint32] + // ServerPubKey is the serialized, compressed public key of our lnd node. // It is used to determine which policy (channel edge) to pass to the // ChannelLink. @@ -4436,17 +4442,30 @@ func (p *Brontide) finalizeChanClosure(chanCloser *chancloser.ChanCloser) { // If this is a locally requested shutdown, update the caller with a // new event detailing the current pending state of this request. if closeReq != nil { + // TODO(roasbeef): don't actually need this? closeReq.Updates <- &PendingUpdate{ - Txid: closingTxid[:], + Txid: closingTxid[:], + IsLocalCloseTx: fn.Some(true), } } localOut := chanCloser.LocalCloseOutput() remoteOut := chanCloser.RemoteCloseOutput() auxOut := chanCloser.AuxOutputs() + + // Determine the number of confirmations to wait before signaling a + // successful cooperative close, scaled by channel capacity (see + // CloseConfsForCapacity). Check if we have a config override for + // testing purposes. + numConfs := p.cfg.ChannelCloseConfs.UnwrapOrFunc(func() uint32 { + // No override, use normal capacity-based scaling. + return lnwallet.CloseConfsForCapacity(chanCloser.Channel().Capacity) + }) + + // Register for full confirmation to send the final update. go WaitForChanToClose( chanCloser.NegotiationHeight(), notifier, errChan, - &chanPoint, &closingTxid, closingTx.TxOut[0].PkScript, func() { + &chanPoint, &closingTxid, closingTx.TxOut[0].PkScript, numConfs, func() { // Respond to the local subsystem which requested the // channel closure. 
if closeReq != nil { @@ -4469,14 +4488,13 @@ func (p *Brontide) finalizeChanClosure(chanCloser *chancloser.ChanCloser) { // the function, then it will be sent over the errChan. func WaitForChanToClose(bestHeight uint32, notifier chainntnfs.ChainNotifier, errChan chan error, chanPoint *wire.OutPoint, - closingTxID *chainhash.Hash, closeScript []byte, cb func()) { + closingTxID *chainhash.Hash, closeScript []byte, numConfs uint32, cb func()) { peerLog.Infof("Waiting for confirmation of close of ChannelPoint(%v) "+ "with txid: %v", chanPoint, closingTxID) - // TODO(roasbeef): add param for num needed confs confNtfn, err := notifier.RegisterConfirmationsNtfn( - closingTxID, closeScript, 1, bestHeight, + closingTxID, closeScript, numConfs, bestHeight, ) if err != nil { if errChan != nil { diff --git a/rpcserver.go b/rpcserver.go index d3d3c518014..83330e30819 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -2800,14 +2800,14 @@ func (r *rpcServer) CloseChannel(in *lnrpc.CloseChannelRequest, errChan = make(chan error, 1) notifier := r.server.cc.ChainNotifier - go peer.WaitForChanToClose( - uint32(bestHeight), notifier, errChan, chanPoint, - &closingTxid, closingTx.TxOut[0].PkScript, func() { - // Respond to the local subsystem which - // requested the channel closure. - updateChan <- &peer.ChannelCloseUpdate{ - ClosingTxid: closingTxid[:], - Success: true, + go peer.WaitForChanToClose( + uint32(bestHeight), notifier, errChan, chanPoint, + &closingTxid, closingTx.TxOut[0].PkScript, 1, func() { + // Respond to the local subsystem which + // requested the channel closure. + updateChan <- &peer.ChannelCloseUpdate{ + ClosingTxid: closingTxid[:], + Success: true, // Force closure transactions don't have // additional local/remote outputs. } diff --git a/server.go b/server.go index 6d0b28f7ee1..3bbae4f765f 100644 --- a/server.go +++ b/server.go @@ -1361,9 +1361,10 @@ func newServer(ctx context.Context, cfg *Config, listenAddrs []net.Addr, return &pc.Incoming }, - AuxLeafStore: implCfg.AuxLeafStore, - AuxSigner: implCfg.AuxSigner, - AuxResolver: implCfg.AuxContractResolver, + AuxLeafStore: implCfg.AuxLeafStore, + AuxSigner: implCfg.AuxSigner, + AuxResolver: implCfg.AuxContractResolver, + ChannelCloseConfs: s.cfg.Dev.ChannelCloseConfs(), }, dbs.ChanStateDB) // Select the configuration and funding parameters for Bitcoin. @@ -1468,16 +1469,6 @@ func newServer(ctx context.Context, cfg *Config, listenAddrs []net.Addr, DefaultMinHtlcIn: cc.MinHtlcIn, NumRequiredConfs: func(chanAmt btcutil.Amount, pushAmt lnwire.MilliSatoshi) uint16 { - // For large channels we increase the number - // of confirmations we require for the - // channel to be considered open. As it is - // always the responder that gets to choose - // value, the pushAmt is value being pushed - // to us. This means we have more to lose - // in the case this gets re-orged out, and - // we will require more confirmations before - // we consider it open. - // In case the user has explicitly specified // a default value for the number of // confirmations, we use it. @@ -1486,29 +1477,17 @@ func newServer(ctx context.Context, cfg *Config, listenAddrs []net.Addr, return defaultConf } - minConf := uint64(3) - maxConf := uint64(6) - - // If this is a wumbo channel, then we'll require the - // max amount of confirmations. - if chanAmt > MaxFundingAmount { - return uint16(maxConf) - } - - // If not we return a value scaled linearly - // between 3 and 6, depending on channel size. - // TODO(halseth): Use 1 as minimum? 
- maxChannelSize := uint64( - lnwire.NewMSatFromSatoshis(MaxFundingAmount)) - stake := lnwire.NewMSatFromSatoshis(chanAmt) + pushAmt - conf := maxConf * uint64(stake) / maxChannelSize - if conf < minConf { - conf = minConf - } - if conf > maxConf { - conf = maxConf - } - return uint16(conf) + // Otherwise, scale the number of confirmations based on + // the channel amount and push amount. For large + // channels we increase the number of + // confirmations we require for the channel to be + // considered open. As it is always the + // responder that gets to choose value, the + // pushAmt is value being pushed to us. This + // means we have more to lose in the case this + // gets re-orged out, and we will require more + // confirmations before we consider it open. + return lnwallet.FundingConfsForAmounts(chanAmt, pushAmt) }, RequiredRemoteDelay: func(chanAmt btcutil.Amount) uint16 { // We scale the remote CSV delay (the time the @@ -4384,6 +4363,7 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq, MaxOutgoingCltvExpiry: s.cfg.MaxOutgoingCltvExpiry, MaxChannelFeeAllocation: s.cfg.MaxChannelFeeAllocation, CoopCloseTargetConfs: s.cfg.CoopCloseTargetConfs, + ChannelCloseConfs: s.cfg.Dev.ChannelCloseConfs(), MaxAnchorsCommitFeeRate: chainfee.SatPerKVByte( s.cfg.MaxCommitFeeRateAnchors * 1000).FeePerKWeight(), ChannelCommitInterval: s.cfg.ChannelCommitInterval,