Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/google/uuid v1.6.0
github.com/hashicorp/go-plugin v1.6.3
github.com/mitchellh/mapstructure v1.5.1-0.20220423185008-bf980b35cac4
github.com/mr-tron/base58 v1.2.0
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/pelletier/go-toml/v2 v2.2.4
github.com/pkg/errors v0.9.1
Expand Down Expand Up @@ -100,7 +101,6 @@ require (
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mr-tron/base58 v1.2.0 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/oklog/run v1.1.0 // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
Expand Down
71 changes: 38 additions & 33 deletions relayer/chainreader/indexer/events_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,19 @@ import (
"encoding/hex"
"errors"
"fmt"
"reflect"
"strconv"
"strings"
"sync"
"time"

"github.com/block-vision/sui-go-sdk/models"
"github.com/mr-tron/base58"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"

"github.com/smartcontractkit/chainlink-sui/relayer/chainreader/database"
"github.com/smartcontractkit/chainlink-sui/relayer/client"
"github.com/smartcontractkit/chainlink-sui/relayer/codec"
)

type EventsIndexer struct {
Expand Down Expand Up @@ -204,30 +203,6 @@ func convertMapKeysToCamelCaseWithPath(input any, path string) any {
return input
}

func convertBytesToHex(input any) any {
kind := reflect.ValueOf(input).Kind()

switch kind {
case reflect.Map:
result := make(map[string]any)
for k, v := range input.(map[string]any) {
result[k] = convertBytesToHex(v)
}

return result

case reflect.Slice:
bytes, err := codec.AnySliceToBytes(input.([]any))
if err != nil {
return input
}

return "0x" + hex.EncodeToString(bytes)
}

return input
}

func (eIndexer *EventsIndexer) SyncEvent(ctx context.Context, selector *client.EventSelector) error {
if selector == nil {
return fmt.Errorf("unspecified selector for SyncEvent call")
Expand All @@ -252,15 +227,31 @@ func (eIndexer *EventsIndexer) SyncEvent(ctx context.Context, selector *client.E
cursor := eIndexer.lastProcessedCursors[eventHandle]
eIndexer.cursorMutex.RUnlock()
var totalCount uint64
var err error

if cursor == nil {
// attempt to get the latest event sync of the given type and use its data to construct a cursor
cursor, totalCount, err = eIndexer.db.GetLatestOffset(ctx, selector.Package, eventHandle)
if err != nil {
return err
dbOffsetCursor, dbTotalCount, offsetErr := eIndexer.db.GetLatestOffset(ctx, selector.Package, eventHandle)
if offsetErr != nil {
eIndexer.logger.Errorw("syncEvent: failed to get latest offset", "error", offsetErr)
return offsetErr
}

eIndexer.logger.Debugw("syncEvent: starting fresh sync", "handle", eventHandle, "cursor", cursor)
if dbOffsetCursor != nil {
txDigestBytes, err := hex.DecodeString(strings.TrimPrefix(dbOffsetCursor.TxDigest, "0x"))
if err != nil {
eIndexer.logger.Errorw("syncEvent: failed to decode tx digest", "error", err)
return err
}
// convert the db offset cursor digest from hex (the format stored in the DB) to base58 (the format expected by the client)
cursor = &models.EventId{
TxDigest: base58.Encode(txDigestBytes),
EventSeq: dbOffsetCursor.EventSeq,
}

totalCount = dbTotalCount
} else {
eIndexer.logger.Debugw("syncEvent: starting fresh sync", "handle", eventHandle)
}
}

batchSize := uint(batchSizeRecords)
Expand Down Expand Up @@ -331,15 +322,29 @@ eventLoop:
// normalize the data, convert snake case to camel case
normalizedData := convertMapKeysToCamelCase(event.ParsedJson)

// Convert the txDigest to hex
txDigestHex := event.Id.TxDigest
if base58Bytes, err := base58.Decode(txDigestHex); err == nil {
hexTxId := hex.EncodeToString(base58Bytes)
txDigestHex = "0x" + hexTxId
}

blockHashBytes, err := base58.Decode(block.TxDigest)
if err != nil {
eIndexer.logger.Errorw("Failed to decode block hash", "error", err)
// fallback
blockHashBytes = []byte(block.TxDigest)
}

// Convert event to database record
record := database.EventRecord{
EventAccountAddress: selector.Package,
EventHandle: eventHandle,
EventOffset: offset,
TxDigest: event.Id.TxDigest,
TxDigest: txDigestHex,
BlockVersion: 0,
BlockHeight: fmt.Sprintf("%d", block.Height),
BlockHash: []byte(block.TxDigest),
BlockHash: blockHashBytes,
// Sui returns block.Timestamp in ms; convert to seconds for consistency with CCIP readers.
BlockTimestamp: block.Timestamp / 1000,
Data: normalizedData.(map[string]any),
Expand Down
30 changes: 30 additions & 0 deletions relayer/chainreader/indexer/events_indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,36 @@ func TestEventsIndexer(t *testing.T) {
log.Infow("All concurrent access tests completed successfully")
})

t.Run("TestWithTimestamps", func(t *testing.T) {
log.Infow("Testing with timestamps")

// Trigger some events
for i := 1; i <= 3; i++ {
createEvent(i)
}

// Create a new event selector for timestamps
timestampEventSelector := &client.EventSelector{
Package: packageId,
Module: "counter",
Event: "CounterIncremented",
}

// Run sync to index events
err := indexer.SyncEvent(ctx, timestampEventSelector)
require.NoError(t, err)

// Wait for events to be indexed
events := waitForEventCount(3, 60*time.Second)
require.GreaterOrEqual(t, len(events), 3)

// Check that events are recorded with timestamps in seconds
for _, event := range events[:3] {
require.Greater(t, event.BlockTimestamp, uint64(0), "Event should have a timestamp")
require.Less(t, event.BlockTimestamp, uint64(time.Now().Unix()+1), "Event timestamp should be in the past")
}
})

t.Run("TestRaceDetection", func(t *testing.T) {
// Run with: go test -race -run TestEventsIndexer/TestRaceDetection
log.Infow("Starting race detection test")
Expand Down
19 changes: 17 additions & 2 deletions relayer/chainreader/indexer/transactions_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package indexer

import (
"context"
"encoding/base64"
"encoding/hex"
"errors"
"fmt"
Expand Down Expand Up @@ -449,13 +450,27 @@ func (tIndexer *TransactionsIndexer) syncTransmitterTransactions(ctx context.Con
continue
}

// Convert the txDigest to hex
txDigestHex := transactionRecord.Digest
if base64Bytes, err := base64.StdEncoding.DecodeString(txDigestHex); err == nil {
hexTxId := hex.EncodeToString(base64Bytes)
txDigestHex = "0x" + hexTxId
}

blockHashBytes, err := base64.StdEncoding.DecodeString(checkpointResponse.Digest)
if err != nil {
tIndexer.logger.Errorw("Failed to decode block hash", "error", err)
// fallback
blockHashBytes = []byte(checkpointResponse.Digest)
}

record := database.EventRecord{
EventAccountAddress: eventAccountAddress,
EventHandle: eventHandle,
EventOffset: 0,
TxDigest: transactionRecord.Digest,
TxDigest: txDigestHex,
BlockHeight: checkpointResponse.SequenceNumber,
BlockHash: []byte(checkpointResponse.Digest),
BlockHash: blockHashBytes,
BlockTimestamp: blockTimestamp,
Data: executionStateChanged,
}
Expand Down
Loading