Skip to content

Commit

Permalink
feat: track certificates settlement
Browse files Browse the repository at this point in the history
  • Loading branch information
goran-ethernal committed Oct 7, 2024
1 parent 435fdf8 commit 8c36c4b
Show file tree
Hide file tree
Showing 5 changed files with 182 additions and 9 deletions.
45 changes: 40 additions & 5 deletions aggsender/aggsender.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
aggsendertypes "github.com/0xPolygon/cdk/aggsender/types"
"github.com/0xPolygon/cdk/bridgesync"
cdkcommon "github.com/0xPolygon/cdk/common"
"github.com/0xPolygon/cdk/config/types"
"github.com/0xPolygon/cdk/etherman"
"github.com/0xPolygon/cdk/l1infotreesync"
"github.com/0xPolygon/cdk/log"
Expand Down Expand Up @@ -48,7 +47,7 @@ type AggSender struct {
storage db.AggSenderStorage
aggLayerClient agglayer.AgglayerClientInterface

sendInterval types.Duration
cfg Config

sequencerKey *ecdsa.PrivateKey
}
Expand All @@ -73,25 +72,26 @@ func New(
}

return &AggSender{
cfg: cfg,
log: logger,
storage: storage,
l2Syncer: l2Syncer,
l2Client: l2Client,
aggLayerClient: aggLayerClient,
l1infoTreeSyncer: l1InfoTreeSyncer,
sequencerKey: sequencerPrivateKey,
sendInterval: cfg.CertificateSendInterval,
}, nil
}

// Start starts the AggSender
func (a *AggSender) Start(ctx context.Context) {
a.sendCertificates(ctx)
go a.sendCertificates(ctx)
go a.checkIfCertificatesAreSettled(ctx)
}

// sendCertificates sends certificates to the aggLayer
func (a *AggSender) sendCertificates(ctx context.Context) {
ticker := time.NewTicker(a.sendInterval.Duration)
ticker := time.NewTicker(a.cfg.CertificateSendInterval.Duration)

for {
select {
Expand Down Expand Up @@ -408,3 +408,38 @@ func (a *AggSender) signCertificate(certificate *agglayer.Certificate) (*agglaye
Signature: sig,
}, nil
}

// checkIfCertificatesAreSettled checks if certificates are settled
func (a *AggSender) checkIfCertificatesAreSettled(ctx context.Context) {
ticker := time.NewTicker(a.cfg.CertificateSendInterval.Duration)
for {
select {
case <-ticker.C:
pendingCertificates, err := a.storage.GetCertificatesByStatus(ctx, []agglayer.CertificateStatus{agglayer.Pending})
if err != nil {
a.log.Error("error getting pending certificates: %w", err)
continue
}

for _, certificate := range pendingCertificates {
certificateHeader, err := a.aggLayerClient.GetCertificateHeader(certificate.CertificateID)
if err != nil {
a.log.Error("error getting header of certificate %s with height: %d from agglayer: %w",
certificate.CertificateID, certificate.Height, err)
continue
}

if certificateHeader.Status == agglayer.Settled || certificateHeader.Status == agglayer.InError {
certificate.Status = certificateHeader.Status

if err := a.storage.UpdateCertificateStatus(ctx, *certificate); err != nil {
a.log.Error("error updating certificate status in storage: %w", err)
continue
}
}
}
case <-ctx.Done():
return
}
}
}
1 change: 1 addition & 0 deletions aggsender/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ type Config struct {
DBPath string `mapstructure:"DBPath"`
AggLayerURL string `mapstructure:"AggLayerURL"`
CertificateSendInterval types.Duration `mapstructure:"CertificateSendInterval"`
CheckSettledInterval types.Duration `mapstructure:"CheckSettledInterval"`
SequencerPrivateKey types.KeystoreFileConfig `mapstructure:"SequencerPrivateKey"`
URLRPCL2 string `mapstructure:"URLRPCL2"`
}
77 changes: 75 additions & 2 deletions aggsender/db/aggsender_db_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (
"errors"
"fmt"
"math"
"strings"

"github.com/0xPolygon/cdk/agglayer"
"github.com/0xPolygon/cdk/aggsender/db/migrations"
"github.com/0xPolygon/cdk/aggsender/types"
"github.com/0xPolygon/cdk/db"
Expand All @@ -25,6 +27,10 @@ type AggSenderStorage interface {
SaveLastSentCertificate(ctx context.Context, certificate types.CertificateInfo) error
// DeleteCertificate deletes a certificate from the storage
DeleteCertificate(ctx context.Context, certificateID common.Hash) error
// GetCertificatesByStatus returns a list of certificates by their status
GetCertificatesByStatus(ctx context.Context, status []agglayer.CertificateStatus) ([]*types.CertificateInfo, error)
// UpdateCertificateStatus updates the status of a certificate
UpdateCertificateStatus(ctx context.Context, certificate types.CertificateInfo) error
}

var _ AggSenderStorage = (*AggSenderSQLStorage)(nil)
Expand Down Expand Up @@ -52,6 +58,45 @@ func NewAggSenderSQLStorage(logger *log.Logger, dbPath string) (*AggSenderSQLSto
}, nil
}

func (a *AggSenderSQLStorage) GetCertificatesByStatus(ctx context.Context,
statuses []agglayer.CertificateStatus) ([]*types.CertificateInfo, error) {
tx, err := a.db.BeginTx(ctx, &sql.TxOptions{ReadOnly: true})
if err != nil {
return nil, err
}

defer func() {
if err := tx.Rollback(); err != nil {
a.logger.Warnf("error rolling back tx: %w", err)
}
}()

query := "SELECT * FROM certificate_info"
args := make([]interface{}, len(statuses))

if len(statuses) > 0 {
placeholders := make([]string, len(statuses))
// Build the WHERE clause for status filtering
for i := range statuses {
placeholders[i] = fmt.Sprintf("$%d", i+1)
args[i] = statuses[i]
}

// Build the WHERE clause with the joined placeholders
query += " WHERE status IN (" + strings.Join(placeholders, ", ") + ")"
}

// Add ordering by creation date (oldest first)
query += " ORDER BY height ASC"

var certificates []*types.CertificateInfo
if err = meddler.QueryAll(a.db, &certificates, query, args...); err != nil {
return nil, err
}

return certificates, nil
}

// GetCertificateByHeight returns a certificate by its height
func (a *AggSenderSQLStorage) GetCertificateByHeight(ctx context.Context,
height uint64) (types.CertificateInfo, error) {
Expand Down Expand Up @@ -79,7 +124,7 @@ func (a *AggSenderSQLStorage) GetCertificateByHeight(ctx context.Context,
return certificateInfo, nil
}

// GetLastSentCertificate returns the last certificate sent to the aggLayer
// GetLastSentCertificate returns the last certificate sent to the aggLayer that is still Pending
func (a *AggSenderSQLStorage) GetLastSentCertificate(ctx context.Context) (types.CertificateInfo, error) {
tx, err := a.db.BeginTx(ctx, &sql.TxOptions{ReadOnly: true})
if err != nil {
Expand All @@ -92,7 +137,8 @@ func (a *AggSenderSQLStorage) GetLastSentCertificate(ctx context.Context) (types
}
}()

rows, err := tx.Query(`SELECT * FROM certificate_info ORDER BY height DESC LIMIT 1;`)
rows, err := tx.Query(`SELECT * FROM certificate_info WHERE status = $1 ORDER BY height DESC LIMIT 1;`,
agglayer.Pending)
if err != nil {
return types.CertificateInfo{}, getSelectQueryError(math.MaxUint64, err) // force checking err not found
}
Expand Down Expand Up @@ -157,6 +203,33 @@ func (a *AggSenderSQLStorage) DeleteCertificate(ctx context.Context, certificate
return nil
}

// UpdateCertificateStatus updates the status of a certificate
func (a *AggSenderSQLStorage) UpdateCertificateStatus(ctx context.Context, certificate types.CertificateInfo) error {
tx, err := db.NewTx(ctx, a.db)
if err != nil {
return err
}
defer func() {
if err != nil {
if errRllbck := tx.Rollback(); errRllbck != nil {
a.logger.Errorf("error while rolling back tx %w", errRllbck)
}
}
}()

if _, err := tx.Exec(`UPDATE certificate_info SET status = $1 WHERE certificate_id = $2;`,
certificate.Status, certificate.CertificateID); err != nil {
return fmt.Errorf("error updating certificate info: %w", err)
}
if err := tx.Commit(); err != nil {
return err
}

a.logger.Debugf("updated certificate status - CertificateID: %s", certificate.CertificateID)

return nil
}

// clean deletes all the data from the storage
// NOTE: Used only in tests
func (a *AggSenderSQLStorage) clean() error {
Expand Down
66 changes: 65 additions & 1 deletion aggsender/db/aggsender_db_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func Test_Storage(t *testing.T) {
NewLocalExitRoot: common.HexToHash("0x6"),
FromBlock: 5,
ToBlock: 6,
Status: agglayer.Settled,
Status: agglayer.Pending,
}
require.NoError(t, storage.SaveLastSentCertificate(ctx, certificate))

Expand Down Expand Up @@ -113,4 +113,68 @@ func Test_Storage(t *testing.T) {
require.Equal(t, certificate, certificateFromDB)
require.NoError(t, storage.clean())
})

t.Run("GetCertificatesByStatus", func(t *testing.T) {
// Insert some certificates with different statuses
certificates := []*types.CertificateInfo{
{
Height: 7,
CertificateID: common.HexToHash("0x7"),
NewLocalExitRoot: common.HexToHash("0x8"),
FromBlock: 7,
ToBlock: 8,
Status: agglayer.Settled,
},
{
Height: 9,
CertificateID: common.HexToHash("0x9"),
NewLocalExitRoot: common.HexToHash("0xA"),
FromBlock: 9,
ToBlock: 10,
Status: agglayer.Pending,
},
{
Height: 11,
CertificateID: common.HexToHash("0xB"),
NewLocalExitRoot: common.HexToHash("0xC"),
FromBlock: 11,
ToBlock: 12,
Status: agglayer.InError,
},
}

for _, cert := range certificates {
require.NoError(t, storage.SaveLastSentCertificate(ctx, *cert))
}

// Test fetching certificates with status Settled
statuses := []agglayer.CertificateStatus{agglayer.Settled}
certificatesFromDB, err := storage.GetCertificatesByStatus(ctx, statuses)
require.NoError(t, err)
require.Len(t, certificatesFromDB, 1)
require.ElementsMatch(t, []*types.CertificateInfo{certificates[0]}, certificatesFromDB)

// Test fetching certificates with status Pending
statuses = []agglayer.CertificateStatus{agglayer.Pending}
certificatesFromDB, err = storage.GetCertificatesByStatus(ctx, statuses)
require.NoError(t, err)
require.Len(t, certificatesFromDB, 1)
require.ElementsMatch(t, []*types.CertificateInfo{certificates[1]}, certificatesFromDB)

// Test fetching certificates with status InError
statuses = []agglayer.CertificateStatus{agglayer.InError}
certificatesFromDB, err = storage.GetCertificatesByStatus(ctx, statuses)
require.NoError(t, err)
require.Len(t, certificatesFromDB, 1)
require.ElementsMatch(t, []*types.CertificateInfo{certificates[2]}, certificatesFromDB)

// Test fetching certificates with status InError and Pending
statuses = []agglayer.CertificateStatus{agglayer.InError, agglayer.Pending}
certificatesFromDB, err = storage.GetCertificatesByStatus(ctx, statuses)
require.NoError(t, err)
require.Len(t, certificatesFromDB, 2)
require.ElementsMatch(t, []*types.CertificateInfo{certificates[1], certificates[2]}, certificatesFromDB)

require.NoError(t, storage.clean())
})
}
2 changes: 1 addition & 1 deletion config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,5 +276,5 @@ AggLayerURL = "http://zkevm-agglayer"
SequencerPrivateKey = {Path = "/pk/sequencer.keystore", Password = "testonly"}
CertificateSendInterval = "1m"
URLRPCL2="http://test-aggoracle-l2:8545"
CheckSettledInterval = "5s"
`

0 comments on commit 8c36c4b

Please sign in to comment.