@@ -2,6 +2,7 @@ package sweepbatcher
22
33import (
44 "context"
5+ "errors"
56 "fmt"
67 "sync"
78 "time"
@@ -46,36 +47,35 @@ const (
4647type BatcherStore interface {
4748 // FetchUnconfirmedSweepBatches fetches all the batches from the
4849 // database that are not in a confirmed state.
49- FetchUnconfirmedSweepBatches (ctx context.Context ) ([]* dbBatch ,
50- error )
50+ FetchUnconfirmedSweepBatches (ctx context.Context ) ([]* dbBatch , error )
5151
5252 // InsertSweepBatch inserts a batch into the database, returning the id
5353 // of the inserted batch.
54- InsertSweepBatch (ctx context.Context ,
55- batch * dbBatch ) (int32 , error )
54+ InsertSweepBatch (ctx context.Context , batch * dbBatch ) (int32 , error )
55+
56+ // DropBatch drops a batch from the database. This should only be used
57+ // when a batch is empty.
58+ DropBatch (ctx context.Context , id int32 ) error
5659
5760 // UpdateSweepBatch updates a batch in the database.
58- UpdateSweepBatch (ctx context.Context ,
59- batch * dbBatch ) error
61+ UpdateSweepBatch (ctx context.Context , batch * dbBatch ) error
6062
6163 // ConfirmBatch confirms a batch by setting its state to confirmed.
6264 ConfirmBatch (ctx context.Context , id int32 ) error
6365
6466 // FetchBatchSweeps fetches all the sweeps that belong to a batch.
65- FetchBatchSweeps (ctx context.Context ,
66- id int32 ) ([]* dbSweep , error )
67+ FetchBatchSweeps (ctx context.Context , id int32 ) ([]* dbSweep , error )
6768
6869 // UpsertSweep inserts a sweep into the database, or updates an existing
6970 // sweep if it already exists.
7071 UpsertSweep (ctx context.Context , sweep * dbSweep ) error
7172
7273 // GetSweepStatus returns the completed status of the sweep.
73- GetSweepStatus (ctx context.Context , swapHash lntypes.Hash ) (
74- bool , error )
74+ GetSweepStatus (ctx context.Context , swapHash lntypes.Hash ) (bool , error )
7575
7676 // GetParentBatch returns the parent batch of a (completed) sweep.
77- GetParentBatch (ctx context.Context , swapHash lntypes.Hash ) (
78- * dbBatch , error )
77+ GetParentBatch (ctx context.Context , swapHash lntypes.Hash ) (* dbBatch ,
78+ error )
7979
8080 // TotalSweptAmount returns the total amount swept by a (confirmed)
8181 // batch.
@@ -135,7 +135,7 @@ type SpendNotifier struct {
135135}
136136
137137var (
138- ErrBatcherShuttingDown = fmt . Errorf ("batcher shutting down" )
138+ ErrBatcherShuttingDown = errors . New ("batcher shutting down" )
139139)
140140
141141// Batcher is a system that is responsible for accepting sweep requests and
@@ -306,7 +306,7 @@ func (b *Batcher) handleSweep(ctx context.Context, sweep *sweep,
306306
307307 if batch .sweepExists (sweep .swapHash ) {
308308 accepted , err := batch .addSweep (ctx , sweep )
309- if err != nil {
309+ if err != nil && ! errors . Is ( err , ErrBatchShuttingDown ) {
310310 return err
311311 }
312312
@@ -321,7 +321,7 @@ func (b *Batcher) handleSweep(ctx context.Context, sweep *sweep,
321321 // If one of the batches accepts the sweep, we provide it to that batch.
322322 for _ , batch := range b .batches {
323323 accepted , err := batch .addSweep (ctx , sweep )
324- if err != nil && err != ErrBatchShuttingDown {
324+ if err != nil && ! errors . Is ( err , ErrBatchShuttingDown ) {
325325 return err
326326 }
327327
@@ -407,23 +407,23 @@ func (b *Batcher) spinUpBatch(ctx context.Context) (*batch, error) {
407407// spinUpBatchDB spins up a batch that already existed in storage, then
408408// returns it.
409409func (b * Batcher ) spinUpBatchFromDB (ctx context.Context , batch * batch ) error {
410- cfg := batchConfig {
411- maxTimeoutDistance : batch .cfg .maxTimeoutDistance ,
412- batchConfTarget : defaultBatchConfTarget ,
413- }
414-
415- rbfCache := rbfCache {
416- LastHeight : batch .rbfCache .LastHeight ,
417- FeeRate : batch .rbfCache .FeeRate ,
418- }
419-
420410 dbSweeps , err := b .store .FetchBatchSweeps (ctx , batch .id )
421411 if err != nil {
422412 return err
423413 }
424414
425415 if len (dbSweeps ) == 0 {
426- return fmt .Errorf ("batch %d has no sweeps" , batch .id )
416+ log .Infof ("skipping restored batch %d as it has no sweeps" ,
417+ batch .id )
418+
419+ // It is safe to drop this empty batch as it has no sweeps.
420+ err := b .store .DropBatch (ctx , batch .id )
421+ if err != nil {
422+ log .Warnf ("unable to drop empty batch %d: %v" ,
423+ batch .id , err )
424+ }
425+
426+ return nil
427427 }
428428
429429 primarySweep := dbSweeps [0 ]
@@ -439,6 +439,11 @@ func (b *Batcher) spinUpBatchFromDB(ctx context.Context, batch *batch) error {
439439 sweeps [sweep .swapHash ] = * sweep
440440 }
441441
442+ rbfCache := rbfCache {
443+ LastHeight : batch .rbfCache .LastHeight ,
444+ FeeRate : batch .rbfCache .FeeRate ,
445+ }
446+
442447 batchKit := batchKit {
443448 id : batch .id ,
444449 batchTxid : batch .batchTxid ,
@@ -458,6 +463,11 @@ func (b *Batcher) spinUpBatchFromDB(ctx context.Context, batch *batch) error {
458463 log : batchPrefixLogger (fmt .Sprintf ("%d" , batch .id )),
459464 }
460465
466+ cfg := batchConfig {
467+ maxTimeoutDistance : batch .cfg .maxTimeoutDistance ,
468+ batchConfTarget : defaultBatchConfTarget ,
469+ }
470+
461471 newBatch := NewBatchFromDB (cfg , batchKit )
462472
463473 // We add the batch to our map of batches and start it.
0 commit comments