Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions graph/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,26 @@ func (b *Builder) IsZombieChannel(updateTime1,
return e1Zombie && e2Zombie
}

// IsZombieByAge checks if a channel is a zombie by its age. It uses the
// timestamp of the block of the transaction that opened the channel. We use
// this only for channels that have no edge policies, as we can't use the last
// update timestamp to determine if the channel is a zombie.
func (b *Builder) IsZombieByAge(scid uint64) (bool, error) {
blockHeight := lnwire.NewShortChanIDFromInt(scid).BlockHeight

blockhash, err := b.cfg.Chain.GetBlockHash(int64(blockHeight))
if err != nil {
return false, err
}

header, err := b.cfg.Chain.GetBlockHeader(blockhash)
if err != nil {
return false, err
}

return time.Since(header.Timestamp) >= b.cfg.ChannelPruneExpiry, nil
}

// pruneZombieChans is a method that will be called periodically to prune out
// any "zombie" channels. We consider channels zombies if *both* edges haven't
// been updated since our zombie horizon. If AssumeChannelValid is present,
Expand Down Expand Up @@ -536,6 +556,28 @@ func (b *Builder) pruneZombieChans() error {
return nil
}

// If both edges are nil, then we'll check if the channel is a
// zombie that has been opened for long and never received a
// policy update.
if e1 == nil && e2 == nil {
isZombie, err := b.IsZombieByAge(info.ChannelID)
if err != nil {
return fmt.Errorf("unable to check if "+
"channel is a zombie: %w", err)
}

if isZombie {
log.Trace("Channel with chan_id=%v is zombie",
info.ChannelID)

chansToPrune[info.ChannelID] = struct{}{}
}

// We've handled channels with no policies, so we can
// exit early to process the next channel.
return nil
}

e1Zombie, e2Zombie, isZombieChan := b.isZombieChannel(e1, e2)

if e1Zombie {
Expand Down
76 changes: 76 additions & 0 deletions graph/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,82 @@ func TestPruneChannelGraphStaleEdges(t *testing.T) {
}
}

// TestIsZombieByAge tests that we can properly determine if a channel with no
// edge policies is a zombie or not using the block timestamp that the
// transaction that opened the channel was included in.
//
//nolint:ll
func TestIsZombieByAge(t *testing.T) {
t.Parallel()

tests := []struct {
name string
blockTimestamp time.Time
channelPruneExpiry time.Duration
expectedPrune bool
}{
{
name: "old chan",
blockTimestamp: time.Now().Add(-30 * 24 * time.Hour),
channelPruneExpiry: 14 * 24 * time.Hour,
expectedPrune: true,
},
{
name: "recent channel",
blockTimestamp: time.Now().Add(-7 * 24 * time.Hour),
channelPruneExpiry: 14 * 24 * time.Hour,
expectedPrune: false,
},
{
name: "chan at threshold",
blockTimestamp: time.Now().Add(-14 * 24 * time.Hour),
channelPruneExpiry: 14 * 24 * time.Hour,
expectedPrune: true,
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
// Create mock chain with a starting height.
const startingHeight = 100
mockChain := newMockChain(startingHeight)

// Create a block with the desired timestamp.
block := &wire.MsgBlock{
Header: wire.BlockHeader{
Timestamp: tc.blockTimestamp,
},
}

// Add the block at a specific height.
const channelBlockHeight = 101
mockChain.addBlock(block, channelBlockHeight, 0)

scid := lnwire.ShortChannelID{
BlockHeight: channelBlockHeight,
TxIndex: 0,
TxPosition: 0,
}

// Create a minimal builder config that consist the mock
// chain and the channel prune expiry we set.
cfg := &Config{
Chain: mockChain,
ChannelPruneExpiry: tc.channelPruneExpiry,
}
builder := &Builder{
cfg: cfg,
}

// Test the method to see we are able to determine if
// the channel is a zombie or not.
isZombie, err := builder.IsZombieByAge(scid.ToUint64())
require.NoError(t, err)
require.Equal(t, tc.expectedPrune, isZombie)
})
}
}

// TestPruneChannelGraphDoubleDisabled test that we can properly prune channels
// with both edges disabled from our channel graph.
func TestPruneChannelGraphDoubleDisabled(t *testing.T) {
Expand Down
138 changes: 138 additions & 0 deletions graph/db/graph_sql_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
//go:build test_db_postgres || test_db_sqlite

package graphdb

import (
"testing"
"time"

"github.com/lightningnetwork/lnd/fn/v2"
"github.com/stretchr/testify/require"
)

// TestChanUpdatesInHorizonWithNoPolicies tests that we're able to properly
// retrieve channels with no policies within the time range.
func TestChanUpdatesInHorizonWithNoPolicies(t *testing.T) {
t.Parallel()
ctx := t.Context()

graph := MakeTestGraph(t)

// We'll start by creating two nodes which will seed our test graph.
node1 := createTestVertex(t)
require.NoError(t, graph.AddNode(ctx, node1))

node2 := createTestVertex(t)
require.NoError(t, graph.AddNode(ctx, node2))

// Note: startTime and endTime only works if the channels have
// policies. If not, the channels are included irrespective of the
// time range.
startTime := time.Unix(1234, 0)
endTime := startTime
edges := make([]ChannelEdge, 0, 10)

// We'll now create 10 channels between the two nodes, with no policies.
const numChans = 10
for i := range numChans {
channel, chanID := createEdge(
uint32(i*10), 0, 0, 0, node1, node2,
)
require.NoError(t, graph.AddChannelEdge(ctx, &channel))

// The first 5 channels will have no policies.
if i < numChans/2 {
edges = append(edges, ChannelEdge{
Info: &channel,
})

continue
}

edge1UpdateTime := endTime
edge2UpdateTime := edge1UpdateTime.Add(time.Second)
endTime = endTime.Add(time.Second * 10)

edge1 := newEdgePolicy(
chanID.ToUint64(), edge1UpdateTime.Unix(),
)
edge1.ChannelFlags = 0
edge1.ToNode = node2.PubKeyBytes
edge1.SigBytes = testSig.Serialize()
require.NoError(t, graph.UpdateEdgePolicy(ctx, edge1))

edge2 := newEdgePolicy(
chanID.ToUint64(), edge2UpdateTime.Unix(),
)
edge2.ChannelFlags = 1
edge2.ToNode = node1.PubKeyBytes
edge2.SigBytes = testSig.Serialize()
require.NoError(t, graph.UpdateEdgePolicy(ctx, edge2))

edges = append(edges, ChannelEdge{
Info: &channel,
Policy1: edge1,
Policy2: edge2,
})
}

// With our channels loaded, we'll now start our series of queries.
queryCases := []struct {
start time.Time
end time.Time
resp []ChannelEdge
}{
// If we query for a time range that's strictly below our set
// of updates, then we'll get only the 5 channels with no
// policies.
{
start: time.Unix(100, 0),
end: time.Unix(200, 0),
resp: edges[:5],
},

// If we query for a time range that's well beyond our set of
// updates, we should get only the 5 channels with no
// policies.
{
start: time.Unix(99999, 0),
end: time.Unix(999999, 0),
resp: edges[:5],
},

// If we query for the start time, and 10 seconds directly
// after it, we should only get the 5 channels with no
// policies and one channel with a policy.
{
start: time.Unix(1234, 0),
end: startTime.Add(time.Second * 10),
resp: edges[:6],
},

// If we use the start and end time as is, we should get the
// entire range.
{
start: startTime,
end: endTime,

resp: edges[:10],
},
}

for _, queryCase := range queryCases {
respIter := graph.ChanUpdatesInHorizon(
queryCase.start, queryCase.end,
)

resp, err := fn.CollectErr(respIter)
require.NoError(t, err)
require.Equal(t, len(resp), len(queryCase.resp))

for i := range len(resp) {
chanExp := queryCase.resp[i]
chanRet := resp[i]

require.Equal(t, chanExp.Info, chanRet.Info)
}
}
}
5 changes: 5 additions & 0 deletions graph/db/sql_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1098,6 +1098,11 @@ func (s *SQLStore) updateChanCacheBatch(edgesToCache map[uint64]ChannelEdge) {
// 5. Update cache after successful batch
// 6. Repeat with updated pagination cursor until no more results
//
// Note: Ideally each channel should have at least one policy. However, if a
// channel is created and never updated, it will not have any policies.
// In this case, we'll return the channel with no policies at all regardless of
// the time range. This helps us prune zombie channels with no policies.
//
// NOTE: This is part of the V1Store interface.
func (s *SQLStore) ChanUpdatesInHorizon(startTime, endTime time.Time,
opts ...IteratorOption) iter.Seq2[ChannelEdge, error] {
Expand Down
4 changes: 4 additions & 0 deletions sqldb/sqlc/graph.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions sqldb/sqlc/queries/graph.sql
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,10 @@ WHERE c.version = @version
(cp1.last_update >= @start_time AND cp1.last_update < @end_time)
OR
(cp2.last_update >= @start_time AND cp2.last_update < @end_time)
-- TODO(abdulkbk): see the potential of adding a created_at to channel
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should limit this query since for example couple of nodes will have like >2000 of these cases, should be fine I think just thinking out loud.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it makes sense, especially when this change is run for the first time and the node has many stale channels with no policy. It should be fine for the next ticks (1-hour pruning intervals) tho.

-- table to extend this query to include channels created in a time range.
OR
(cp1.last_update IS NULL AND cp2.last_update IS NULL)
)
-- Pagination using compound cursor (max_update_time, id).
-- We use COALESCE with -1 as sentinel since timestamps are always positive.
Expand Down
Loading