Skip to content

Commit 1beb6b9

Browse files
committed
sweepbatcher: persist confirmed batches atomically
Fixes a crash window where handleConf updated the batch row to confirmed but failed before marking sweeps complete, so re-added sweeps spawned a duplicate batch or kept retrying. Batch confirmation and sweep completion are now persisted inside a single DB transaction ConfirmBatchWithSweeps, and handleConf uses the helper to atomically store the batch and the set of confirmed sweeps. Added TestSweepBatcherConfirmedBatchIncompleteSweeps that runs against the real loopdb backend, injects a failure mid-transaction, and verifies the database never ends up with confirmed=true batches paired with completed=false sweeps.
1 parent de969fd commit 1beb6b9

File tree

5 files changed

+347
-20
lines changed

5 files changed

+347
-20
lines changed

sweepbatcher/store.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package sweepbatcher
33
import (
44
"context"
55
"database/sql"
6+
"fmt"
67

78
"github.com/btcsuite/btcd/btcutil"
89
"github.com/btcsuite/btcd/chaincfg"
@@ -121,6 +122,30 @@ func (s *SQLStore) UpdateSweepBatch(ctx context.Context, batch *dbBatch) error {
121122
return s.baseDb.UpdateBatch(ctx, batchToUpdateArgs(*batch))
122123
}
123124

125+
// ConfirmBatchWithSweeps atomically confirms the batch and updates its sweeps.
126+
func (s *SQLStore) ConfirmBatchWithSweeps(ctx context.Context, batch *dbBatch,
127+
sweeps []*dbSweep) error {
128+
129+
writeOpts := loopdb.NewSqlWriteOpts()
130+
131+
return s.baseDb.ExecTx(ctx, writeOpts, func(tx Querier) error {
132+
err := tx.UpdateBatch(ctx, batchToUpdateArgs(*batch))
133+
if err != nil {
134+
return fmt.Errorf("update batch %d: %w", batch.ID, err)
135+
}
136+
137+
for _, sweep := range sweeps {
138+
err := tx.UpsertSweep(ctx, sweepToUpsertArgs(*sweep))
139+
if err != nil {
140+
return fmt.Errorf("upsert sweep %v: %w",
141+
sweep.Outpoint, err)
142+
}
143+
}
144+
145+
return nil
146+
})
147+
}
148+
124149
// FetchBatchSweeps fetches all the sweeps that are part a batch.
125150
func (s *SQLStore) FetchBatchSweeps(ctx context.Context, id int32) (
126151
[]*dbSweep, error) {

sweepbatcher/store_mock.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package sweepbatcher
33
import (
44
"context"
55
"errors"
6+
"fmt"
67
"sort"
78
"sync"
89

@@ -77,6 +78,32 @@ func (s *StoreMock) UpdateSweepBatch(ctx context.Context,
7778
return nil
7879
}
7980

81+
// ConfirmBatchWithSweeps updates the batch and the provided sweeps atomically.
82+
func (s *StoreMock) ConfirmBatchWithSweeps(ctx context.Context,
83+
batch *dbBatch, sweeps []*dbSweep) error {
84+
85+
s.mu.Lock()
86+
defer s.mu.Unlock()
87+
88+
s.batches[batch.ID] = *batch
89+
90+
for _, sweep := range sweeps {
91+
sweepCopy := *sweep
92+
93+
old, exists := s.sweeps[sweep.Outpoint]
94+
if !exists {
95+
return fmt.Errorf("confirming unknown sweep %v",
96+
sweep.Outpoint)
97+
}
98+
99+
sweepCopy.ID = old.ID
100+
101+
s.sweeps[sweep.Outpoint] = sweepCopy
102+
}
103+
104+
return nil
105+
}
106+
80107
// FetchBatchSweeps fetches all the sweeps that belong to a batch.
81108
func (s *StoreMock) FetchBatchSweeps(ctx context.Context,
82109
id int32) ([]*dbSweep, error) {

sweepbatcher/sweep_batch.go

Lines changed: 38 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2224,10 +2224,6 @@ func (b *batch) handleConf(ctx context.Context,
22242224
b.Infof("confirmed in txid %s", b.batchTxid)
22252225
b.state = Confirmed
22262226

2227-
if err := b.persist(ctx); err != nil {
2228-
return fmt.Errorf("saving batch failed: %w", err)
2229-
}
2230-
22312227
// If the batch is in presigned mode, cleanup presignedHelper.
22322228
presigned, err := b.isPresigned()
22332229
if err != nil {
@@ -2261,18 +2257,16 @@ func (b *batch) handleConf(ctx context.Context,
22612257
confirmedSweeps = []wire.OutPoint{}
22622258
purgeList = make([]SweepRequest, 0, len(b.sweeps))
22632259
totalSweptAmt btcutil.Amount
2260+
dbConfirmed = make([]*dbSweep, 0, len(allSweeps))
22642261
)
22652262
for _, sweep := range allSweeps {
22662263
_, found := confirmedSet[sweep.outpoint]
22672264
if found {
2268-
// Save the sweep as completed. Note that sweeps are
2269-
// marked completed after the batch is marked confirmed
2270-
// because the check in handleSweeps checks sweep's
2271-
// status first and then checks the batch status.
2272-
err := b.persistSweep(ctx, sweep, true)
2273-
if err != nil {
2274-
return err
2275-
}
2265+
// Save the sweep as completed; the batch row and all
2266+
// sweeps are persisted atomically below.
2267+
dbConfirmed = append(
2268+
dbConfirmed, b.dbSweepFrom(sweep, true),
2269+
)
22762270

22772271
confirmedSweeps = append(
22782272
confirmedSweeps, sweep.outpoint,
@@ -2328,8 +2322,15 @@ func (b *batch) handleConf(ctx context.Context,
23282322
}
23292323
}
23302324

2331-
b.Infof("fully confirmed sweeps: %v, purged sweeps: %v, "+
2332-
"purged swaps: %v", confirmedSweeps, purgedSweeps, purgedSwaps)
2325+
b.Infof("Fully confirmed sweeps: %v, purged sweeps: %v, "+
2326+
"purged swaps: %v. Saving the batch and sweeps to DB",
2327+
confirmedSweeps, purgedSweeps, purgedSwaps)
2328+
2329+
if err := b.persistConfirmedBatch(ctx, dbConfirmed); err != nil {
2330+
return fmt.Errorf("saving confirmed batch failed: %w", err)
2331+
}
2332+
2333+
b.Infof("Successfully saved the batch and confirmed sweeps to DB")
23332334

23342335
// Proceed with purging the sweeps. This will feed the sweeps that
23352336
// didn't make it to the confirmed batch transaction back to the batcher
@@ -2445,6 +2446,11 @@ func (b *batch) isComplete() bool {
24452446

24462447
// persist updates the batch in the database.
24472448
func (b *batch) persist(ctx context.Context) error {
2449+
return b.store.UpdateSweepBatch(ctx, b.dbBatch())
2450+
}
2451+
2452+
// dbBatch builds the dbBatch representation for the current in-memory state.
2453+
func (b *batch) dbBatch() *dbBatch {
24482454
bch := &dbBatch{}
24492455

24502456
bch.ID = b.id
@@ -2459,7 +2465,7 @@ func (b *batch) persist(ctx context.Context) error {
24592465
bch.LastRbfSatPerKw = int32(b.rbfCache.FeeRate)
24602466
bch.MaxTimeoutDistance = b.cfg.maxTimeoutDistance
24612467

2462-
return b.store.UpdateSweepBatch(ctx, bch)
2468+
return bch
24632469
}
24642470

24652471
// getBatchDestAddr returns the batch's destination address. If the batch
@@ -2612,16 +2618,31 @@ func (b *batch) writeToConfErrChan(ctx context.Context, confErr error) {
26122618
}
26132619
}
26142620

2621+
// persistSweep upserts the given sweep into the backing store and optionally
2622+
// marks it as completed.
26152623
func (b *batch) persistSweep(ctx context.Context, sweep sweep,
26162624
completed bool) error {
26172625

2618-
return b.store.UpsertSweep(ctx, &dbSweep{
2626+
return b.store.UpsertSweep(ctx, b.dbSweepFrom(sweep, completed))
2627+
}
2628+
2629+
// dbSweepFrom builds the dbSweep representation for a batch sweep.
2630+
func (b *batch) dbSweepFrom(sweep sweep, completed bool) *dbSweep {
2631+
return &dbSweep{
26192632
BatchID: b.id,
26202633
SwapHash: sweep.swapHash,
26212634
Outpoint: sweep.outpoint,
26222635
Amount: sweep.value,
26232636
Completed: completed,
2624-
})
2637+
}
2638+
}
2639+
2640+
// persistConfirmedBatch atomically records the batch confirmation metadata
2641+
// along with all sweeps that confirmed in the same transaction.
2642+
func (b *batch) persistConfirmedBatch(ctx context.Context,
2643+
sweeps []*dbSweep) error {
2644+
2645+
return b.store.ConfirmBatchWithSweeps(ctx, b.dbBatch(), sweeps)
26252646
}
26262647

26272648
// clampBatchFee takes the fee amount and total amount of the sweeps in the

sweepbatcher/sweep_batcher.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,11 @@ type BatcherStore interface {
5959
// UpdateSweepBatch updates a batch in the database.
6060
UpdateSweepBatch(ctx context.Context, batch *dbBatch) error
6161

62+
// ConfirmBatchWithSweeps atomically marks the batch as confirmed and
63+
// updates the provided sweeps in the database.
64+
ConfirmBatchWithSweeps(ctx context.Context, batch *dbBatch,
65+
sweeps []*dbSweep) error
66+
6267
// FetchBatchSweeps fetches all the sweeps that belong to a batch.
6368
FetchBatchSweeps(ctx context.Context, id int32) ([]*dbSweep, error)
6469

@@ -975,9 +980,8 @@ func (b *Batcher) handleSweeps(ctx context.Context, sweeps []*sweep,
975980
"sweeps with primarySweep %x: confirmed=%v",
976981
len(sweeps), sweep.swapHash[:6], parentBatch.Confirmed)
977982

978-
// Note that sweeps are marked completed after the batch is
979-
// marked confirmed because here we check the sweep status
980-
// first and then check the batch status.
983+
// Batch + sweeps are persisted atomically, so if the sweep
984+
// shows as completed its parent batch must be confirmed.
981985
if parentBatch.Confirmed {
982986
debugf("Sweep group of %d sweeps with primarySweep %x "+
983987
"is fully confirmed, switching directly to "+

0 commit comments

Comments
 (0)