Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions graph/db/graph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,63 @@ func TestSourceNode(t *testing.T) {
compareNodes(t, testNode, sourceNode)
}

// TestSetSourceNodeSameTimestamp tests that SetSourceNode accepts updates
// with the same timestamp. This is necessary because multiple code paths
// (setSelfNode, createNewHiddenService, RPC updates) can race during startup,
// reading the same old timestamp and independently incrementing it to the same
// new value. For our own node, we want parameter changes to persist even with
// timestamp collisions (unlike network gossip where same timestamp means same
// content).
func TestSetSourceNodeSameTimestamp(t *testing.T) {
t.Parallel()
ctx := t.Context()

graph := MakeTestGraph(t)

// Create and set the initial source node.
testNode := createTestVertex(t)
require.NoError(t, graph.SetSourceNode(ctx, testNode))

// Verify the source node was set correctly.
sourceNode, err := graph.SourceNode(ctx)
require.NoError(t, err)
compareNodes(t, testNode, sourceNode)

// Create a modified version of the node with the same timestamp but
// different parameters (e.g., different alias and color). This
// simulates the race condition where multiple goroutines read the
// same old timestamp, independently increment it, and try to update
// with different changes.
modifiedNode := models.NewV1Node(
testNode.PubKeyBytes, &models.NodeV1Fields{
// Same timestamp.
LastUpdate: testNode.LastUpdate,
// Different alias.
Alias: "different-alias",
Color: color.RGBA{R: 100, G: 200, B: 50, A: 0},
Addresses: testNode.Addresses,
Features: testNode.Features.RawFeatureVector,
AuthSigBytes: testNode.AuthSigBytes,
},
)

// Attempt to set the source node with the same timestamp but
// different parameters. This should now succeed for both SQL and KV
// stores. The SQL store uses UpsertSourceNode which removes the
// strict timestamp constraint, allowing last-write-wins semantics.
require.NoError(t, graph.SetSourceNode(ctx, modifiedNode))

// Verify that the parameter changes actually persisted.
updatedNode, err := graph.SourceNode(ctx)
require.NoError(t, err)
require.Equal(t, "different-alias", updatedNode.Alias.UnwrapOr(""))
require.Equal(
t, color.RGBA{R: 100, G: 200, B: 50, A: 0},
updatedNode.Color.UnwrapOr(color.RGBA{}),
)
require.Equal(t, testNode.LastUpdate, updatedNode.LastUpdate)
}

// TestEdgeInsertionDeletion tests the basic CRUD operations for channel edges.
func TestEdgeInsertionDeletion(t *testing.T) {
t.Parallel()
Expand Down
192 changes: 152 additions & 40 deletions graph/db/sql_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type SQLQueries interface {
Node queries.
*/
UpsertNode(ctx context.Context, arg sqlc.UpsertNodeParams) (int64, error)
UpsertSourceNode(ctx context.Context, arg sqlc.UpsertSourceNodeParams) (int64, error)
GetNodeByPubKey(ctx context.Context, arg sqlc.GetNodeByPubKeyParams) (sqlc.GraphNode, error)
GetNodesByIDs(ctx context.Context, ids []int64) ([]sqlc.GraphNode, error)
GetNodeIDByPubKey(ctx context.Context, arg sqlc.GetNodeIDByPubKeyParams) (int64, error)
Expand Down Expand Up @@ -521,7 +522,14 @@ func (s *SQLStore) SetSourceNode(ctx context.Context,
node *models.Node) error {

return s.db.ExecTx(ctx, sqldb.WriteTxOpt(), func(db SQLQueries) error {
id, err := upsertNode(ctx, db, node)
// For the source node, we use a less strict upsert that allows
// updates even when the timestamp hasn't changed. This handles
// the race condition where multiple goroutines (e.g.,
// setSelfNode, createNewHiddenService, RPC updates) read the
// same old timestamp, independently increment it, and try to
// write concurrently. We want all parameter changes to persist,
// even if timestamps collide.
id, err := upsertSourceNode(ctx, db, node)
if err != nil {
return fmt.Errorf("unable to upsert source node: %w",
err)
Expand Down Expand Up @@ -3599,77 +3607,181 @@ func getNodeFeatures(ctx context.Context, db SQLQueries,
return features, nil
}

// upsertNode upserts the node record into the database. If the node already
// exists, then the node's information is updated. If the node doesn't exist,
// then a new node is created. The node's features, addresses and extra TLV
// types are also updated. The node's DB ID is returned.
func upsertNode(ctx context.Context, db SQLQueries,
node *models.Node) (int64, error) {
// upsertNodeAncillaryData updates the node's features, addresses, and extra
// signed fields. This is common logic shared by upsertNode and
// upsertSourceNode.
func upsertNodeAncillaryData(ctx context.Context, db SQLQueries,
nodeID int64, node *models.Node) error {

params := sqlc.UpsertNodeParams{
Version: int16(lnwire.GossipVersion1),
PubKey: node.PubKeyBytes[:],
// Update the node's features.
err := upsertNodeFeatures(ctx, db, nodeID, node.Features)
if err != nil {
return fmt.Errorf("inserting node features: %w", err)
}

if node.HaveAnnouncement() {
switch node.Version {
case lnwire.GossipVersion1:
params.LastUpdate = sqldb.SQLInt64(
node.LastUpdate.Unix(),
)
// Update the node's addresses.
err = upsertNodeAddresses(ctx, db, nodeID, node.Addresses)
if err != nil {
return fmt.Errorf("inserting node addresses: %w", err)
}

case lnwire.GossipVersion2:
// Convert the flat extra opaque data into a map of TLV types to
// values.
extra, err := marshalExtraOpaqueData(node.ExtraOpaqueData)
if err != nil {
return fmt.Errorf("unable to marshal extra opaque data: %w",
err)
}

default:
return 0, fmt.Errorf("unknown gossip version: %d",
node.Version)
}
// Update the node's extra signed fields.
err = upsertNodeExtraSignedFields(ctx, db, nodeID, extra)
if err != nil {
return fmt.Errorf("inserting node extra TLVs: %w", err)
}

return nil
}

// populateNodeParams populates the common node parameters from a models.Node.
// This is a helper for building UpsertNodeParams and UpsertSourceNodeParams.
func populateNodeParams(node *models.Node,
setParams func(lastUpdate sql.NullInt64, alias,
colorStr sql.NullString, signature []byte)) error {

if !node.HaveAnnouncement() {
return nil
}

switch node.Version {
case lnwire.GossipVersion1:
lastUpdate := sqldb.SQLInt64(node.LastUpdate.Unix())
var alias, colorStr sql.NullString

node.Color.WhenSome(func(rgba color.RGBA) {
params.Color = sqldb.SQLStrValid(EncodeHexColor(rgba))
colorStr = sqldb.SQLStrValid(EncodeHexColor(rgba))
})
node.Alias.WhenSome(func(s string) {
params.Alias = sqldb.SQLStrValid(s)
alias = sqldb.SQLStrValid(s)
})

params.Signature = node.AuthSigBytes
setParams(lastUpdate, alias, colorStr, node.AuthSigBytes)

case lnwire.GossipVersion2:
// No-op for now.

default:
return fmt.Errorf("unknown gossip version: %d", node.Version)
}

nodeID, err := db.UpsertNode(ctx, params)
return nil
}

// buildNodeUpsertParams builds the parameters for upserting a node using the
// strict UpsertNode query (requires timestamp to be increasing).
func buildNodeUpsertParams(node *models.Node) (sqlc.UpsertNodeParams, error) {
params := sqlc.UpsertNodeParams{
Version: int16(lnwire.GossipVersion1),
PubKey: node.PubKeyBytes[:],
}

err := populateNodeParams(
node, func(lastUpdate sql.NullInt64, alias,
colorStr sql.NullString,
signature []byte) {

params.LastUpdate = lastUpdate
params.Alias = alias
params.Color = colorStr
params.Signature = signature
})

return params, err
}

// buildSourceNodeUpsertParams builds the parameters for upserting the source
// node using the lenient UpsertSourceNode query (allows same timestamp).
func buildSourceNodeUpsertParams(node *models.Node) (
sqlc.UpsertSourceNodeParams, error) {

params := sqlc.UpsertSourceNodeParams{
Version: int16(lnwire.GossipVersion1),
PubKey: node.PubKeyBytes[:],
}

err := populateNodeParams(
node, func(lastUpdate sql.NullInt64, alias,
colorStr sql.NullString, signature []byte) {

params.LastUpdate = lastUpdate
params.Alias = alias
params.Color = colorStr
params.Signature = signature
},
)

return params, err
}

// upsertSourceNode upserts the source node record into the database using a
// less strict upsert that allows updates even when the timestamp hasn't
// changed. This is necessary to handle concurrent updates to our own node
// during startup and runtime. The node's features, addresses and extra TLV
// types are also updated. The node's DB ID is returned.
func upsertSourceNode(ctx context.Context, db SQLQueries,
node *models.Node) (int64, error) {

params, err := buildSourceNodeUpsertParams(node)
if err != nil {
return 0, fmt.Errorf("upserting node(%x): %w", node.PubKeyBytes,
err)
return 0, err
}

nodeID, err := db.UpsertSourceNode(ctx, params)
if err != nil {
return 0, fmt.Errorf("upserting source node(%x): %w",
node.PubKeyBytes, err)
}

// We can exit here if we don't have the announcement yet.
if !node.HaveAnnouncement() {
return nodeID, nil
}

// Update the node's features.
err = upsertNodeFeatures(ctx, db, nodeID, node.Features)
// Update the ancillary node data (features, addresses, extra fields).
err = upsertNodeAncillaryData(ctx, db, nodeID, node)
if err != nil {
return 0, fmt.Errorf("inserting node features: %w", err)
return 0, err
}

// Update the node's addresses.
err = upsertNodeAddresses(ctx, db, nodeID, node.Addresses)
return nodeID, nil
}

// upsertNode upserts the node record into the database. If the node already
// exists, then the node's information is updated. If the node doesn't exist,
// then a new node is created. The node's features, addresses and extra TLV
// types are also updated. The node's DB ID is returned.
func upsertNode(ctx context.Context, db SQLQueries,
node *models.Node) (int64, error) {

params, err := buildNodeUpsertParams(node)
if err != nil {
return 0, fmt.Errorf("inserting node addresses: %w", err)
return 0, err
}

// Convert the flat extra opaque data into a map of TLV types to
// values.
extra, err := marshalExtraOpaqueData(node.ExtraOpaqueData)
nodeID, err := db.UpsertNode(ctx, params)
if err != nil {
return 0, fmt.Errorf("unable to marshal extra opaque data: %w",
return 0, fmt.Errorf("upserting node(%x): %w", node.PubKeyBytes,
err)
}

// Update the node's extra signed fields.
err = upsertNodeExtraSignedFields(ctx, db, nodeID, extra)
// We can exit here if we don't have the announcement yet.
if !node.HaveAnnouncement() {
return nodeID, nil
}

// Update the ancillary node data (features, addresses, extra fields).
err = upsertNodeAncillaryData(ctx, db, nodeID, node)
if err != nil {
return 0, fmt.Errorf("inserting node extra TLVs: %w", err)
return 0, err
}

return nodeID, nil
Expand Down
43 changes: 43 additions & 0 deletions sqldb/sqlc/graph.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions sqldb/sqlc/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 19 additions & 0 deletions sqldb/sqlc/queries/graph.sql
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,25 @@ WHERE graph_nodes.last_update IS NULL
OR EXCLUDED.last_update > graph_nodes.last_update
RETURNING id;

-- We use a separate upsert for our own node since we want to be less strict
-- about the last_update field. For our own node, we always want to
-- update the record even if the last_update is older than what we have.
Comment on lines +24 to +26

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

According to the style guide, function comments should start with the function name and be a complete sentence.1 This comment is used to generate the Go docstring for UpsertSourceNode, which currently violates this rule. Please update the comment to adhere to the style guide.

-- UpsertSourceNode uses a separate upsert for our own node since we want to be
-- less strict about the last_update field. For our own node, we always want to
-- update the record even if the last_update is older than what we have.

Style Guide References

Footnotes

-- name: UpsertSourceNode :one
INSERT INTO graph_nodes (
version, pub_key, alias, last_update, color, signature
) VALUES (
$1, $2, $3, $4, $5, $6
)
ON CONFLICT (pub_key, version)
-- Update the following fields if a conflict occurs on pub_key
-- and version.
DO UPDATE SET
alias = EXCLUDED.alias,
last_update = EXCLUDED.last_update,
color = EXCLUDED.color,
signature = EXCLUDED.signature
RETURNING id;

-- name: GetNodesByIDs :many
SELECT *
FROM graph_nodes
Expand Down
Loading