diff --git a/aggsender/aggsender.go b/aggsender/aggsender.go index e7a25c8b..240ff56f 100644 --- a/aggsender/aggsender.go +++ b/aggsender/aggsender.go @@ -11,7 +11,6 @@ import ( aggsendertypes "github.com/0xPolygon/cdk/aggsender/types" "github.com/0xPolygon/cdk/bridgesync" cdkcommon "github.com/0xPolygon/cdk/common" - "github.com/0xPolygon/cdk/config/types" "github.com/0xPolygon/cdk/etherman" "github.com/0xPolygon/cdk/l1infotreesync" "github.com/0xPolygon/cdk/log" @@ -48,7 +47,7 @@ type AggSender struct { storage db.AggSenderStorage aggLayerClient agglayer.AgglayerClientInterface - sendInterval types.Duration + cfg Config sequencerKey *ecdsa.PrivateKey } @@ -73,6 +72,7 @@ func New( } return &AggSender{ + cfg: cfg, log: logger, storage: storage, l2Syncer: l2Syncer, @@ -80,18 +80,18 @@ func New( aggLayerClient: aggLayerClient, l1infoTreeSyncer: l1InfoTreeSyncer, sequencerKey: sequencerPrivateKey, - sendInterval: cfg.CertificateSendInterval, }, nil } // Start starts the AggSender func (a *AggSender) Start(ctx context.Context) { - a.sendCertificates(ctx) + go a.sendCertificates(ctx) + go a.checkIfCertificatesAreSettled(ctx) } // sendCertificates sends certificates to the aggLayer func (a *AggSender) sendCertificates(ctx context.Context) { - ticker := time.NewTicker(a.sendInterval.Duration) + ticker := time.NewTicker(a.cfg.CertificateSendInterval.Duration) for { select { @@ -408,3 +408,38 @@ func (a *AggSender) signCertificate(certificate *agglayer.Certificate) (*agglaye Signature: sig, }, nil } + +// checkIfCertificatesAreSettled checks if certificates are settled +func (a *AggSender) checkIfCertificatesAreSettled(ctx context.Context) { + ticker := time.NewTicker(a.cfg.CertificateSendInterval.Duration) + for { + select { + case <-ticker.C: + pendingCertificates, err := a.storage.GetCertificatesByStatus(ctx, []agglayer.CertificateStatus{agglayer.Pending}) + if err != nil { + a.log.Error("error getting pending certificates: %w", err) + continue + } + + for _, certificate := range pendingCertificates { + certificateHeader, err := a.aggLayerClient.GetCertificateHeader(certificate.CertificateID) + if err != nil { + a.log.Error("error getting header of certificate %s with height: %d from agglayer: %w", + certificate.CertificateID, certificate.Height, err) + continue + } + + if certificateHeader.Status == agglayer.Settled || certificateHeader.Status == agglayer.InError { + certificate.Status = certificateHeader.Status + + if err := a.storage.UpdateCertificateStatus(ctx, *certificate); err != nil { + a.log.Error("error updating certificate status in storage: %w", err) + continue + } + } + } + case <-ctx.Done(): + return + } + } +} diff --git a/aggsender/config.go b/aggsender/config.go index 3f335975..ebdfec1e 100644 --- a/aggsender/config.go +++ b/aggsender/config.go @@ -9,6 +9,7 @@ type Config struct { DBPath string `mapstructure:"DBPath"` AggLayerURL string `mapstructure:"AggLayerURL"` CertificateSendInterval types.Duration `mapstructure:"CertificateSendInterval"` + CheckSettledInterval types.Duration `mapstructure:"CheckSettledInterval"` SequencerPrivateKey types.KeystoreFileConfig `mapstructure:"SequencerPrivateKey"` URLRPCL2 string `mapstructure:"URLRPCL2"` } diff --git a/aggsender/db/aggsender_db_storage.go b/aggsender/db/aggsender_db_storage.go index c19d6429..6d6dec25 100644 --- a/aggsender/db/aggsender_db_storage.go +++ b/aggsender/db/aggsender_db_storage.go @@ -6,7 +6,9 @@ import ( "errors" "fmt" "math" + "strings" + "github.com/0xPolygon/cdk/agglayer" "github.com/0xPolygon/cdk/aggsender/db/migrations" "github.com/0xPolygon/cdk/aggsender/types" "github.com/0xPolygon/cdk/db" @@ -25,6 +27,10 @@ type AggSenderStorage interface { SaveLastSentCertificate(ctx context.Context, certificate types.CertificateInfo) error // DeleteCertificate deletes a certificate from the storage DeleteCertificate(ctx context.Context, certificateID common.Hash) error + // GetCertificatesByStatus returns a list of certificates by their status + GetCertificatesByStatus(ctx context.Context, status []agglayer.CertificateStatus) ([]*types.CertificateInfo, error) + // UpdateCertificateStatus updates the status of a certificate + UpdateCertificateStatus(ctx context.Context, certificate types.CertificateInfo) error } var _ AggSenderStorage = (*AggSenderSQLStorage)(nil) @@ -52,6 +58,45 @@ func NewAggSenderSQLStorage(logger *log.Logger, dbPath string) (*AggSenderSQLSto }, nil } +func (a *AggSenderSQLStorage) GetCertificatesByStatus(ctx context.Context, + statuses []agglayer.CertificateStatus) ([]*types.CertificateInfo, error) { + tx, err := a.db.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) + if err != nil { + return nil, err + } + + defer func() { + if err := tx.Rollback(); err != nil { + a.logger.Warnf("error rolling back tx: %w", err) + } + }() + + query := "SELECT * FROM certificate_info" + args := make([]interface{}, len(statuses)) + + if len(statuses) > 0 { + placeholders := make([]string, len(statuses)) + // Build the WHERE clause for status filtering + for i := range statuses { + placeholders[i] = fmt.Sprintf("$%d", i+1) + args[i] = statuses[i] + } + + // Build the WHERE clause with the joined placeholders + query += " WHERE status IN (" + strings.Join(placeholders, ", ") + ")" + } + + // Add ordering by creation date (oldest first) + query += " ORDER BY height ASC" + + var certificates []*types.CertificateInfo + if err = meddler.QueryAll(a.db, &certificates, query, args...); err != nil { + return nil, err + } + + return certificates, nil +} + // GetCertificateByHeight returns a certificate by its height func (a *AggSenderSQLStorage) GetCertificateByHeight(ctx context.Context, height uint64) (types.CertificateInfo, error) { @@ -79,7 +124,7 @@ func (a *AggSenderSQLStorage) GetCertificateByHeight(ctx context.Context, return certificateInfo, nil } -// GetLastSentCertificate returns the last certificate sent to the aggLayer +// GetLastSentCertificate returns the last certificate sent to the aggLayer that is still Pending func (a *AggSenderSQLStorage) GetLastSentCertificate(ctx context.Context) (types.CertificateInfo, error) { tx, err := a.db.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) if err != nil { @@ -92,7 +137,8 @@ func (a *AggSenderSQLStorage) GetLastSentCertificate(ctx context.Context) (types } }() - rows, err := tx.Query(`SELECT * FROM certificate_info ORDER BY height DESC LIMIT 1;`) + rows, err := tx.Query(`SELECT * FROM certificate_info WHERE status = $1 ORDER BY height DESC LIMIT 1;`, + agglayer.Pending) if err != nil { return types.CertificateInfo{}, getSelectQueryError(math.MaxUint64, err) // force checking err not found } @@ -157,6 +203,33 @@ func (a *AggSenderSQLStorage) DeleteCertificate(ctx context.Context, certificate return nil } +// UpdateCertificateStatus updates the status of a certificate +func (a *AggSenderSQLStorage) UpdateCertificateStatus(ctx context.Context, certificate types.CertificateInfo) error { + tx, err := db.NewTx(ctx, a.db) + if err != nil { + return err + } + defer func() { + if err != nil { + if errRllbck := tx.Rollback(); errRllbck != nil { + a.logger.Errorf("error while rolling back tx %w", errRllbck) + } + } + }() + + if _, err := tx.Exec(`UPDATE certificate_info SET status = $1 WHERE certificate_id = $2;`, + certificate.Status, certificate.CertificateID); err != nil { + return fmt.Errorf("error updating certificate info: %w", err) + } + if err := tx.Commit(); err != nil { + return err + } + + a.logger.Debugf("updated certificate status - CertificateID: %s", certificate.CertificateID) + + return nil +} + // clean deletes all the data from the storage // NOTE: Used only in tests func (a *AggSenderSQLStorage) clean() error { diff --git a/aggsender/db/aggsender_db_storage_test.go b/aggsender/db/aggsender_db_storage_test.go index 7b646cb7..ee7ca1e2 100644 --- a/aggsender/db/aggsender_db_storage_test.go +++ b/aggsender/db/aggsender_db_storage_test.go @@ -74,7 +74,7 @@ func Test_Storage(t *testing.T) { NewLocalExitRoot: common.HexToHash("0x6"), FromBlock: 5, ToBlock: 6, - Status: agglayer.Settled, + Status: agglayer.Pending, } require.NoError(t, storage.SaveLastSentCertificate(ctx, certificate)) @@ -113,4 +113,68 @@ func Test_Storage(t *testing.T) { require.Equal(t, certificate, certificateFromDB) require.NoError(t, storage.clean()) }) + + t.Run("GetCertificatesByStatus", func(t *testing.T) { + // Insert some certificates with different statuses + certificates := []*types.CertificateInfo{ + { + Height: 7, + CertificateID: common.HexToHash("0x7"), + NewLocalExitRoot: common.HexToHash("0x8"), + FromBlock: 7, + ToBlock: 8, + Status: agglayer.Settled, + }, + { + Height: 9, + CertificateID: common.HexToHash("0x9"), + NewLocalExitRoot: common.HexToHash("0xA"), + FromBlock: 9, + ToBlock: 10, + Status: agglayer.Pending, + }, + { + Height: 11, + CertificateID: common.HexToHash("0xB"), + NewLocalExitRoot: common.HexToHash("0xC"), + FromBlock: 11, + ToBlock: 12, + Status: agglayer.InError, + }, + } + + for _, cert := range certificates { + require.NoError(t, storage.SaveLastSentCertificate(ctx, *cert)) + } + + // Test fetching certificates with status Settled + statuses := []agglayer.CertificateStatus{agglayer.Settled} + certificatesFromDB, err := storage.GetCertificatesByStatus(ctx, statuses) + require.NoError(t, err) + require.Len(t, certificatesFromDB, 1) + require.ElementsMatch(t, []*types.CertificateInfo{certificates[0]}, certificatesFromDB) + + // Test fetching certificates with status Pending + statuses = []agglayer.CertificateStatus{agglayer.Pending} + certificatesFromDB, err = storage.GetCertificatesByStatus(ctx, statuses) + require.NoError(t, err) + require.Len(t, certificatesFromDB, 1) + require.ElementsMatch(t, []*types.CertificateInfo{certificates[1]}, certificatesFromDB) + + // Test fetching certificates with status InError + statuses = []agglayer.CertificateStatus{agglayer.InError} + certificatesFromDB, err = storage.GetCertificatesByStatus(ctx, statuses) + require.NoError(t, err) + require.Len(t, certificatesFromDB, 1) + require.ElementsMatch(t, []*types.CertificateInfo{certificates[2]}, certificatesFromDB) + + // Test fetching certificates with status InError and Pending + statuses = []agglayer.CertificateStatus{agglayer.InError, agglayer.Pending} + certificatesFromDB, err = storage.GetCertificatesByStatus(ctx, statuses) + require.NoError(t, err) + require.Len(t, certificatesFromDB, 2) + require.ElementsMatch(t, []*types.CertificateInfo{certificates[1], certificates[2]}, certificatesFromDB) + + require.NoError(t, storage.clean()) + }) } diff --git a/config/default.go b/config/default.go index 98ea4e2a..599016f1 100644 --- a/config/default.go +++ b/config/default.go @@ -276,5 +276,5 @@ AggLayerURL = "http://zkevm-agglayer" SequencerPrivateKey = {Path = "/pk/sequencer.keystore", Password = "testonly"} CertificateSendInterval = "1m" URLRPCL2="http://test-aggoracle-l2:8545" - +CheckSettledInterval = "5s" `