Skip to content

sweepbatcher: do not fail on restoring empty batches #754

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions loopdb/sqlc/batch.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions loopdb/sqlc/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions loopdb/sqlc/queries/batch.sql
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ INSERT INTO sweep_batches (
$6
) RETURNING id;

-- name: DropBatch :exec
DELETE FROM sweep_batches WHERE id = $1;

-- name: UpdateBatch :exec
UPDATE sweep_batches SET
confirmed = $2,
Expand Down
22 changes: 22 additions & 0 deletions sweepbatcher/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sweepbatcher
import (
"context"
"database/sql"
"fmt"

"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/chaincfg"
Expand Down Expand Up @@ -46,6 +47,9 @@ type BaseDB interface {
InsertBatch(ctx context.Context, arg sqlc.InsertBatchParams) (
int32, error)

// DropBatch drops a batch from the database.
DropBatch(ctx context.Context, id int32) error

// UpdateBatch updates a batch in the database.
UpdateBatch(ctx context.Context, arg sqlc.UpdateBatchParams) error

Expand Down Expand Up @@ -108,6 +112,24 @@ func (s *SQLStore) InsertSweepBatch(ctx context.Context, batch *dbBatch) (int32,
return s.baseDb.InsertBatch(ctx, batchToInsertArgs(*batch))
}

// DropBatch drops a batch from the database. Note that we only use this call
// for batches that have no sweeps and so we'd not be able to resume.
func (s *SQLStore) DropBatch(ctx context.Context, id int32) error {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I propose to add a check that the number of sweep in the batch is 0. If the batch is not empty, return an error.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, added!

readOpts := loopdb.NewSqlReadOpts()
return s.baseDb.ExecTx(ctx, readOpts, func(tx *sqlc.Queries) error {
dbSweeps, err := tx.GetBatchSweeps(ctx, id)
if err != nil {
return err
}

if len(dbSweeps) != 0 {
return fmt.Errorf("cannot drop a non-empty batch")
}

return tx.DropBatch(ctx, id)
})
}

// UpdateSweepBatch updates a batch in the database.
func (s *SQLStore) UpdateSweepBatch(ctx context.Context, batch *dbBatch) error {
return s.baseDb.UpdateBatch(ctx, batchToUpdateArgs(*batch))
Expand Down
6 changes: 6 additions & 0 deletions sweepbatcher/store_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ func (s *StoreMock) InsertSweepBatch(ctx context.Context,
return id, nil
}

// DropBatch drops a batch from the database.
func (s *StoreMock) DropBatch(ctx context.Context, id int32) error {
delete(s.batches, id)
return nil
}

// UpdateSweepBatch updates a batch in the database.
func (s *StoreMock) UpdateSweepBatch(ctx context.Context,
batch *dbBatch) error {
Expand Down
27 changes: 22 additions & 5 deletions sweepbatcher/sweep_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/hex"
"errors"
"fmt"
"math"
"sync"
Expand Down Expand Up @@ -48,7 +49,7 @@ const (
)

var (
ErrBatchShuttingDown = fmt.Errorf("batch shutting down")
ErrBatchShuttingDown = errors.New("batch shutting down")
)

// sweep stores any data related to sweeping a specific outpoint.
Expand Down Expand Up @@ -196,7 +197,11 @@ type batch struct {
// main event loop.
callLeave chan struct{}

// quit signals that the batch must stop.
// stopped signals that the batch has stopped.
stopped chan struct{}

// quit is owned by the parent batcher and signals that the batch must
// stop.
quit chan struct{}

// wallet is the wallet client used to create and publish the batch
Expand Down Expand Up @@ -260,6 +265,7 @@ type batchKit struct {
purger Purger
store BatcherStore
log btclog.Logger
quit chan struct{}
}

// scheduleNextCall schedules the next call to the batch handler's main event
Expand All @@ -269,6 +275,9 @@ func (b *batch) scheduleNextCall() (func(), error) {
case b.callEnter <- struct{}{}:

case <-b.quit:
return func() {}, ErrBatcherShuttingDown

case <-b.stopped:
return func() {}, ErrBatchShuttingDown
}

Expand All @@ -292,7 +301,8 @@ func NewBatch(cfg batchConfig, bk batchKit) *batch {
errChan: make(chan error, 1),
callEnter: make(chan struct{}),
callLeave: make(chan struct{}),
quit: make(chan struct{}),
stopped: make(chan struct{}),
quit: bk.quit,
batchTxid: bk.batchTxid,
wallet: bk.wallet,
chainNotifier: bk.chainNotifier,
Expand All @@ -319,7 +329,8 @@ func NewBatchFromDB(cfg batchConfig, bk batchKit) *batch {
errChan: make(chan error, 1),
callEnter: make(chan struct{}),
callLeave: make(chan struct{}),
quit: make(chan struct{}),
stopped: make(chan struct{}),
quit: bk.quit,
batchTxid: bk.batchTxid,
batchPkScript: bk.batchPkScript,
rbfCache: bk.rbfCache,
Expand Down Expand Up @@ -446,7 +457,7 @@ func (b *batch) Run(ctx context.Context) error {
runCtx, cancel := context.WithCancel(ctx)
defer func() {
cancel()
close(b.quit)
close(b.stopped)
b.wg.Wait()
}()

Expand Down Expand Up @@ -539,6 +550,12 @@ func (b *batch) publish(ctx context.Context) error {
coopSuccess bool
)

if len(b.sweeps) == 0 {
b.log.Debugf("skipping publish: no sweeps in the batch")

return nil
}

// Run the RBF rate update.
err = b.updateRbfRate(ctx)
if err != nil {
Expand Down
73 changes: 47 additions & 26 deletions sweepbatcher/sweep_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sweepbatcher

import (
"context"
"errors"
"fmt"
"sync"
"time"
Expand Down Expand Up @@ -46,36 +47,35 @@ const (
type BatcherStore interface {
// FetchUnconfirmedSweepBatches fetches all the batches from the
// database that are not in a confirmed state.
FetchUnconfirmedSweepBatches(ctx context.Context) ([]*dbBatch,
error)
FetchUnconfirmedSweepBatches(ctx context.Context) ([]*dbBatch, error)

// InsertSweepBatch inserts a batch into the database, returning the id
// of the inserted batch.
InsertSweepBatch(ctx context.Context,
batch *dbBatch) (int32, error)
InsertSweepBatch(ctx context.Context, batch *dbBatch) (int32, error)

// DropBatch drops a batch from the database. This should only be used
// when a batch is empty.
DropBatch(ctx context.Context, id int32) error

// UpdateSweepBatch updates a batch in the database.
UpdateSweepBatch(ctx context.Context,
batch *dbBatch) error
UpdateSweepBatch(ctx context.Context, batch *dbBatch) error

// ConfirmBatch confirms a batch by setting its state to confirmed.
ConfirmBatch(ctx context.Context, id int32) error

// FetchBatchSweeps fetches all the sweeps that belong to a batch.
FetchBatchSweeps(ctx context.Context,
id int32) ([]*dbSweep, error)
FetchBatchSweeps(ctx context.Context, id int32) ([]*dbSweep, error)

// UpsertSweep inserts a sweep into the database, or updates an existing
// sweep if it already exists.
UpsertSweep(ctx context.Context, sweep *dbSweep) error

// GetSweepStatus returns the completed status of the sweep.
GetSweepStatus(ctx context.Context, swapHash lntypes.Hash) (
bool, error)
GetSweepStatus(ctx context.Context, swapHash lntypes.Hash) (bool, error)

// GetParentBatch returns the parent batch of a (completed) sweep.
GetParentBatch(ctx context.Context, swapHash lntypes.Hash) (
*dbBatch, error)
GetParentBatch(ctx context.Context, swapHash lntypes.Hash) (*dbBatch,
error)

// TotalSweptAmount returns the total amount swept by a (confirmed)
// batch.
Expand Down Expand Up @@ -135,7 +135,7 @@ type SpendNotifier struct {
}

var (
ErrBatcherShuttingDown = fmt.Errorf("batcher shutting down")
ErrBatcherShuttingDown = errors.New("batcher shutting down")
)

// Batcher is a system that is responsible for accepting sweep requests and
Expand All @@ -153,6 +153,10 @@ type Batcher struct {
// quit signals that the batch must stop.
quit chan struct{}

// initDone is a channel that is closed when the batcher has been
// initialized.
initDone chan struct{}

// wallet is the wallet kit client that is used by batches.
wallet lndclient.WalletKitClient

Expand Down Expand Up @@ -200,6 +204,7 @@ func NewBatcher(wallet lndclient.WalletKitClient,
sweepReqs: make(chan SweepRequest),
errChan: make(chan error, 1),
quit: make(chan struct{}),
initDone: make(chan struct{}),
wallet: wallet,
chainNotifier: chainNotifier,
signerClient: signerClient,
Expand All @@ -216,6 +221,7 @@ func (b *Batcher) Run(ctx context.Context) error {
runCtx, cancel := context.WithCancel(ctx)
defer func() {
cancel()
close(b.quit)

for _, batch := range b.batches {
batch.Wait()
Expand All @@ -238,6 +244,9 @@ func (b *Batcher) Run(ctx context.Context) error {
}
}

// Signal that the batcher has been initialized.
close(b.initDone)

for {
select {
case sweepReq := <-b.sweepReqs:
Expand Down Expand Up @@ -306,7 +315,7 @@ func (b *Batcher) handleSweep(ctx context.Context, sweep *sweep,

if batch.sweepExists(sweep.swapHash) {
accepted, err := batch.addSweep(ctx, sweep)
if err != nil {
if err != nil && !errors.Is(err, ErrBatchShuttingDown) {
return err
}

Expand All @@ -321,7 +330,7 @@ func (b *Batcher) handleSweep(ctx context.Context, sweep *sweep,
// If one of the batches accepts the sweep, we provide it to that batch.
for _, batch := range b.batches {
accepted, err := batch.addSweep(ctx, sweep)
if err != nil && err != ErrBatchShuttingDown {
if err != nil && !errors.Is(err, ErrBatchShuttingDown) {
return err
}

Expand Down Expand Up @@ -379,6 +388,7 @@ func (b *Batcher) spinUpBatch(ctx context.Context) (*batch, error) {
verifySchnorrSig: b.VerifySchnorrSig,
purger: b.AddSweep,
store: b.store,
quit: b.quit,
}

batch := NewBatch(cfg, batchKit)
Expand Down Expand Up @@ -407,23 +417,23 @@ func (b *Batcher) spinUpBatch(ctx context.Context) (*batch, error) {
// spinUpBatchDB spins up a batch that already existed in storage, then
// returns it.
func (b *Batcher) spinUpBatchFromDB(ctx context.Context, batch *batch) error {
cfg := batchConfig{
maxTimeoutDistance: batch.cfg.maxTimeoutDistance,
batchConfTarget: defaultBatchConfTarget,
}

rbfCache := rbfCache{
LastHeight: batch.rbfCache.LastHeight,
FeeRate: batch.rbfCache.FeeRate,
}

dbSweeps, err := b.store.FetchBatchSweeps(ctx, batch.id)
if err != nil {
return err
}

if len(dbSweeps) == 0 {
return fmt.Errorf("batch %d has no sweeps", batch.id)
log.Infof("skipping restored batch %d as it has no sweeps",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe also delete from storage

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

batch.id)

// It is safe to drop this empty batch as it has no sweeps.
err := b.store.DropBatch(ctx, batch.id)
if err != nil {
log.Warnf("unable to drop empty batch %d: %v",
batch.id, err)
}

return nil
}

primarySweep := dbSweeps[0]
Expand All @@ -439,6 +449,11 @@ func (b *Batcher) spinUpBatchFromDB(ctx context.Context, batch *batch) error {
sweeps[sweep.swapHash] = *sweep
}

rbfCache := rbfCache{
LastHeight: batch.rbfCache.LastHeight,
FeeRate: batch.rbfCache.FeeRate,
}

batchKit := batchKit{
id: batch.id,
batchTxid: batch.batchTxid,
Expand All @@ -456,6 +471,12 @@ func (b *Batcher) spinUpBatchFromDB(ctx context.Context, batch *batch) error {
purger: b.AddSweep,
store: b.store,
log: batchPrefixLogger(fmt.Sprintf("%d", batch.id)),
quit: b.quit,
}

cfg := batchConfig{
maxTimeoutDistance: batch.cfg.maxTimeoutDistance,
batchConfTarget: defaultBatchConfTarget,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why setting defaultBatchConfTarget is needed here? If it is used, it means, that confTargets of sweeps were not taken into account.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The source of confTarget is fetchSweep method. I think, after resuming from DB we should make sure that confTarget is filled from fetchSweep before it is used to calculate fee rate (i.e. before publish). Then we can remove const defaultBatchConfTarget.

Copy link
Member

@GeorgeTsagk GeorgeTsagk May 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We update this when the first sweep comes in, see here

but agree it's basically a noop setting this here, as it's guaranteed it will be overwritten

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that this is just a reorder, no added code.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, ideally let's not introduce unwanted changes

}

newBatch := NewBatchFromDB(cfg, batchKit)
Expand Down
Loading
Loading