Skip to content

Commit

Permalink
implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
arnaubennassar committed Nov 4, 2024
1 parent 6c12665 commit 5fd36b9
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 122 deletions.
39 changes: 4 additions & 35 deletions reorgdetector/migrations/reorgdetector0001.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,39 +4,8 @@ DROP TABLE IF EXISTS claim;
DROP TABLE IF EXISTS bridge;

-- +migrate Up
CREATE TABLE block (
num BIGINT PRIMARY KEY
);

CREATE TABLE bridge (
block_num INTEGER NOT NULL REFERENCES block(num) ON DELETE CASCADE,
block_pos INTEGER NOT NULL,
leaf_type INTEGER NOT NULL,
origin_network INTEGER NOT NULL,
origin_address VARCHAR NOT NULL,
destination_network INTEGER NOT NULL,
destination_address VARCHAR NOT NULL,
amount TEXT NOT NULL,
metadata BLOB,
deposit_count INTEGER NOT NULL,
PRIMARY KEY (block_num, block_pos)
);

CREATE TABLE claim (
block_num INTEGER NOT NULL REFERENCES block(num) ON DELETE CASCADE,
block_pos INTEGER NOT NULL,
global_index TEXT NOT NULL,
origin_network INTEGER NOT NULL,
origin_address VARCHAR NOT NULL,
destination_address VARCHAR NOT NULL,
amount TEXT NOT NULL,
proof_local_exit_root VARCHAR,
proof_rollup_exit_root VARCHAR,
mainnet_exit_root VARCHAR,
rollup_exit_root VARCHAR,
global_exit_root VARCHAR,
destination_network INTEGER NOT NULL,
metadata BLOB,
is_message BOOLEAN,
PRIMARY KEY (block_num, block_pos)
CREATE TABLE tracked_block (
subscriber_id VARCHAR NOT NULL,
num BIGINT NOT NULL,
hash VARCHAR NOT NULL
);
34 changes: 23 additions & 11 deletions reorgdetector/reorgdetector.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/0xPolygon/cdk/db"
"github.com/0xPolygon/cdk/log"
"github.com/0xPolygon/cdk/reorgdetector/migrations"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
Expand All @@ -36,6 +37,10 @@ type ReorgDetector struct {
}

func New(client EthClient, cfg Config) (*ReorgDetector, error) {
err := migrations.RunMigrations(cfg.DBPath)
if err != nil {
return nil, err
}
db, err := db.NewSQLiteDB(cfg.DBPath)
if err != nil {
return nil, err
Expand All @@ -53,7 +58,7 @@ func New(client EthClient, cfg Config) (*ReorgDetector, error) {
// Start starts the reorg detector
func (rd *ReorgDetector) Start(ctx context.Context) (err error) {
// Load tracked blocks from the DB
if err = rd.loadTrackedHeaders(ctx); err != nil {
if err = rd.loadTrackedHeaders(); err != nil {
return fmt.Errorf("failed to load tracked headers: %w", err)
}

Expand Down Expand Up @@ -93,7 +98,7 @@ func (rd *ReorgDetector) AddBlockToTrack(ctx context.Context, id string, num uin

// Store the given header to the tracked list
hdr := newHeader(num, hash)
if err := rd.saveTrackedBlock(ctx, id, hdr); err != nil {
if err := rd.saveTrackedBlock(id, hdr); err != nil {
return fmt.Errorf("failed to save tracked block: %w", err)
}

Expand Down Expand Up @@ -154,24 +159,31 @@ func (rd *ReorgDetector) detectReorgInTrackedList(ctx context.Context) error {
if hdr.Num <= lastFinalisedBlock.Number.Uint64() {
hdrs.removeRange(hdr.Num, hdr.Num)
}
if err := rd.removeTrackedBlockRange(id, hdr.Num, hdr.Num); err != nil {
return fmt.Errorf(
"error removing blocks from DB for subscriber %s between blocks %d and %d: %w",
id, hdr.Num, hdr.Num, err,
)
}

continue
}

// Notify the subscriber about the reorg
rd.notifySubscriber(id, hdr)

// Remove the reorged block and all the following blocks
// Remove the reorged block and all the following blocks from DB
if err := rd.removeTrackedBlockRange(id, hdr.Num, headers[len(headers)-1].Num); err != nil {
return fmt.Errorf(
"error removing blocks from DB for subscriber %s between blocks %d and %d: %w",
id, hdr.Num, headers[len(headers)-1].Num, err,
)
}
// Remove the reorged block and all the following blocks from memory
hdrs.removeRange(hdr.Num, headers[len(headers)-1].Num)

break
}

// Update the tracked blocks in the DB
if err := rd.updateTrackedBlocksDB(ctx, id, hdrs); err != nil {
return fmt.Errorf("failed to update tracked blocks for subscriber %s: %w", id, err)
}

return nil
})
}
Expand All @@ -180,12 +192,12 @@ func (rd *ReorgDetector) detectReorgInTrackedList(ctx context.Context) error {
}

// loadTrackedHeaders loads tracked headers from the DB and stores them in memory
func (rd *ReorgDetector) loadTrackedHeaders(ctx context.Context) (err error) {
func (rd *ReorgDetector) loadTrackedHeaders() (err error) {
rd.trackedBlocksLock.Lock()
defer rd.trackedBlocksLock.Unlock()

// Load tracked blocks for all subscribers from the DB
if rd.trackedBlocks, err = rd.getTrackedBlocks(ctx); err != nil {
if rd.trackedBlocks, err = rd.getTrackedBlocks(); err != nil {
return fmt.Errorf("failed to get tracked blocks: %w", err)
}

Expand Down
114 changes: 41 additions & 73 deletions reorgdetector/reorgdetector_db.go
Original file line number Diff line number Diff line change
@@ -1,69 +1,51 @@
package reorgdetector

import (
"context"
"encoding/json"
"errors"
"fmt"

"github.com/ledgerwatch/erigon-lib/kv"
"github.com/0xPolygon/cdk/db"
"github.com/russross/meddler"
)

const (
subscriberBlocks = "reorgdetector-subscriberBlocks"
)

func tableCfgFunc(_ kv.TableCfg) kv.TableCfg {
return kv.TableCfg{
subscriberBlocks: {},
}
}

// getTrackedBlocks returns a list of tracked blocks for each subscriber from db
func (rd *ReorgDetector) getTrackedBlocks(ctx context.Context) (map[string]*headersList, error) {
tx, err := rd.db.BeginRo(ctx)
func (rd *ReorgDetector) getTrackedBlocks() (map[string]*headersList, error) {
trackedBlocks := make(map[string]*headersList, 0)
var headersWithID []*headerWithSubscriberID
err := meddler.QueryAll(rd.db, &headersWithID, "SELECT * FROM tracked_block ORDER BY subscriber_id;")
if err != nil {
return nil, err
if errors.Is(err, db.ErrNotFound) {
return trackedBlocks, nil
}
return nil, fmt.Errorf("error queryng tracked_block: %w", err)
}

defer tx.Rollback()

cursor, err := tx.Cursor(subscriberBlocks)
if err != nil {
return nil, err
if len(headersWithID) == 0 {
return trackedBlocks, nil
}

defer cursor.Close()

trackedBlocks := make(map[string]*headersList, 0)

for k, v, err := cursor.First(); k != nil; k, v, err = cursor.Next() {
if err != nil {
return nil, err
currentID := headersWithID[0].SubscriberID
currentHeaders := []header{}
for i := 0; i < len(headersWithID); i++ {
if headersWithID[i].SubscriberID != currentID {
trackedBlocks[currentID] = newHeadersList(currentHeaders...)
currentHeaders = []header{{
Num: headersWithID[i].Num,
Hash: headersWithID[i].Hash,
}}
currentID = headersWithID[i].SubscriberID
} else {
currentHeaders = append(currentHeaders, header{
Num: headersWithID[i].Num,
Hash: headersWithID[i].Hash,
})
}

var headers []header
if err := json.Unmarshal(v, &headers); err != nil {
return nil, err
}

trackedBlocks[string(k)] = newHeadersList(headers...)
}

return trackedBlocks, nil
}

// saveTrackedBlock saves the tracked block for a subscriber in db and in memory
func (rd *ReorgDetector) saveTrackedBlock(ctx context.Context, id string, b header) error {
func (rd *ReorgDetector) saveTrackedBlock(id string, b header) error {
rd.trackedBlocksLock.Lock()

// this has to go after the lock, because of a possible deadlock
// between AddBlocksToTrack and detectReorgInTrackedList
tx, err := rd.db.BeginRw(ctx)
if err != nil {
return err
}

defer tx.Rollback()

hdrs, ok := rd.trackedBlocks[id]
if !ok || hdrs.isEmpty() {
hdrs = newHeadersList(b)
Expand All @@ -72,32 +54,18 @@ func (rd *ReorgDetector) saveTrackedBlock(ctx context.Context, id string, b head
hdrs.add(b)
}
rd.trackedBlocksLock.Unlock()

raw, err := json.Marshal(hdrs.getSorted())
if err != nil {
return err
}

return tx.Put(subscriberBlocks, []byte(id), raw)
return meddler.Insert(rd.db, "tracked_block", &headerWithSubscriberID{
SubscriberID: id,
Num: b.Num,
Hash: b.Hash,
})
}

// updateTrackedBlocksDB updates the tracked blocks for a subscriber in db
func (rd *ReorgDetector) updateTrackedBlocksDB(ctx context.Context, id string, blocks *headersList) error {
tx, err := rd.db.BeginRw(ctx)
if err != nil {
return err
}

defer tx.Rollback()

raw, err := json.Marshal(blocks.getSorted())
if err != nil {
return err
}

if err = tx.Put(subscriberBlocks, []byte(id), raw); err != nil {
return err
}

return nil
func (rd *ReorgDetector) removeTrackedBlockRange(id string, fromBlock, toBlock uint64) error {
_, err := rd.db.Exec(
"DELETE FROM tracked_block WHERE num >= $1 AND NUM <= 2 AND subscriber_id = $3;",
fromBlock, toBlock, id,
)
return err
}
3 changes: 2 additions & 1 deletion reorgdetector/reorgdetector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package reorgdetector

import (
"context"
"path"
"testing"
"time"

Expand All @@ -19,7 +20,7 @@ func Test_ReorgDetector(t *testing.T) {
clientL1, _ := helpers.SimulatedBackend(t, nil, 0)

// Create test DB dir
testDir := t.TempDir()
testDir := path.Join(t.TempDir(), "file::memory:?cache=shared")

reorgDetector, err := New(clientL1.Client(), Config{DBPath: testDir, CheckReorgsInterval: cdktypes.NewDuration(time.Millisecond * 100)})
require.NoError(t, err)
Expand Down
10 changes: 8 additions & 2 deletions reorgdetector/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,14 @@ import (
)

type header struct {
Num uint64
Hash common.Hash
Num uint64 `meddler:"num"`
Hash common.Hash `meddler:"hash,hash"`
}

type headerWithSubscriberID struct {
SubscriberID string `meddler:"subscriber_id"`
Num uint64 `meddler:"num"`
Hash common.Hash `meddler:"hash,hash"`
}

// newHeader returns a new instance of header
Expand Down

0 comments on commit 5fd36b9

Please sign in to comment.