Skip to content

Commit 563e7be

Browse files
authored
Merge pull request #754 from bhandras/sweepbatcher-empty-batch-fix
sweepbatcher: do not fail on restoring empty batches
2 parents 38f0e3a + 14de8f1 commit 563e7be

File tree

8 files changed

+208
-47
lines changed

8 files changed

+208
-47
lines changed

loopdb/sqlc/batch.sql.go

+9
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

loopdb/sqlc/querier.go

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

loopdb/sqlc/queries/batch.sql

+3
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ INSERT INTO sweep_batches (
2323
$6
2424
) RETURNING id;
2525

26+
-- name: DropBatch :exec
27+
DELETE FROM sweep_batches WHERE id = $1;
28+
2629
-- name: UpdateBatch :exec
2730
UPDATE sweep_batches SET
2831
confirmed = $2,

sweepbatcher/store.go

+22
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package sweepbatcher
33
import (
44
"context"
55
"database/sql"
6+
"fmt"
67

78
"github.com/btcsuite/btcd/btcutil"
89
"github.com/btcsuite/btcd/chaincfg"
@@ -46,6 +47,9 @@ type BaseDB interface {
4647
InsertBatch(ctx context.Context, arg sqlc.InsertBatchParams) (
4748
int32, error)
4849

50+
// DropBatch drops a batch from the database.
51+
DropBatch(ctx context.Context, id int32) error
52+
4953
// UpdateBatch updates a batch in the database.
5054
UpdateBatch(ctx context.Context, arg sqlc.UpdateBatchParams) error
5155

@@ -108,6 +112,24 @@ func (s *SQLStore) InsertSweepBatch(ctx context.Context, batch *dbBatch) (int32,
108112
return s.baseDb.InsertBatch(ctx, batchToInsertArgs(*batch))
109113
}
110114

115+
// DropBatch drops a batch from the database. Note that we only use this call
116+
// for batches that have no sweeps and so we'd not be able to resume.
117+
func (s *SQLStore) DropBatch(ctx context.Context, id int32) error {
118+
readOpts := loopdb.NewSqlReadOpts()
119+
return s.baseDb.ExecTx(ctx, readOpts, func(tx *sqlc.Queries) error {
120+
dbSweeps, err := tx.GetBatchSweeps(ctx, id)
121+
if err != nil {
122+
return err
123+
}
124+
125+
if len(dbSweeps) != 0 {
126+
return fmt.Errorf("cannot drop a non-empty batch")
127+
}
128+
129+
return tx.DropBatch(ctx, id)
130+
})
131+
}
132+
111133
// UpdateSweepBatch updates a batch in the database.
112134
func (s *SQLStore) UpdateSweepBatch(ctx context.Context, batch *dbBatch) error {
113135
return s.baseDb.UpdateBatch(ctx, batchToUpdateArgs(*batch))

sweepbatcher/store_mock.go

+6
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,12 @@ func (s *StoreMock) InsertSweepBatch(ctx context.Context,
5656
return id, nil
5757
}
5858

59+
// DropBatch drops a batch from the database.
60+
func (s *StoreMock) DropBatch(ctx context.Context, id int32) error {
61+
delete(s.batches, id)
62+
return nil
63+
}
64+
5965
// UpdateSweepBatch updates a batch in the database.
6066
func (s *StoreMock) UpdateSweepBatch(ctx context.Context,
6167
batch *dbBatch) error {

sweepbatcher/sweep_batch.go

+22-5
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"bytes"
55
"context"
66
"encoding/hex"
7+
"errors"
78
"fmt"
89
"math"
910
"sync"
@@ -48,7 +49,7 @@ const (
4849
)
4950

5051
var (
51-
ErrBatchShuttingDown = fmt.Errorf("batch shutting down")
52+
ErrBatchShuttingDown = errors.New("batch shutting down")
5253
)
5354

5455
// sweep stores any data related to sweeping a specific outpoint.
@@ -196,7 +197,11 @@ type batch struct {
196197
// main event loop.
197198
callLeave chan struct{}
198199

199-
// quit signals that the batch must stop.
200+
// stopped signals that the batch has stopped.
201+
stopped chan struct{}
202+
203+
// quit is owned by the parent batcher and signals that the batch must
204+
// stop.
200205
quit chan struct{}
201206

202207
// wallet is the wallet client used to create and publish the batch
@@ -260,6 +265,7 @@ type batchKit struct {
260265
purger Purger
261266
store BatcherStore
262267
log btclog.Logger
268+
quit chan struct{}
263269
}
264270

265271
// scheduleNextCall schedules the next call to the batch handler's main event
@@ -269,6 +275,9 @@ func (b *batch) scheduleNextCall() (func(), error) {
269275
case b.callEnter <- struct{}{}:
270276

271277
case <-b.quit:
278+
return func() {}, ErrBatcherShuttingDown
279+
280+
case <-b.stopped:
272281
return func() {}, ErrBatchShuttingDown
273282
}
274283

@@ -292,7 +301,8 @@ func NewBatch(cfg batchConfig, bk batchKit) *batch {
292301
errChan: make(chan error, 1),
293302
callEnter: make(chan struct{}),
294303
callLeave: make(chan struct{}),
295-
quit: make(chan struct{}),
304+
stopped: make(chan struct{}),
305+
quit: bk.quit,
296306
batchTxid: bk.batchTxid,
297307
wallet: bk.wallet,
298308
chainNotifier: bk.chainNotifier,
@@ -319,7 +329,8 @@ func NewBatchFromDB(cfg batchConfig, bk batchKit) *batch {
319329
errChan: make(chan error, 1),
320330
callEnter: make(chan struct{}),
321331
callLeave: make(chan struct{}),
322-
quit: make(chan struct{}),
332+
stopped: make(chan struct{}),
333+
quit: bk.quit,
323334
batchTxid: bk.batchTxid,
324335
batchPkScript: bk.batchPkScript,
325336
rbfCache: bk.rbfCache,
@@ -446,7 +457,7 @@ func (b *batch) Run(ctx context.Context) error {
446457
runCtx, cancel := context.WithCancel(ctx)
447458
defer func() {
448459
cancel()
449-
close(b.quit)
460+
close(b.stopped)
450461
b.wg.Wait()
451462
}()
452463

@@ -539,6 +550,12 @@ func (b *batch) publish(ctx context.Context) error {
539550
coopSuccess bool
540551
)
541552

553+
if len(b.sweeps) == 0 {
554+
b.log.Debugf("skipping publish: no sweeps in the batch")
555+
556+
return nil
557+
}
558+
542559
// Run the RBF rate update.
543560
err = b.updateRbfRate(ctx)
544561
if err != nil {

sweepbatcher/sweep_batcher.go

+47-26
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package sweepbatcher
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"sync"
78
"time"
@@ -46,36 +47,35 @@ const (
4647
type BatcherStore interface {
4748
// FetchUnconfirmedSweepBatches fetches all the batches from the
4849
// database that are not in a confirmed state.
49-
FetchUnconfirmedSweepBatches(ctx context.Context) ([]*dbBatch,
50-
error)
50+
FetchUnconfirmedSweepBatches(ctx context.Context) ([]*dbBatch, error)
5151

5252
// InsertSweepBatch inserts a batch into the database, returning the id
5353
// of the inserted batch.
54-
InsertSweepBatch(ctx context.Context,
55-
batch *dbBatch) (int32, error)
54+
InsertSweepBatch(ctx context.Context, batch *dbBatch) (int32, error)
55+
56+
// DropBatch drops a batch from the database. This should only be used
57+
// when a batch is empty.
58+
DropBatch(ctx context.Context, id int32) error
5659

5760
// UpdateSweepBatch updates a batch in the database.
58-
UpdateSweepBatch(ctx context.Context,
59-
batch *dbBatch) error
61+
UpdateSweepBatch(ctx context.Context, batch *dbBatch) error
6062

6163
// ConfirmBatch confirms a batch by setting its state to confirmed.
6264
ConfirmBatch(ctx context.Context, id int32) error
6365

6466
// FetchBatchSweeps fetches all the sweeps that belong to a batch.
65-
FetchBatchSweeps(ctx context.Context,
66-
id int32) ([]*dbSweep, error)
67+
FetchBatchSweeps(ctx context.Context, id int32) ([]*dbSweep, error)
6768

6869
// UpsertSweep inserts a sweep into the database, or updates an existing
6970
// sweep if it already exists.
7071
UpsertSweep(ctx context.Context, sweep *dbSweep) error
7172

7273
// GetSweepStatus returns the completed status of the sweep.
73-
GetSweepStatus(ctx context.Context, swapHash lntypes.Hash) (
74-
bool, error)
74+
GetSweepStatus(ctx context.Context, swapHash lntypes.Hash) (bool, error)
7575

7676
// GetParentBatch returns the parent batch of a (completed) sweep.
77-
GetParentBatch(ctx context.Context, swapHash lntypes.Hash) (
78-
*dbBatch, error)
77+
GetParentBatch(ctx context.Context, swapHash lntypes.Hash) (*dbBatch,
78+
error)
7979

8080
// TotalSweptAmount returns the total amount swept by a (confirmed)
8181
// batch.
@@ -135,7 +135,7 @@ type SpendNotifier struct {
135135
}
136136

137137
var (
138-
ErrBatcherShuttingDown = fmt.Errorf("batcher shutting down")
138+
ErrBatcherShuttingDown = errors.New("batcher shutting down")
139139
)
140140

141141
// Batcher is a system that is responsible for accepting sweep requests and
@@ -153,6 +153,10 @@ type Batcher struct {
153153
// quit signals that the batch must stop.
154154
quit chan struct{}
155155

156+
// initDone is a channel that is closed when the batcher has been
157+
// initialized.
158+
initDone chan struct{}
159+
156160
// wallet is the wallet kit client that is used by batches.
157161
wallet lndclient.WalletKitClient
158162

@@ -200,6 +204,7 @@ func NewBatcher(wallet lndclient.WalletKitClient,
200204
sweepReqs: make(chan SweepRequest),
201205
errChan: make(chan error, 1),
202206
quit: make(chan struct{}),
207+
initDone: make(chan struct{}),
203208
wallet: wallet,
204209
chainNotifier: chainNotifier,
205210
signerClient: signerClient,
@@ -216,6 +221,7 @@ func (b *Batcher) Run(ctx context.Context) error {
216221
runCtx, cancel := context.WithCancel(ctx)
217222
defer func() {
218223
cancel()
224+
close(b.quit)
219225

220226
for _, batch := range b.batches {
221227
batch.Wait()
@@ -238,6 +244,9 @@ func (b *Batcher) Run(ctx context.Context) error {
238244
}
239245
}
240246

247+
// Signal that the batcher has been initialized.
248+
close(b.initDone)
249+
241250
for {
242251
select {
243252
case sweepReq := <-b.sweepReqs:
@@ -306,7 +315,7 @@ func (b *Batcher) handleSweep(ctx context.Context, sweep *sweep,
306315

307316
if batch.sweepExists(sweep.swapHash) {
308317
accepted, err := batch.addSweep(ctx, sweep)
309-
if err != nil {
318+
if err != nil && !errors.Is(err, ErrBatchShuttingDown) {
310319
return err
311320
}
312321

@@ -321,7 +330,7 @@ func (b *Batcher) handleSweep(ctx context.Context, sweep *sweep,
321330
// If one of the batches accepts the sweep, we provide it to that batch.
322331
for _, batch := range b.batches {
323332
accepted, err := batch.addSweep(ctx, sweep)
324-
if err != nil && err != ErrBatchShuttingDown {
333+
if err != nil && !errors.Is(err, ErrBatchShuttingDown) {
325334
return err
326335
}
327336

@@ -379,6 +388,7 @@ func (b *Batcher) spinUpBatch(ctx context.Context) (*batch, error) {
379388
verifySchnorrSig: b.VerifySchnorrSig,
380389
purger: b.AddSweep,
381390
store: b.store,
391+
quit: b.quit,
382392
}
383393

384394
batch := NewBatch(cfg, batchKit)
@@ -407,23 +417,23 @@ func (b *Batcher) spinUpBatch(ctx context.Context) (*batch, error) {
407417
// spinUpBatchDB spins up a batch that already existed in storage, then
408418
// returns it.
409419
func (b *Batcher) spinUpBatchFromDB(ctx context.Context, batch *batch) error {
410-
cfg := batchConfig{
411-
maxTimeoutDistance: batch.cfg.maxTimeoutDistance,
412-
batchConfTarget: defaultBatchConfTarget,
413-
}
414-
415-
rbfCache := rbfCache{
416-
LastHeight: batch.rbfCache.LastHeight,
417-
FeeRate: batch.rbfCache.FeeRate,
418-
}
419-
420420
dbSweeps, err := b.store.FetchBatchSweeps(ctx, batch.id)
421421
if err != nil {
422422
return err
423423
}
424424

425425
if len(dbSweeps) == 0 {
426-
return fmt.Errorf("batch %d has no sweeps", batch.id)
426+
log.Infof("skipping restored batch %d as it has no sweeps",
427+
batch.id)
428+
429+
// It is safe to drop this empty batch as it has no sweeps.
430+
err := b.store.DropBatch(ctx, batch.id)
431+
if err != nil {
432+
log.Warnf("unable to drop empty batch %d: %v",
433+
batch.id, err)
434+
}
435+
436+
return nil
427437
}
428438

429439
primarySweep := dbSweeps[0]
@@ -439,6 +449,11 @@ func (b *Batcher) spinUpBatchFromDB(ctx context.Context, batch *batch) error {
439449
sweeps[sweep.swapHash] = *sweep
440450
}
441451

452+
rbfCache := rbfCache{
453+
LastHeight: batch.rbfCache.LastHeight,
454+
FeeRate: batch.rbfCache.FeeRate,
455+
}
456+
442457
batchKit := batchKit{
443458
id: batch.id,
444459
batchTxid: batch.batchTxid,
@@ -456,6 +471,12 @@ func (b *Batcher) spinUpBatchFromDB(ctx context.Context, batch *batch) error {
456471
purger: b.AddSweep,
457472
store: b.store,
458473
log: batchPrefixLogger(fmt.Sprintf("%d", batch.id)),
474+
quit: b.quit,
475+
}
476+
477+
cfg := batchConfig{
478+
maxTimeoutDistance: batch.cfg.maxTimeoutDistance,
479+
batchConfTarget: defaultBatchConfTarget,
459480
}
460481

461482
newBatch := NewBatchFromDB(cfg, batchKit)

0 commit comments

Comments
 (0)