diff --git a/go.mod b/go.mod index de2a306a..56d83a25 100644 --- a/go.mod +++ b/go.mod @@ -15,6 +15,7 @@ require ( github.com/google/uuid v1.6.0 github.com/hashicorp/go-plugin v1.6.3 github.com/mitchellh/mapstructure v1.5.1-0.20220423185008-bf980b35cac4 + github.com/mr-tron/base58 v1.2.0 github.com/patrickmn/go-cache v2.1.0+incompatible github.com/pelletier/go-toml/v2 v2.2.4 github.com/pkg/errors v0.9.1 @@ -100,7 +101,6 @@ require ( github.com/mattn/go-isatty v0.0.20 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect - github.com/mr-tron/base58 v1.2.0 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/oklog/run v1.1.0 // indirect github.com/pelletier/go-toml v1.9.5 // indirect diff --git a/relayer/chainreader/indexer/events_indexer.go b/relayer/chainreader/indexer/events_indexer.go index 1328990c..4c9be199 100644 --- a/relayer/chainreader/indexer/events_indexer.go +++ b/relayer/chainreader/indexer/events_indexer.go @@ -5,20 +5,19 @@ import ( "encoding/hex" "errors" "fmt" - "reflect" "strconv" "strings" "sync" "time" "github.com/block-vision/sui-go-sdk/models" + "github.com/mr-tron/base58" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" "github.com/smartcontractkit/chainlink-sui/relayer/chainreader/database" "github.com/smartcontractkit/chainlink-sui/relayer/client" - "github.com/smartcontractkit/chainlink-sui/relayer/codec" ) type EventsIndexer struct { @@ -204,30 +203,6 @@ func convertMapKeysToCamelCaseWithPath(input any, path string) any { return input } -func convertBytesToHex(input any) any { - kind := reflect.ValueOf(input).Kind() - - switch kind { - case reflect.Map: - result := make(map[string]any) - for k, v := range input.(map[string]any) { - result[k] = convertBytesToHex(v) - } - - return result - - case reflect.Slice: - bytes, err := codec.AnySliceToBytes(input.([]any)) - if err != nil { - return input - } - - return "0x" + hex.EncodeToString(bytes) - } - - return input -} - func (eIndexer *EventsIndexer) SyncEvent(ctx context.Context, selector *client.EventSelector) error { if selector == nil { return fmt.Errorf("unspecified selector for SyncEvent call") @@ -252,15 +227,31 @@ func (eIndexer *EventsIndexer) SyncEvent(ctx context.Context, selector *client.E cursor := eIndexer.lastProcessedCursors[eventHandle] eIndexer.cursorMutex.RUnlock() var totalCount uint64 - var err error + if cursor == nil { // attempt to get the latest event sync of the given type and use its data to construct a cursor - cursor, totalCount, err = eIndexer.db.GetLatestOffset(ctx, selector.Package, eventHandle) - if err != nil { - return err + dbOffsetCursor, dbTotalCount, offsetErr := eIndexer.db.GetLatestOffset(ctx, selector.Package, eventHandle) + if offsetErr != nil { + eIndexer.logger.Errorw("syncEvent: failed to get latest offset", "error", offsetErr) + return offsetErr } - eIndexer.logger.Debugw("syncEvent: starting fresh sync", "handle", eventHandle, "cursor", cursor) + if dbOffsetCursor != nil { + txDigestBytes, err := hex.DecodeString(strings.TrimPrefix(dbOffsetCursor.TxDigest, "0x")) + if err != nil { + eIndexer.logger.Errorw("syncEvent: failed to decode tx digest", "error", err) + return err + } + // convert the db offset cursor digest from hex (the format stored in the DB) to base58 (the format expected by the client) + cursor = &models.EventId{ + TxDigest: base58.Encode(txDigestBytes), + EventSeq: dbOffsetCursor.EventSeq, + } + + totalCount = dbTotalCount + } else { + eIndexer.logger.Debugw("syncEvent: starting fresh sync", "handle", eventHandle) + } } batchSize := uint(batchSizeRecords) @@ -331,15 +322,29 @@ eventLoop: // normalize the data, convert snake case to camel case normalizedData := convertMapKeysToCamelCase(event.ParsedJson) + // Convert the txDigest to hex + txDigestHex := event.Id.TxDigest + if base58Bytes, err := base58.Decode(txDigestHex); err == nil { + hexTxId := hex.EncodeToString(base58Bytes) + txDigestHex = "0x" + hexTxId + } + + blockHashBytes, err := base58.Decode(block.TxDigest) + if err != nil { + eIndexer.logger.Errorw("Failed to decode block hash", "error", err) + // fallback + blockHashBytes = []byte(block.TxDigest) + } + // Convert event to database record record := database.EventRecord{ EventAccountAddress: selector.Package, EventHandle: eventHandle, EventOffset: offset, - TxDigest: event.Id.TxDigest, + TxDigest: txDigestHex, BlockVersion: 0, BlockHeight: fmt.Sprintf("%d", block.Height), - BlockHash: []byte(block.TxDigest), + BlockHash: blockHashBytes, // Sui returns block.Timestamp in ms; convert to seconds for consistency with CCIP readers. BlockTimestamp: block.Timestamp / 1000, Data: normalizedData.(map[string]any), diff --git a/relayer/chainreader/indexer/events_indexer_test.go b/relayer/chainreader/indexer/events_indexer_test.go index 7282dba6..aae0bbed 100644 --- a/relayer/chainreader/indexer/events_indexer_test.go +++ b/relayer/chainreader/indexer/events_indexer_test.go @@ -520,6 +520,36 @@ func TestEventsIndexer(t *testing.T) { log.Infow("All concurrent access tests completed successfully") }) + t.Run("TestWithTimestamps", func(t *testing.T) { + log.Infow("Testing with timestamps") + + // Trigger some events + for i := 1; i <= 3; i++ { + createEvent(i) + } + + // Create a new event selector for timestamps + timestampEventSelector := &client.EventSelector{ + Package: packageId, + Module: "counter", + Event: "CounterIncremented", + } + + // Run sync to index events + err := indexer.SyncEvent(ctx, timestampEventSelector) + require.NoError(t, err) + + // Wait for events to be indexed + events := waitForEventCount(3, 60*time.Second) + require.GreaterOrEqual(t, len(events), 3) + + // Check that events are recorded with timestamps in seconds + for _, event := range events[:3] { + require.Greater(t, event.BlockTimestamp, uint64(0), "Event should have a timestamp") + require.Less(t, event.BlockTimestamp, uint64(time.Now().Unix()+1), "Event timestamp should be in the past") + } + }) + t.Run("TestRaceDetection", func(t *testing.T) { // Run with: go test -race -run TestEventsIndexer/TestRaceDetection log.Infow("Starting race detection test") diff --git a/relayer/chainreader/indexer/transactions_indexer.go b/relayer/chainreader/indexer/transactions_indexer.go index 82550acc..8411063a 100644 --- a/relayer/chainreader/indexer/transactions_indexer.go +++ b/relayer/chainreader/indexer/transactions_indexer.go @@ -2,6 +2,7 @@ package indexer import ( "context" + "encoding/base64" "encoding/hex" "errors" "fmt" @@ -449,13 +450,27 @@ func (tIndexer *TransactionsIndexer) syncTransmitterTransactions(ctx context.Con continue } + // Convert the txDigest to hex + txDigestHex := transactionRecord.Digest + if base64Bytes, err := base64.StdEncoding.DecodeString(txDigestHex); err == nil { + hexTxId := hex.EncodeToString(base64Bytes) + txDigestHex = "0x" + hexTxId + } + + blockHashBytes, err := base64.StdEncoding.DecodeString(checkpointResponse.Digest) + if err != nil { + tIndexer.logger.Errorw("Failed to decode block hash", "error", err) + // fallback + blockHashBytes = []byte(checkpointResponse.Digest) + } + record := database.EventRecord{ EventAccountAddress: eventAccountAddress, EventHandle: eventHandle, EventOffset: 0, - TxDigest: transactionRecord.Digest, + TxDigest: txDigestHex, BlockHeight: checkpointResponse.SequenceNumber, - BlockHash: []byte(checkpointResponse.Digest), + BlockHash: blockHashBytes, BlockTimestamp: blockTimestamp, Data: executionStateChanged, }