Skip to content

Commit 800890c

Browse files
committed
sweepbatcher: add an option to ignore HTLC txids
Added option WithSkippedTxs, which has one historical problematic tx by default. Sweeps originating from these transactions are omitted when reading from DB. Changed SQLStore.DropBatch not to check that the batch being removed is empty. It is not the case when removing a batch having only skipped sweeps. Changed StoreMock.InsertSweepBatch not to reuse batch_id. This is needed by the test, which checks that new batch has fresh ID.
1 parent ab69588 commit 800890c

File tree

9 files changed

+247
-27
lines changed

9 files changed

+247
-27
lines changed

client.go

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/btcsuite/btcd/btcec/v2"
1313
"github.com/btcsuite/btcd/btcec/v2/schnorr"
1414
"github.com/btcsuite/btcd/btcutil"
15+
"github.com/btcsuite/btcd/chaincfg/chainhash"
1516
"github.com/lightninglabs/aperture/l402"
1617
"github.com/lightninglabs/lndclient"
1718
"github.com/lightninglabs/loop/assets"
@@ -146,6 +147,10 @@ type ClientConfig struct {
146147
// be attempted.
147148
LoopOutMaxParts uint32
148149

150+
// SkippedTxs is the list of existing HTLC txids to skip when starting
151+
// Loop. This should only be used if affected by the historical bug.
152+
SkippedTxs []string
153+
149154
// TotalPaymentTimeout is the total amount of time until we time out
150155
// off-chain payments (used in loop out).
151156
TotalPaymentTimeout time.Duration
@@ -246,11 +251,7 @@ func NewClient(dbDir string, loopDB loopdb.SwapStore,
246251
sweeper, loopDB, cfg.Lnd.ChainParams, getHeight,
247252
)
248253

249-
batcher := sweepbatcher.NewBatcher(
250-
cfg.Lnd.WalletKit, cfg.Lnd.ChainNotifier, cfg.Lnd.Signer,
251-
swapServerClient.MultiMuSig2SignSweep, verifySchnorrSig,
252-
cfg.Lnd.ChainParams, sweeperDb, sweepStore,
253-
254+
batcherOpts := []sweepbatcher.BatcherOption{
254255
// Disable 100 sats/kw fee bump every block and retarget feerate
255256
// every block according to the current mempool condition.
256257
sweepbatcher.WithCustomFeeRate(
@@ -265,8 +266,29 @@ func NewClient(dbDir string, loopDB loopdb.SwapStore,
265266
// delay time to sweepbatcher's handling. The delay used in
266267
// loopout.go is repushDelay.
267268
sweepbatcher.WithPublishDelay(
268-
repushDelay+additionalDelay,
269+
repushDelay + additionalDelay,
269270
),
271+
}
272+
273+
if len(cfg.SkippedTxs) != 0 {
274+
skippedTxs := make(map[chainhash.Hash]struct{})
275+
for _, txid := range cfg.SkippedTxs {
276+
txid, err := chainhash.NewHashFromStr(txid)
277+
if err != nil {
278+
return nil, nil, fmt.Errorf("failed to parse "+
279+
"txid to skip %v: %w", txid, err)
280+
}
281+
skippedTxs[*txid] = struct{}{}
282+
}
283+
batcherOpts = append(batcherOpts, sweepbatcher.WithSkippedTxs(
284+
skippedTxs,
285+
))
286+
}
287+
288+
batcher := sweepbatcher.NewBatcher(
289+
cfg.Lnd.WalletKit, cfg.Lnd.ChainNotifier, cfg.Lnd.Signer,
290+
swapServerClient.MultiMuSig2SignSweep, verifySchnorrSig,
291+
cfg.Lnd.ChainParams, sweeperDb, sweepStore, batcherOpts...,
270292
)
271293

272294
executor = newExecutor(&executorConfig{

loopd/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,8 @@ type Config struct {
184184

185185
LoopOutMaxParts uint32 `long:"loopoutmaxparts" description:"The maximum number of payment parts that may be used for a loop out swap."`
186186

187+
SkippedTxs []string `long:"skippedtxs" description:"The list of existing HTLC txids to skip when starting Loop. This should only be used if affected by the historical bug."`
188+
187189
TotalPaymentTimeout time.Duration `long:"totalpaymenttimeout" description:"The timeout to use for off-chain payments."`
188190
MaxPaymentRetries int `long:"maxpaymentretries" description:"The maximum number of times an off-chain payment may be retried."`
189191

loopd/utils.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ func getClient(cfg *Config, swapDb loopdb.SwapStore,
5050
MaxL402Cost: btcutil.Amount(cfg.MaxL402Cost),
5151
MaxL402Fee: btcutil.Amount(cfg.MaxL402Fee),
5252
LoopOutMaxParts: cfg.LoopOutMaxParts,
53+
SkippedTxs: cfg.SkippedTxs,
5354
TotalPaymentTimeout: cfg.TotalPaymentTimeout,
5455
MaxPaymentRetries: cfg.MaxPaymentRetries,
5556
MaxStaticAddrHtlcFeePercentage: cfg.MaxStaticAddrHtlcFeePercentage,

sweepbatcher/presigned.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,8 @@ func (b *batch) getOrderedSweeps(ctx context.Context) ([]sweep, error) {
126126
return nil, fmt.Errorf("FetchBatchSweeps(%d) failed: %w", b.id,
127127
err)
128128
}
129+
dbSweeps = filterDbSweeps(b.cfg.skippedTxs, dbSweeps)
130+
129131
if len(dbSweeps) != len(utxo2sweep) {
130132
return nil, fmt.Errorf("FetchBatchSweeps(%d) returned %d "+
131133
"sweeps, len(b.sweeps) is %d", b.id, len(dbSweeps),

sweepbatcher/store.go

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package sweepbatcher
33
import (
44
"context"
55
"database/sql"
6-
"fmt"
76

87
"github.com/btcsuite/btcd/btcutil"
98
"github.com/btcsuite/btcd/chaincfg"
@@ -116,19 +115,7 @@ func (s *SQLStore) InsertSweepBatch(ctx context.Context, batch *dbBatch) (int32,
116115
// DropBatch drops a batch from the database. Note that we only use this call
117116
// for batches that have no sweeps and so we'd not be able to resume.
118117
func (s *SQLStore) DropBatch(ctx context.Context, id int32) error {
119-
readOpts := loopdb.NewSqlWriteOpts()
120-
return s.baseDb.ExecTx(ctx, readOpts, func(tx Querier) error {
121-
dbSweeps, err := tx.GetBatchSweeps(ctx, id)
122-
if err != nil {
123-
return err
124-
}
125-
126-
if len(dbSweeps) != 0 {
127-
return fmt.Errorf("cannot drop a non-empty batch")
128-
}
129-
130-
return tx.DropBatch(ctx, id)
131-
})
118+
return s.baseDb.DropBatch(ctx, id)
132119
}
133120

134121
// UpdateSweepBatch updates a batch in the database.

sweepbatcher/store_mock.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ type StoreMock struct {
1616
sweeps map[wire.OutPoint]dbSweep
1717
mu sync.Mutex
1818
sweepID int32
19+
batchID int32
1920
}
2021

2122
// NewStoreMock instantiates a new mock store.
@@ -52,13 +53,8 @@ func (s *StoreMock) InsertSweepBatch(ctx context.Context,
5253
s.mu.Lock()
5354
defer s.mu.Unlock()
5455

55-
var id int32
56-
57-
if len(s.batches) == 0 {
58-
id = 0
59-
} else {
60-
id = int32(len(s.batches))
61-
}
56+
id := s.batchID
57+
s.batchID++
6258

6359
s.batches[id] = *batch
6460
return id, nil

sweepbatcher/sweep_batch.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,10 @@ type batchConfig struct {
187187
// enabled.
188188
presignedHelper PresignedHelper
189189

190+
// skippedTxs is the list of previous transactions to ignore when
191+
// loading the sweeps from DB. This is needed to fix a historical bug.
192+
skippedTxs map[chainhash.Hash]struct{}
193+
190194
// chainParams are the chain parameters of the chain that is used by
191195
// batches.
192196
chainParams *chaincfg.Params

sweepbatcher/sweep_batcher.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -440,6 +440,10 @@ type Batcher struct {
440440
// presignedHelper provides methods used when presigned batches are
441441
// enabled.
442442
presignedHelper PresignedHelper
443+
444+
// skippedTxs is the list of previous transactions to ignore when
445+
// loading the sweeps from DB. This is needed to fix a historical bug.
446+
skippedTxs map[chainhash.Hash]struct{}
443447
}
444448

445449
// BatcherConfig holds batcher configuration.
@@ -484,6 +488,10 @@ type BatcherConfig struct {
484488
// presignedHelper provides methods used when presigned batches are
485489
// enabled.
486490
presignedHelper PresignedHelper
491+
492+
// skippedTxs is the list of previous transactions to ignore when
493+
// loading the sweeps from DB. This is needed to fix a historical bug.
494+
skippedTxs map[chainhash.Hash]struct{}
487495
}
488496

489497
// BatcherOption configures batcher behaviour.
@@ -566,6 +574,14 @@ func WithPresignedHelper(presignedHelper PresignedHelper) BatcherOption {
566574
}
567575
}
568576

577+
// WithSkippedTxs is the list of previous transactions to ignore when
578+
// loading the sweeps from DB. This is needed to fix a historical bug.
579+
func WithSkippedTxs(skippedTxs map[chainhash.Hash]struct{}) BatcherOption {
580+
return func(cfg *BatcherConfig) {
581+
cfg.skippedTxs = skippedTxs
582+
}
583+
}
584+
569585
// NewBatcher creates a new Batcher instance.
570586
func NewBatcher(wallet lndclient.WalletKitClient,
571587
chainNotifier lndclient.ChainNotifierClient,
@@ -574,6 +590,14 @@ func NewBatcher(wallet lndclient.WalletKitClient,
574590
store BatcherStore, sweepStore SweepFetcher,
575591
opts ...BatcherOption) *Batcher {
576592

593+
badTx1, err := chainhash.NewHashFromStr(
594+
"7028bdac753a254785d29506f311abcda323706b531345105f38999" +
595+
"aecd6f3d1",
596+
)
597+
if err != nil {
598+
panic(err)
599+
}
600+
577601
cfg := BatcherConfig{
578602
// By default, loop/labels.LoopOutBatchSweepSuccess is used
579603
// to label sweep transactions.
@@ -583,6 +607,10 @@ func NewBatcher(wallet lndclient.WalletKitClient,
583607
// publishing error. By default, it logs all errors as warnings,
584608
// but "insufficient fee" as Info.
585609
publishErrorHandler: defaultPublishErrorLogger,
610+
611+
skippedTxs: map[chainhash.Hash]struct{}{
612+
*badTx1: {},
613+
},
586614
}
587615
for _, opt := range opts {
588616
opt(&cfg)
@@ -621,6 +649,7 @@ func NewBatcher(wallet lndclient.WalletKitClient,
621649
customMuSig2Signer: cfg.customMuSig2Signer,
622650
publishErrorHandler: cfg.publishErrorHandler,
623651
presignedHelper: cfg.presignedHelper,
652+
skippedTxs: cfg.skippedTxs,
624653
}
625654
}
626655

@@ -1000,13 +1029,30 @@ func (b *Batcher) spinUpBatch(ctx context.Context) (*batch, error) {
10001029
return batch, nil
10011030
}
10021031

1032+
// filterDbSweeps copies dbSweeps, skipping the sweeps from skipped txs.
1033+
func filterDbSweeps(skippedTxs map[chainhash.Hash]struct{},
1034+
dbSweeps []*dbSweep) []*dbSweep {
1035+
1036+
result := make([]*dbSweep, 0, len(dbSweeps))
1037+
for _, dbSweep := range dbSweeps {
1038+
if _, has := skippedTxs[dbSweep.Outpoint.Hash]; has {
1039+
continue
1040+
}
1041+
1042+
result = append(result, dbSweep)
1043+
}
1044+
1045+
return result
1046+
}
1047+
10031048
// spinUpBatchFromDB spins up a batch that already existed in storage, then
10041049
// returns it.
10051050
func (b *Batcher) spinUpBatchFromDB(ctx context.Context, batch *batch) error {
10061051
dbSweeps, err := b.store.FetchBatchSweeps(ctx, batch.id)
10071052
if err != nil {
10081053
return err
10091054
}
1055+
dbSweeps = filterDbSweeps(b.skippedTxs, dbSweeps)
10101056

10111057
if len(dbSweeps) == 0 {
10121058
infof("skipping restored batch %d as it has no sweeps",
@@ -1502,6 +1548,7 @@ func (b *Batcher) newBatchConfig(maxTimeoutDistance int32) batchConfig {
15021548
txLabeler: b.txLabeler,
15031549
customMuSig2Signer: b.customMuSig2Signer,
15041550
presignedHelper: b.presignedHelper,
1551+
skippedTxs: b.skippedTxs,
15051552
clock: b.clock,
15061553
chainParams: b.chainParams,
15071554
}

0 commit comments

Comments
 (0)