From 939c9b4ccf856d0bcdddabfe4eff78130daf697f Mon Sep 17 00:00:00 2001 From: Andras Banki-Horvath Date: Fri, 24 May 2024 16:20:50 +0200 Subject: [PATCH 1/4] loopdb+sweepbatcher: add the DropBatch call --- loopdb/sqlc/batch.sql.go | 9 +++++++++ loopdb/sqlc/querier.go | 1 + loopdb/sqlc/queries/batch.sql | 3 +++ sweepbatcher/store.go | 22 ++++++++++++++++++++++ sweepbatcher/store_mock.go | 6 ++++++ 5 files changed, 41 insertions(+) diff --git a/loopdb/sqlc/batch.sql.go b/loopdb/sqlc/batch.sql.go index dae2ad17a..c8fab1f28 100644 --- a/loopdb/sqlc/batch.sql.go +++ b/loopdb/sqlc/batch.sql.go @@ -25,6 +25,15 @@ func (q *Queries) ConfirmBatch(ctx context.Context, id int32) error { return err } +const dropBatch = `-- name: DropBatch :exec +DELETE FROM sweep_batches WHERE id = $1 +` + +func (q *Queries) DropBatch(ctx context.Context, id int32) error { + _, err := q.db.ExecContext(ctx, dropBatch, id) + return err +} + const getBatchSweeps = `-- name: GetBatchSweeps :many SELECT sweeps.id, sweeps.swap_hash, sweeps.batch_id, sweeps.outpoint_txid, sweeps.outpoint_index, sweeps.amt, sweeps.completed, diff --git a/loopdb/sqlc/querier.go b/loopdb/sqlc/querier.go index b1f0903c7..d12c6ed77 100644 --- a/loopdb/sqlc/querier.go +++ b/loopdb/sqlc/querier.go @@ -11,6 +11,7 @@ import ( type Querier interface { ConfirmBatch(ctx context.Context, id int32) error CreateReservation(ctx context.Context, arg CreateReservationParams) error + DropBatch(ctx context.Context, id int32) error FetchLiquidityParams(ctx context.Context) ([]byte, error) GetBatchSweeps(ctx context.Context, batchID int32) ([]GetBatchSweepsRow, error) GetBatchSweptAmount(ctx context.Context, batchID int32) (int64, error) diff --git a/loopdb/sqlc/queries/batch.sql b/loopdb/sqlc/queries/batch.sql index a9793f902..5951dfe46 100644 --- a/loopdb/sqlc/queries/batch.sql +++ b/loopdb/sqlc/queries/batch.sql @@ -23,6 +23,9 @@ INSERT INTO sweep_batches ( $6 ) RETURNING id; +-- name: DropBatch :exec +DELETE FROM sweep_batches WHERE id = $1; + -- name: UpdateBatch :exec UPDATE sweep_batches SET confirmed = $2, diff --git a/sweepbatcher/store.go b/sweepbatcher/store.go index 3fcc32e73..bdbcb50cc 100644 --- a/sweepbatcher/store.go +++ b/sweepbatcher/store.go @@ -3,6 +3,7 @@ package sweepbatcher import ( "context" "database/sql" + "fmt" "github.com/btcsuite/btcd/btcutil" "github.com/btcsuite/btcd/chaincfg" @@ -46,6 +47,9 @@ type BaseDB interface { InsertBatch(ctx context.Context, arg sqlc.InsertBatchParams) ( int32, error) + // DropBatch drops a batch from the database. + DropBatch(ctx context.Context, id int32) error + // UpdateBatch updates a batch in the database. UpdateBatch(ctx context.Context, arg sqlc.UpdateBatchParams) error @@ -108,6 +112,24 @@ func (s *SQLStore) InsertSweepBatch(ctx context.Context, batch *dbBatch) (int32, return s.baseDb.InsertBatch(ctx, batchToInsertArgs(*batch)) } +// DropBatch drops a batch from the database. Note that we only use this call +// for batches that have no sweeps and so we'd not be able to resume. +func (s *SQLStore) DropBatch(ctx context.Context, id int32) error { + readOpts := loopdb.NewSqlReadOpts() + return s.baseDb.ExecTx(ctx, readOpts, func(tx *sqlc.Queries) error { + dbSweeps, err := tx.GetBatchSweeps(ctx, id) + if err != nil { + return err + } + + if len(dbSweeps) != 0 { + return fmt.Errorf("cannot drop a non-empty batch") + } + + return tx.DropBatch(ctx, id) + }) +} + // UpdateSweepBatch updates a batch in the database. func (s *SQLStore) UpdateSweepBatch(ctx context.Context, batch *dbBatch) error { return s.baseDb.UpdateBatch(ctx, batchToUpdateArgs(*batch)) diff --git a/sweepbatcher/store_mock.go b/sweepbatcher/store_mock.go index f27fcb4d9..053803415 100644 --- a/sweepbatcher/store_mock.go +++ b/sweepbatcher/store_mock.go @@ -56,6 +56,12 @@ func (s *StoreMock) InsertSweepBatch(ctx context.Context, return id, nil } +// DropBatch drops a batch from the database. +func (s *StoreMock) DropBatch(ctx context.Context, id int32) error { + delete(s.batches, id) + return nil +} + // UpdateSweepBatch updates a batch in the database. func (s *StoreMock) UpdateSweepBatch(ctx context.Context, batch *dbBatch) error { From c01e8014e171a01df3a84b05e9789c023bd69b0d Mon Sep 17 00:00:00 2001 From: Andras Banki-Horvath Date: Fri, 24 May 2024 10:36:42 +0200 Subject: [PATCH 2/4] sweepbatcher: do not fail on restoring empty batches Previously storing an empty batch would make the batcher fail to start as spinning up a restored batch assumes that there's a primary sweep added already. As there's no point in spinning up such batch we can just skip over it. Furthermore we'll ensure that we won't try to ever publish an empty batch to avoid setting the fee rate too early. --- sweepbatcher/sweep_batch.go | 9 ++++- sweepbatcher/sweep_batcher.go | 62 ++++++++++++++++++++--------------- 2 files changed, 44 insertions(+), 27 deletions(-) diff --git a/sweepbatcher/sweep_batch.go b/sweepbatcher/sweep_batch.go index 572c5da45..05561b68e 100644 --- a/sweepbatcher/sweep_batch.go +++ b/sweepbatcher/sweep_batch.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/hex" + "errors" "fmt" "math" "sync" @@ -48,7 +49,7 @@ const ( ) var ( - ErrBatchShuttingDown = fmt.Errorf("batch shutting down") + ErrBatchShuttingDown = errors.New("batch shutting down") ) // sweep stores any data related to sweeping a specific outpoint. @@ -539,6 +540,12 @@ func (b *batch) publish(ctx context.Context) error { coopSuccess bool ) + if len(b.sweeps) == 0 { + b.log.Debugf("skipping publish: no sweeps in the batch") + + return nil + } + // Run the RBF rate update. err = b.updateRbfRate(ctx) if err != nil { diff --git a/sweepbatcher/sweep_batcher.go b/sweepbatcher/sweep_batcher.go index 7065a4717..a0b254a4b 100644 --- a/sweepbatcher/sweep_batcher.go +++ b/sweepbatcher/sweep_batcher.go @@ -2,6 +2,7 @@ package sweepbatcher import ( "context" + "errors" "fmt" "sync" "time" @@ -46,36 +47,35 @@ const ( type BatcherStore interface { // FetchUnconfirmedSweepBatches fetches all the batches from the // database that are not in a confirmed state. - FetchUnconfirmedSweepBatches(ctx context.Context) ([]*dbBatch, - error) + FetchUnconfirmedSweepBatches(ctx context.Context) ([]*dbBatch, error) // InsertSweepBatch inserts a batch into the database, returning the id // of the inserted batch. - InsertSweepBatch(ctx context.Context, - batch *dbBatch) (int32, error) + InsertSweepBatch(ctx context.Context, batch *dbBatch) (int32, error) + + // DropBatch drops a batch from the database. This should only be used + // when a batch is empty. + DropBatch(ctx context.Context, id int32) error // UpdateSweepBatch updates a batch in the database. - UpdateSweepBatch(ctx context.Context, - batch *dbBatch) error + UpdateSweepBatch(ctx context.Context, batch *dbBatch) error // ConfirmBatch confirms a batch by setting its state to confirmed. ConfirmBatch(ctx context.Context, id int32) error // FetchBatchSweeps fetches all the sweeps that belong to a batch. - FetchBatchSweeps(ctx context.Context, - id int32) ([]*dbSweep, error) + FetchBatchSweeps(ctx context.Context, id int32) ([]*dbSweep, error) // UpsertSweep inserts a sweep into the database, or updates an existing // sweep if it already exists. UpsertSweep(ctx context.Context, sweep *dbSweep) error // GetSweepStatus returns the completed status of the sweep. - GetSweepStatus(ctx context.Context, swapHash lntypes.Hash) ( - bool, error) + GetSweepStatus(ctx context.Context, swapHash lntypes.Hash) (bool, error) // GetParentBatch returns the parent batch of a (completed) sweep. - GetParentBatch(ctx context.Context, swapHash lntypes.Hash) ( - *dbBatch, error) + GetParentBatch(ctx context.Context, swapHash lntypes.Hash) (*dbBatch, + error) // TotalSweptAmount returns the total amount swept by a (confirmed) // batch. @@ -135,7 +135,7 @@ type SpendNotifier struct { } var ( - ErrBatcherShuttingDown = fmt.Errorf("batcher shutting down") + ErrBatcherShuttingDown = errors.New("batcher shutting down") ) // Batcher is a system that is responsible for accepting sweep requests and @@ -306,7 +306,7 @@ func (b *Batcher) handleSweep(ctx context.Context, sweep *sweep, if batch.sweepExists(sweep.swapHash) { accepted, err := batch.addSweep(ctx, sweep) - if err != nil { + if err != nil && !errors.Is(err, ErrBatchShuttingDown) { return err } @@ -321,7 +321,7 @@ func (b *Batcher) handleSweep(ctx context.Context, sweep *sweep, // If one of the batches accepts the sweep, we provide it to that batch. for _, batch := range b.batches { accepted, err := batch.addSweep(ctx, sweep) - if err != nil && err != ErrBatchShuttingDown { + if err != nil && !errors.Is(err, ErrBatchShuttingDown) { return err } @@ -407,23 +407,23 @@ func (b *Batcher) spinUpBatch(ctx context.Context) (*batch, error) { // spinUpBatchDB spins up a batch that already existed in storage, then // returns it. func (b *Batcher) spinUpBatchFromDB(ctx context.Context, batch *batch) error { - cfg := batchConfig{ - maxTimeoutDistance: batch.cfg.maxTimeoutDistance, - batchConfTarget: defaultBatchConfTarget, - } - - rbfCache := rbfCache{ - LastHeight: batch.rbfCache.LastHeight, - FeeRate: batch.rbfCache.FeeRate, - } - dbSweeps, err := b.store.FetchBatchSweeps(ctx, batch.id) if err != nil { return err } if len(dbSweeps) == 0 { - return fmt.Errorf("batch %d has no sweeps", batch.id) + log.Infof("skipping restored batch %d as it has no sweeps", + batch.id) + + // It is safe to drop this empty batch as it has no sweeps. + err := b.store.DropBatch(ctx, batch.id) + if err != nil { + log.Warnf("unable to drop empty batch %d: %v", + batch.id, err) + } + + return nil } primarySweep := dbSweeps[0] @@ -439,6 +439,11 @@ func (b *Batcher) spinUpBatchFromDB(ctx context.Context, batch *batch) error { sweeps[sweep.swapHash] = *sweep } + rbfCache := rbfCache{ + LastHeight: batch.rbfCache.LastHeight, + FeeRate: batch.rbfCache.FeeRate, + } + batchKit := batchKit{ id: batch.id, batchTxid: batch.batchTxid, @@ -458,6 +463,11 @@ func (b *Batcher) spinUpBatchFromDB(ctx context.Context, batch *batch) error { log: batchPrefixLogger(fmt.Sprintf("%d", batch.id)), } + cfg := batchConfig{ + maxTimeoutDistance: batch.cfg.maxTimeoutDistance, + batchConfTarget: defaultBatchConfTarget, + } + newBatch := NewBatchFromDB(cfg, batchKit) // We add the batch to our map of batches and start it. From e5ade6a0b155dd99113bba924ec587c5b12fc1f4 Mon Sep 17 00:00:00 2001 From: Andras Banki-Horvath Date: Fri, 24 May 2024 10:52:26 +0200 Subject: [PATCH 3/4] sweepbatcher: close the quit channel when the batcher is shutting down --- sweepbatcher/sweep_batch.go | 18 +++++++++++++---- sweepbatcher/sweep_batcher.go | 3 +++ sweepbatcher/sweep_batcher_test.go | 31 +++++++++++++++--------------- 3 files changed, 32 insertions(+), 20 deletions(-) diff --git a/sweepbatcher/sweep_batch.go b/sweepbatcher/sweep_batch.go index 05561b68e..b76ed3aa7 100644 --- a/sweepbatcher/sweep_batch.go +++ b/sweepbatcher/sweep_batch.go @@ -197,7 +197,11 @@ type batch struct { // main event loop. callLeave chan struct{} - // quit signals that the batch must stop. + // stopped signals that the batch has stopped. + stopped chan struct{} + + // quit is owned by the parent batcher and signals that the batch must + // stop. quit chan struct{} // wallet is the wallet client used to create and publish the batch @@ -261,6 +265,7 @@ type batchKit struct { purger Purger store BatcherStore log btclog.Logger + quit chan struct{} } // scheduleNextCall schedules the next call to the batch handler's main event @@ -270,6 +275,9 @@ func (b *batch) scheduleNextCall() (func(), error) { case b.callEnter <- struct{}{}: case <-b.quit: + return func() {}, ErrBatcherShuttingDown + + case <-b.stopped: return func() {}, ErrBatchShuttingDown } @@ -293,7 +301,8 @@ func NewBatch(cfg batchConfig, bk batchKit) *batch { errChan: make(chan error, 1), callEnter: make(chan struct{}), callLeave: make(chan struct{}), - quit: make(chan struct{}), + stopped: make(chan struct{}), + quit: bk.quit, batchTxid: bk.batchTxid, wallet: bk.wallet, chainNotifier: bk.chainNotifier, @@ -320,7 +329,8 @@ func NewBatchFromDB(cfg batchConfig, bk batchKit) *batch { errChan: make(chan error, 1), callEnter: make(chan struct{}), callLeave: make(chan struct{}), - quit: make(chan struct{}), + stopped: make(chan struct{}), + quit: bk.quit, batchTxid: bk.batchTxid, batchPkScript: bk.batchPkScript, rbfCache: bk.rbfCache, @@ -447,7 +457,7 @@ func (b *batch) Run(ctx context.Context) error { runCtx, cancel := context.WithCancel(ctx) defer func() { cancel() - close(b.quit) + close(b.stopped) b.wg.Wait() }() diff --git a/sweepbatcher/sweep_batcher.go b/sweepbatcher/sweep_batcher.go index a0b254a4b..8816cd16f 100644 --- a/sweepbatcher/sweep_batcher.go +++ b/sweepbatcher/sweep_batcher.go @@ -216,6 +216,7 @@ func (b *Batcher) Run(ctx context.Context) error { runCtx, cancel := context.WithCancel(ctx) defer func() { cancel() + close(b.quit) for _, batch := range b.batches { batch.Wait() @@ -379,6 +380,7 @@ func (b *Batcher) spinUpBatch(ctx context.Context) (*batch, error) { verifySchnorrSig: b.VerifySchnorrSig, purger: b.AddSweep, store: b.store, + quit: b.quit, } batch := NewBatch(cfg, batchKit) @@ -461,6 +463,7 @@ func (b *Batcher) spinUpBatchFromDB(ctx context.Context, batch *batch) error { purger: b.AddSweep, store: b.store, log: batchPrefixLogger(fmt.Sprintf("%d", batch.id)), + quit: b.quit, } cfg := batchConfig{ diff --git a/sweepbatcher/sweep_batcher_test.go b/sweepbatcher/sweep_batcher_test.go index fa56ff60f..4ddbb6569 100644 --- a/sweepbatcher/sweep_batcher_test.go +++ b/sweepbatcher/sweep_batcher_test.go @@ -2,7 +2,7 @@ package sweepbatcher import ( "context" - "strings" + "errors" "testing" "time" @@ -43,6 +43,15 @@ var dummyNotifier = SpendNotifier{ QuitChan: make(chan bool, ntfnBufferSize), } +func checkBatcherError(t *testing.T, err error) { + if !errors.Is(err, context.Canceled) && + !errors.Is(err, ErrBatcherShuttingDown) && + !errors.Is(err, ErrBatchShuttingDown) { + + require.NoError(t, err) + } +} + // TestSweepBatcherBatchCreation tests that sweep requests enter the expected // batch based on their timeout distance. func TestSweepBatcherBatchCreation(t *testing.T) { @@ -60,9 +69,7 @@ func TestSweepBatcherBatchCreation(t *testing.T) { testMuSig2SignSweep, nil, lnd.ChainParams, batcherStore, store) go func() { err := batcher.Run(ctx) - if !strings.Contains(err.Error(), "context canceled") { - require.NoError(t, err) - } + checkBatcherError(t, err) }() // Create a sweep request. @@ -215,9 +222,7 @@ func TestSweepBatcherSimpleLifecycle(t *testing.T) { testMuSig2SignSweep, nil, lnd.ChainParams, batcherStore, store) go func() { err := batcher.Run(ctx) - if !strings.Contains(err.Error(), "context canceled") { - require.NoError(t, err) - } + checkBatcherError(t, err) }() // Create a sweep request. @@ -354,9 +359,7 @@ func TestSweepBatcherSweepReentry(t *testing.T) { testMuSig2SignSweep, nil, lnd.ChainParams, batcherStore, store) go func() { err := batcher.Run(ctx) - if !strings.Contains(err.Error(), "context canceled") { - require.NoError(t, err) - } + checkBatcherError(t, err) }() // Create some sweep requests with timeouts not too far away, in order @@ -561,9 +564,7 @@ func TestSweepBatcherNonWalletAddr(t *testing.T) { testMuSig2SignSweep, nil, lnd.ChainParams, batcherStore, store) go func() { err := batcher.Run(ctx) - if !strings.Contains(err.Error(), "context canceled") { - require.NoError(t, err) - } + checkBatcherError(t, err) }() // Create a sweep request. @@ -727,9 +728,7 @@ func TestSweepBatcherComposite(t *testing.T) { testMuSig2SignSweep, nil, lnd.ChainParams, batcherStore, store) go func() { err := batcher.Run(ctx) - if !strings.Contains(err.Error(), "context canceled") { - require.NoError(t, err) - } + checkBatcherError(t, err) }() // Create a sweep request. From 14de8f1f5d891190d3947cc557707b8e4cbdd6da Mon Sep 17 00:00:00 2001 From: Andras Banki-Horvath Date: Fri, 24 May 2024 10:52:54 +0200 Subject: [PATCH 4/4] sweepbatcher: test that empty batches won't prevent startup --- sweepbatcher/sweep_batcher.go | 8 +++ sweepbatcher/sweep_batcher_test.go | 83 ++++++++++++++++++++++++++++++ 2 files changed, 91 insertions(+) diff --git a/sweepbatcher/sweep_batcher.go b/sweepbatcher/sweep_batcher.go index 8816cd16f..74fa9d259 100644 --- a/sweepbatcher/sweep_batcher.go +++ b/sweepbatcher/sweep_batcher.go @@ -153,6 +153,10 @@ type Batcher struct { // quit signals that the batch must stop. quit chan struct{} + // initDone is a channel that is closed when the batcher has been + // initialized. + initDone chan struct{} + // wallet is the wallet kit client that is used by batches. wallet lndclient.WalletKitClient @@ -200,6 +204,7 @@ func NewBatcher(wallet lndclient.WalletKitClient, sweepReqs: make(chan SweepRequest), errChan: make(chan error, 1), quit: make(chan struct{}), + initDone: make(chan struct{}), wallet: wallet, chainNotifier: chainNotifier, signerClient: signerClient, @@ -239,6 +244,9 @@ func (b *Batcher) Run(ctx context.Context) error { } } + // Signal that the batcher has been initialized. + close(b.initDone) + for { select { case sweepReq := <-b.sweepReqs: diff --git a/sweepbatcher/sweep_batcher_test.go b/sweepbatcher/sweep_batcher_test.go index 4ddbb6569..92067b03e 100644 --- a/sweepbatcher/sweep_batcher_test.go +++ b/sweepbatcher/sweep_batcher_test.go @@ -3,6 +3,7 @@ package sweepbatcher import ( "context" "errors" + "sync" "testing" "time" @@ -1028,3 +1029,85 @@ func TestGetFeePortionForSweep(t *testing.T) { }) } } + +// TestRestoringEmptyBatch tests that the batcher can be restored with an empty +// batch. +func TestRestoringEmptyBatch(t *testing.T) { + defer test.Guard(t)() + + lnd := test.NewMockLnd() + ctx, cancel := context.WithCancel(context.Background()) + + store := loopdb.NewStoreMock(t) + + batcherStore := NewStoreMock() + _, err := batcherStore.InsertSweepBatch(ctx, &dbBatch{}) + require.NoError(t, err) + + batcher := NewBatcher(lnd.WalletKit, lnd.ChainNotifier, lnd.Signer, + testMuSig2SignSweep, nil, lnd.ChainParams, batcherStore, store) + + var wg sync.WaitGroup + wg.Add(1) + + var runErr error + go func() { + defer wg.Done() + runErr = batcher.Run(ctx) + }() + + // Wait for the batcher to be initialized. + <-batcher.initDone + + // Create a sweep request. + sweepReq := SweepRequest{ + SwapHash: lntypes.Hash{1, 1, 1}, + Value: 111, + Outpoint: wire.OutPoint{ + Hash: chainhash.Hash{1, 1}, + Index: 1, + }, + Notifier: &dummyNotifier, + } + + swap := &loopdb.LoopOutContract{ + SwapContract: loopdb.SwapContract{ + CltvExpiry: 111, + AmountRequested: 111, + }, + + SwapInvoice: swapInvoice, + } + + err = store.CreateLoopOut(ctx, sweepReq.SwapHash, swap) + require.NoError(t, err) + store.AssertLoopOutStored() + + // Deliver sweep request to batcher. + batcher.sweepReqs <- sweepReq + + // Since a batch was created we check that it registered for its primary + // sweep's spend. + <-lnd.RegisterSpendChannel + + // Once batcher receives sweep request it will eventually spin up a + // batch. + require.Eventually(t, func() bool { + // Make sure that the sweep was stored and we have exactly one + // active batch. + return batcherStore.AssertSweepStored(sweepReq.SwapHash) && + len(batcher.batches) == 1 + }, test.Timeout, eventuallyCheckFrequency) + + // Make sure we have only one batch stored (as we dropped the dormant + // one). + batches, err := batcherStore.FetchUnconfirmedSweepBatches(ctx) + require.NoError(t, err) + require.Len(t, batches, 1) + + // Now make it quit by canceling the context. + cancel() + wg.Wait() + + checkBatcherError(t, runErr) +}