diff --git a/graph/db/graph_sql_test.go b/graph/db/graph_sql_test.go
new file mode 100644
index 0000000000..31c2d7625e
--- /dev/null
+++ b/graph/db/graph_sql_test.go
@@ -0,0 +1,75 @@
+//go:build test_db_postgres || test_db_sqlite
+
+package graphdb
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+// TestNodeIsPublicCacheInvalidation ensures that we correctly invalidate the
+// cache we use when determining whether a node is public or not.
+func TestNodeIsPublicCacheInvalidation(t *testing.T) {
+	t.Parallel()
+	ctx := t.Context()
+
+	graph := MakeTestGraph(t)
+
+	node1 := createTestVertex(t)
+	node2 := createTestVertex(t)
+
+	require.NoError(t, graph.AddNode(ctx, node1))
+	require.NoError(t, graph.AddNode(ctx, node2))
+
+	edge, _ := createEdge(10, 0, 0, 0, node1, node2)
+	require.NoError(t, graph.AddChannelEdge(ctx, &edge))
+
+	// The first IsPublicNode call should populate the cache.
+	isPublic1, err := graph.IsPublicNode(node1.PubKeyBytes)
+	require.NoError(t, err)
+	require.True(t, isPublic1)
+
+	// Test invalidation scenarios:
+
+	// 1. DeleteChannelEdges:
+	// The channel above is public, so node1 should now be cached as
+	// public. We expect DeleteChannelEdges to invalidate the cache entry
+	// for both of the channel's nodes rather than serving stale values.
+	err = graph.DeleteChannelEdges(false, true, edge.ChannelID)
+	require.NoError(t, err)
+	isPublic1, err = graph.IsPublicNode(node1.PubKeyBytes)
+	require.NoError(t, err)
+	require.False(t, isPublic1)
+
+	isPublic2, err := graph.IsPublicNode(node2.PubKeyBytes)
+	require.NoError(t, err)
+	require.False(t, isPublic2)
+
+	// 2. AddChannelEdge:
+	// The `IsPublicNode` calls above will have cached both nodes with
+	// `isPublic` = false. Adding a new channel edge should invalidate
+	// those entries such that the next `IsPublicNode` calls return
+	// true.
+	edge2, _ := createEdge(10, 1, 0, 1, node1, node2)
+	require.NoError(t, graph.AddChannelEdge(ctx, &edge2))
+	isPublic1, err = graph.IsPublicNode(node1.PubKeyBytes)
+	require.NoError(t, err)
+	require.True(t, isPublic1)
+
+	isPublic2, err = graph.IsPublicNode(node2.PubKeyBytes)
+	require.NoError(t, err)
+	require.True(t, isPublic2)
+
+	// 3. DeleteNode:
+	// Again, the last two `IsPublicNode` calls should have cached both
+	// nodes as public. Deleting a node should invalidate its entry so
+	// that the next call returns false.
+	//
+	// NOTE: `IsPublicNode` doesn't return an error here because the SQL
+	// query simply checks for the existence of a public node.
+	require.NoError(t, graph.DeleteNode(ctx, node1.PubKeyBytes))
+	isPublic1, err = graph.IsPublicNode(node1.PubKeyBytes)
+	require.NoError(t, err)
+	require.False(t, isPublic1)
+}
diff --git a/graph/db/options.go b/graph/db/options.go
index 15ea6f4ee8..9d1193fd14 100644
--- a/graph/db/options.go
+++ b/graph/db/options.go
@@ -13,6 +13,11 @@ const (
 	// around 40MB.
 	DefaultChannelCacheSize = 20000
 
+	// DefaultPublicNodeCacheSize is the default number of node public
+	// status entries to cache. With 15k nodes, this produces a cache of
+	// around 500KB.
+	DefaultPublicNodeCacheSize = 15000
+
 	// DefaultPreAllocCacheNumNodes is the default number of channels we
 	// assume for mainnet for pre-allocating the graph cache. As of
 	// September 2021, there currently are 14k nodes in a strictly pruned
@@ -125,6 +130,10 @@ type StoreOptions struct {
 	// channel cache.
 	ChannelCacheSize int
 
+	// PublicNodeCacheSize is the maximum number of node public
+	// status entries to hold in the cache.
+	PublicNodeCacheSize int
+
 	// BatchCommitInterval is the maximum duration the batch schedulers will
 	// wait before attempting to commit a pending set of updates.
 	BatchCommitInterval time.Duration
@@ -138,9 +147,10 @@ type StoreOptions struct {
 // DefaultOptions returns a StoreOptions populated with default values.
 func DefaultOptions() *StoreOptions {
 	return &StoreOptions{
-		RejectCacheSize:  DefaultRejectCacheSize,
-		ChannelCacheSize: DefaultChannelCacheSize,
-		NoMigration:      false,
+		RejectCacheSize:     DefaultRejectCacheSize,
+		ChannelCacheSize:    DefaultChannelCacheSize,
+		PublicNodeCacheSize: DefaultPublicNodeCacheSize,
+		NoMigration:         false,
 	}
 }
 
@@ -169,3 +179,5 @@ func WithBatchCommitInterval(interval time.Duration) StoreOptionModifier {
 		o.BatchCommitInterval = interval
 	}
 }
+
+// TODO(abdulkbk): consider adding WithPublicNodeCacheSize.
diff --git a/graph/db/sql_store.go b/graph/db/sql_store.go
index f67894e4ce..e3e396cd46 100644
--- a/graph/db/sql_store.go
+++ b/graph/db/sql_store.go
@@ -20,6 +20,8 @@ import (
 	"github.com/btcsuite/btcd/btcutil"
 	"github.com/btcsuite/btcd/chaincfg/chainhash"
 	"github.com/btcsuite/btcd/wire"
+	"github.com/lightninglabs/neutrino/cache"
+	"github.com/lightninglabs/neutrino/cache/lru"
 	"github.com/lightningnetwork/lnd/aliasmgr"
 	"github.com/lightningnetwork/lnd/batch"
 	"github.com/lightningnetwork/lnd/fn/v2"
@@ -181,12 +183,14 @@ type SQLStore struct {
 	cfg *SQLStoreConfig
 	db  BatchedSQLQueries
 
-	// cacheMu guards all caches (rejectCache and chanCache). If
-	// this mutex will be acquired at the same time as the DB mutex then
-	// the cacheMu MUST be acquired first to prevent deadlock.
-	cacheMu     sync.RWMutex
-	rejectCache *rejectCache
-	chanCache   *channelCache
+	// cacheMu guards all caches (rejectCache, chanCache, and
+	// publicNodeCache). If this mutex will be acquired at the same time as
+	// the DB mutex then the cacheMu MUST be acquired first to prevent
+	// deadlock.
+	cacheMu         sync.RWMutex
+	rejectCache     *rejectCache
+	chanCache       *channelCache
+	publicNodeCache *lru.Cache[[33]byte, *cachedPublicNode]
 
 	chanScheduler batch.Scheduler[SQLQueries]
 	nodeScheduler batch.Scheduler[SQLQueries]
@@ -195,6 +199,18 @@ type SQLStore struct {
 	srcNodeMu sync.Mutex
 }
 
+// cachedPublicNode is a simple wrapper for a boolean value that can be
+// stored in an LRU cache. The LRU cache requires a Size() method.
+type cachedPublicNode struct {
+	isPublic bool
+}
+
+// Size returns the size of the cache entry. We return 1 as we just want to
+// limit the number of entries rather than their actual memory size.
+func (c *cachedPublicNode) Size() (uint64, error) {
+	return 1, nil
+}
+
 // A compile-time assertion to ensure that SQLStore implements the V1Store
 // interface.
 var _ V1Store = (*SQLStore)(nil)
@@ -229,7 +245,10 @@ func NewSQLStore(cfg *SQLStoreConfig, db BatchedSQLQueries,
 		db:          db,
 		rejectCache: newRejectCache(opts.RejectCacheSize),
 		chanCache:   newChannelCache(opts.ChannelCacheSize),
-		srcNodes:    make(map[ProtocolVersion]*srcNodeInfo),
+		publicNodeCache: lru.NewCache[[33]byte, *cachedPublicNode](
+			uint64(opts.PublicNodeCacheSize),
+		),
+		srcNodes: make(map[ProtocolVersion]*srcNodeInfo),
 	}
 
 	s.chanScheduler = batch.NewTimeScheduler(
@@ -416,6 +435,10 @@ func (s *SQLStore) DeleteNode(ctx context.Context,
 		return fmt.Errorf("unable to delete node: %w", err)
 	}
 
+	s.cacheMu.Lock()
+	s.removePublicNodeCache(pubKey)
+	s.cacheMu.Unlock()
+
 	return nil
 }
 
@@ -715,6 +738,10 @@ func (s *SQLStore) AddChannelEdge(ctx context.Context,
 		default:
 			s.rejectCache.remove(edge.ChannelID)
 			s.chanCache.remove(edge.ChannelID)
+			s.removePublicNodeCache(
+				edge.NodeKey1Bytes, edge.NodeKey2Bytes,
+			)
+
 			return nil
 		}
 	},
@@ -1721,6 +1748,7 @@ func (s *SQLStore) MarkEdgeZombie(chanID uint64,
 
 	s.rejectCache.remove(chanID)
 	s.chanCache.remove(chanID)
+	s.removePublicNodeCache(pubKey1, pubKey2)
 
 	return nil
 }
@@ -1946,6 +1974,14 @@ func (s *SQLStore) DeleteChannelEdges(strictZombiePruning, markZombie bool,
 		s.chanCache.remove(chanID)
 	}
 
+	var pubkeys [][33]byte
+	for _, edge := range edges {
+		pubkeys = append(
+			pubkeys, edge.NodeKey1Bytes, edge.NodeKey2Bytes,
+		)
+	}
+	s.removePublicNodeCache(pubkeys...)
+
 	return edges, nil
 }
 
@@ -2281,8 +2317,28 @@ func (s *SQLStore) ChannelID(chanPoint *wire.OutPoint) (uint64, error) {
 func (s *SQLStore) IsPublicNode(pubKey [33]byte) (bool, error) {
 	ctx := context.TODO()
 
+	// Check the cache first with a read lock.
+	s.cacheMu.RLock()
+	cached, err := s.publicNodeCache.Get(pubKey)
+
+	switch {
+	case errors.Is(err, cache.ErrElementNotFound):
+		// The node isn't in the cache, so we'll need to fetch its
+		// status from the database below.
+
+	case cached != nil:
+		s.cacheMu.RUnlock()
+		return cached.isPublic, nil
+
+	case err != nil:
+		log.Warnf("unable to check cache if node is public: %v",
+			err)
+	}
+
+	s.cacheMu.RUnlock()
+
 	var isPublic bool
-	err := s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
+	err = s.db.ExecTx(ctx, sqldb.ReadTxOpt(), func(db SQLQueries) error {
 		var err error
 
 		isPublic, err = db.IsPublicV1Node(ctx, pubKey[:])
@@ -2293,6 +2349,17 @@ func (s *SQLStore) IsPublicNode(pubKey [33]byte) (bool, error) {
 			"public: %w", err)
 	}
 
+	// Store the result in the cache.
+	s.cacheMu.Lock()
+	_, err = s.publicNodeCache.Put(pubKey, &cachedPublicNode{
+		isPublic: isPublic,
+	})
+	if err != nil {
+		log.Warnf("unable to store node info in cache: %v", err)
+	}
+
+	s.cacheMu.Unlock()
+
 	return isPublic, nil
 }
 
@@ -2644,6 +2711,9 @@ func (s *SQLStore) PruneGraph(spentOutputs []*wire.OutPoint,
 	for _, channel := range closedChans {
 		s.rejectCache.remove(channel.ChannelID)
 		s.chanCache.remove(channel.ChannelID)
+		s.removePublicNodeCache(
+			channel.NodeKey1Bytes, channel.NodeKey2Bytes,
+		)
 	}
 
 	return closedChans, prunedNodes, nil
@@ -2908,9 +2978,15 @@ func (s *SQLStore) DisconnectBlockAtHeight(height uint32) (
 			"height: %w", err)
 	}
 
+	s.cacheMu.Lock()
+	defer s.cacheMu.Unlock()
+
 	for _, channel := range removedChans {
 		s.rejectCache.remove(channel.ChannelID)
 		s.chanCache.remove(channel.ChannelID)
+		s.removePublicNodeCache(
+			channel.NodeKey1Bytes, channel.NodeKey2Bytes,
+		)
 	}
 
 	return removedChans, nil
@@ -5733,3 +5809,13 @@ func handleZombieMarking(ctx context.Context, db SQLQueries,
 		},
 	)
 }
+
+// removePublicNodeCache takes in a list of public keys and removes the
+// corresponding node entries from the cache if they exist.
+//
+// NOTE: This method must be called with cacheMu held.
+func (s *SQLStore) removePublicNodeCache(pubkeys ...[33]byte) {
+	for _, pubkey := range pubkeys {
+		s.publicNodeCache.Delete(pubkey)
+	}
+}
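
Note on the TODO at the end of options.go: as the diff stands, PublicNodeCacheSize always takes DefaultPublicNodeCacheSize, since no functional option sets it. If the WithPublicNodeCacheSize modifier hinted at there were added, it would presumably mirror the existing StoreOptionModifier helpers such as WithBatchCommitInterval. A minimal sketch, not part of this diff and with the signature assumed from that pattern:

// WithPublicNodeCacheSize sets the maximum number of node public status
// entries to hold in the public node cache.
func WithPublicNodeCacheSize(n int) StoreOptionModifier {
	return func(o *StoreOptions) {
		o.PublicNodeCacheSize = n
	}
}

Callers constructing the store could then pass this alongside the other modifiers; since each cachedPublicNode reports Size() == 1, the LRU cache would simply evict the least recently queried node statuses once n entries are held.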