From bd4e1548d8ba2dc8a0c4379a3ffc7467107c6a13 Mon Sep 17 00:00:00 2001 From: weiihann Date: Thu, 10 Oct 2024 15:46:07 +0800 Subject: [PATCH] starknet_subscribeTransactionStatus websocket method --- ...3300000000444400000000555500000000fff.json | 13 + docs/docs/faq.md | 2 +- docs/docs/websocket.md | 64 +++- jsonrpc/server.go | 8 +- rpc/events.go | 116 ++++++- rpc/events_test.go | 309 ++++++++++++++++++ rpc/handlers.go | 17 +- rpc/transaction.go | 5 +- sync/sync_test.go | 100 ------ utils/nil.go | 7 + 10 files changed, 519 insertions(+), 122 deletions(-) create mode 100644 clients/feeder/testdata/mainnet/transaction/0x111100000000222200000000333300000000444400000000555500000000fff.json create mode 100644 utils/nil.go diff --git a/clients/feeder/testdata/mainnet/transaction/0x111100000000222200000000333300000000444400000000555500000000fff.json b/clients/feeder/testdata/mainnet/transaction/0x111100000000222200000000333300000000444400000000555500000000fff.json new file mode 100644 index 0000000000..5bd7c674c9 --- /dev/null +++ b/clients/feeder/testdata/mainnet/transaction/0x111100000000222200000000333300000000444400000000555500000000fff.json @@ -0,0 +1,13 @@ +{ + "revert_error": "This is hand-made transaction used for txStatus endpoint test", + "execution_status": "REJECTED", + "finality_status": "ACCEPTED_ON_L1", + "status": "REVERTED", + "block_hash": "0x111100000000111100000000333300000000444400000000111100000000111", + "block_number": 304740, + "transaction_index": 1, + "transaction_hash": "0x111100000000222200000000333300000000444400000000555500000000fff", + "l2_to_l1_messages": [], + "events": [], + "actual_fee": "0x247aff6e224" +} diff --git a/docs/docs/faq.md b/docs/docs/faq.md index 78828677ce..1ae6c6a56f 100644 --- a/docs/docs/faq.md +++ b/docs/docs/faq.md @@ -74,7 +74,7 @@ docker logs -f juno
How can I get real-time updates of new blocks? -The [WebSocket](websocket#subscribe-to-newly-created-blocks) interface provides a `juno_subscribeNewHeads` method that emits an event when new blocks are added to the blockchain. +The [WebSocket](websocket#subscribe-to-newly-created-blocks) interface provides a `starknet_subscribeNewHeads` method that emits an event when new blocks are added to the blockchain.
diff --git a/docs/docs/websocket.md b/docs/docs/websocket.md index ba55e24db8..8c8282cfbc 100644 --- a/docs/docs/websocket.md +++ b/docs/docs/websocket.md @@ -96,7 +96,7 @@ Get the most recent accepted block hash and number with the `starknet_blockHashA ## Subscribe to newly created blocks -The WebSocket server provides a `juno_subscribeNewHeads` method that emits an event when new blocks are added to the blockchain: +The WebSocket server provides a `starknet_subscribeNewHeads` method that emits an event when new blocks are added to the blockchain: @@ -104,8 +104,7 @@ The WebSocket server provides a `juno_subscribeNewHeads` method that emits an ev ```json { "jsonrpc": "2.0", - "method": "juno_subscribeNewHeads", - "params": [], + "method": "starknet_subscribeNewHeads", "id": 1 } ``` @@ -129,7 +128,7 @@ When a new block is added, you will receive a message like this: ```json { "jsonrpc": "2.0", - "method": "juno_subscribeNewHeads", + "method": "starknet_subscriptionNewHeads", "params": { "result": { "block_hash": "0x840660a07a17ae6a55d39fb6d366698ecda11e02280ca3e9ca4b4f1bad741c", @@ -149,12 +148,65 @@ When a new block is added, you will receive a message like this: "l1_da_mode": "BLOB", "starknet_version": "0.13.1.1" }, - "subscription": 16570962336122680234 + "subscription_id": 16570962336122680234 + } +} +``` + +## Subscribe to transaction status changes + +The WebSocket server provides a `starknet_subscribeTransactionStatus` method that emits an event when a transaction status changes: + + + + +```json +{ + "jsonrpc": "2.0", + "method": "starknet_subscribeTransactionStatus", + "params": [ + { + "transaction_hash": "0x631333277e88053336d8c302630b4420dc3ff24018a1c464da37d5e36ea19df" + } + ], + "id": 1 +} +``` + + + + +```json +{ + "jsonrpc": "2.0", + "result": 16570962336122680234, + "id": 1 +} +``` + + + + +When a transaction get a new status, you will receive a message like this: + +```json +{ + "jsonrpc": "2.0", + "method": "starknet_subscriptionTransactionsStatus", + "params": { + "result": { + "transaction_hash": "0x631333277e88053336d8c302630b4420dc3ff24018a1c464da37d5e36ea19df", + "status": { + "finality_status": "ACCEPTED_ON_L2", + "execution_status": "SUCCEEDED" + } + }, + "subscription_id": 16570962336122680234 } } ``` -## Unsubscribe from newly created blocks +## Unsubscribe from previous subscription Use the `juno_unsubscribe` method with the `result` value from the subscription response or the `subscription` field from any new block event to stop receiving updates for new blocks: diff --git a/jsonrpc/server.go b/jsonrpc/server.go index c63f15c849..16f42f6027 100644 --- a/jsonrpc/server.go +++ b/jsonrpc/server.go @@ -422,10 +422,6 @@ func isBatch(reader *bufio.Reader) bool { return false } -func isNil(i any) bool { - return i == nil || reflect.ValueOf(i).IsNil() -} - func (s *Server) handleRequest(ctx context.Context, req *Request) (*response, http.Header, error) { s.log.Tracew("Received request", "req", req) @@ -471,7 +467,7 @@ func (s *Server) handleRequest(ctx context.Context, req *Request) (*response, ht header = (tuple[1].Interface()).(http.Header) } - if errAny := tuple[errorIndex].Interface(); !isNil(errAny) { + if errAny := tuple[errorIndex].Interface(); !utils.IsNil(errAny) { res.Error = errAny.(*Error) if res.Error.Code == InternalError { s.listener.OnRequestFailed(req.Method, res.Error) @@ -498,7 +494,7 @@ func (s *Server) buildArguments(ctx context.Context, params any, method Method) addContext = 1 } - if isNil(params) { + if utils.IsNil(params) { allParamsAreOptional := utils.All(method.Params, func(p Parameter) bool { return p.Optional }) diff --git a/rpc/events.go b/rpc/events.go index a7298486f8..a8e073b55c 100644 --- a/rpc/events.go +++ b/rpc/events.go @@ -14,6 +14,10 @@ type EventsArg struct { ResultPageRequest } +type SubscriptionID struct { + ID uint64 `json:"subscription_id"` +} + type EventFilter struct { FromBlock *BlockID `json:"from_block"` ToBlock *BlockID `json:"to_block"` @@ -44,10 +48,6 @@ type EventsChunk struct { ContinuationToken string `json:"continuation_token,omitempty"` } -type SubscriptionID struct { - ID uint64 `json:"subscription_id"` -} - /**************************************************** Events Handlers *****************************************************/ @@ -100,6 +100,87 @@ func (h *Handler) SubscribeNewHeads(ctx context.Context) (uint64, *jsonrpc.Error return id, nil } +// SubscribeTxnStatus subscribes to status changes of a transaction. It checks for updates each time a new block is added. +// Subsequent updates are sent only when the transaction status changes. +// The optional block_id parameter is ignored, as status changes are not stored and historical data cannot be sent. +func (h *Handler) SubscribeTxnStatus(ctx context.Context, txHash felt.Felt, _ *BlockID) (*SubscriptionID, *jsonrpc.Error) { + var ( + lastKnownStatus, lastSendStatus *TransactionStatus + wrapResult = func(s *TransactionStatus) *NewTransactionStatus { + return &NewTransactionStatus{ + TransactionHash: &txHash, + Status: s, + } + } + ) + + w, ok := jsonrpc.ConnFromContext(ctx) + if !ok { + return nil, jsonrpc.Err(jsonrpc.MethodNotFound, nil) + } + + id := h.idgen() + subscriptionCtx, subscriptionCtxCancel := context.WithCancel(ctx) + sub := &subscription{ + cancel: subscriptionCtxCancel, + conn: w, + } + + lastKnownStatus, rpcErr := h.TransactionStatus(subscriptionCtx, txHash) + if rpcErr != nil { + h.log.Errorw("Failed to get Tx status", "txHash", &txHash, "rpcErr", rpcErr) + return nil, rpcErr + } + + h.mu.Lock() + h.subscriptions[id] = sub + h.mu.Unlock() + + statusSub := h.txnStatus.Subscribe() + headerSub := h.newHeads.Subscribe() + sub.wg.Go(func() { + defer func() { + h.unsubscribe(sub, id) + statusSub.Unsubscribe() + headerSub.Unsubscribe() + }() + + if err := h.sendTxnStatus(sub.conn, wrapResult(lastKnownStatus), id); err != nil { + h.log.Errorw("Error while sending Txn status", "txHash", txHash, "err", err) + return + } + lastSendStatus = lastKnownStatus + + for { + select { + case <-subscriptionCtx.Done(): + return + case <-headerSub.Recv(): + lastKnownStatus, rpcErr = h.TransactionStatus(subscriptionCtx, txHash) + if rpcErr != nil { + h.log.Errorw("Failed to get Tx status", "txHash", txHash, "rpcErr", rpcErr) + return + } + + if *lastKnownStatus != *lastSendStatus { + if err := h.sendTxnStatus(sub.conn, wrapResult(lastKnownStatus), id); err != nil { + h.log.Errorw("Error while sending Txn status", "txHash", txHash, "err", err) + return + } + lastSendStatus = lastKnownStatus + } + + // Stop when final status reached and notified + if isFinal(lastSendStatus) { + return + } + } + } + }) + + return &SubscriptionID{ID: id}, nil +} + func (h *Handler) Unsubscribe(ctx context.Context, id uint64) (bool, *jsonrpc.Error) { w, ok := jsonrpc.ConnFromContext(ctx) if !ok { @@ -217,3 +298,30 @@ func setEventFilterRange(filter blockchain.EventFilterer, fromID, toID *BlockID, } return set(blockchain.EventFilterTo, toID) } + +type NewTransactionStatus struct { + TransactionHash *felt.Felt `json:"transaction_hash"` + Status *TransactionStatus `json:"status"` +} + +// sendHeader creates a request and sends it to the client +func (h *Handler) sendTxnStatus(w jsonrpc.Conn, status *NewTransactionStatus, id uint64) error { + resp, err := json.Marshal(jsonrpc.Request{ + Version: "2.0", + Method: "starknet_subscriptionTransactionsStatus", + Params: map[string]any{ + "subscription_id": id, + "result": status, + }, + }) + if err != nil { + return err + } + h.log.Debugw("Sending Txn status", "status", string(resp)) + _, err = w.Write(resp) + return err +} + +func isFinal(status *TransactionStatus) bool { + return status.Finality == TxnStatusRejected || status.Finality == TxnStatusAcceptedOnL1 +} diff --git a/rpc/events_test.go b/rpc/events_test.go index c2f1417791..f0958aac41 100644 --- a/rpc/events_test.go +++ b/rpc/events_test.go @@ -13,8 +13,11 @@ import ( "github.com/NethermindEth/juno/clients/feeder" "github.com/NethermindEth/juno/core" "github.com/NethermindEth/juno/core/felt" + "github.com/NethermindEth/juno/db" "github.com/NethermindEth/juno/db/pebble" + "github.com/NethermindEth/juno/feed" "github.com/NethermindEth/juno/jsonrpc" + "github.com/NethermindEth/juno/mocks" "github.com/NethermindEth/juno/rpc" adaptfeeder "github.com/NethermindEth/juno/starknetdata/feeder" "github.com/NethermindEth/juno/sync" @@ -22,6 +25,18 @@ import ( "github.com/coder/websocket" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" +) + +const ( + unsubscribeMsg = `{"jsonrpc":"2.0","id":1,"method":"juno_unsubscribe","params":[%d]}` + unsubscribeNotFoundResponse = `{"jsonrpc":"2.0","error":{"code":100,"message":"Subscription not found"},"id":1}` + subscribeResponse = `{"jsonrpc":"2.0","result":{"subscription_id":%d},"id":1}` + subscribeTxStatus = `{"jsonrpc":"2.0","id":1,"method":"starknet_subscribeTransactionStatus","params":{"transaction_hash":"%s"}}` + txStatusNotFoundResponse = `{"jsonrpc":"2.0","error":{"code":29,"message":"Transaction hash not found"},"id":1}` + txStatusResponse = `{"jsonrpc":"2.0","method":"starknet_subscriptionTransactionsStatus","params":{"result":{"transaction_hash":"%s","status":{%s}},"subscription_id":%d}}` + txStatusStatusBothStatuses = `"finality_status":"%s","execution_status":"%s"` + txStatusStatusRejected = `"finality_status":"%s","failure_reason":"%s"` ) func TestEvents(t *testing.T) { @@ -231,6 +246,42 @@ func (fc *fakeConn) Equal(other jsonrpc.Conn) bool { return fc.w == fc2.w } +type fakeSyncer struct { + newHeads *feed.Feed[*core.Header] + pendingTxs *feed.Feed[[]core.Transaction] +} + +func newFakeSyncer() *fakeSyncer { + return &fakeSyncer{ + newHeads: feed.New[*core.Header](), + pendingTxs: feed.New[[]core.Transaction](), + } +} + +func (fs *fakeSyncer) SubscribeNewHeads() sync.HeaderSubscription { + return sync.HeaderSubscription{Subscription: fs.newHeads.Subscribe()} +} + +func (fs *fakeSyncer) StartingBlockNumber() (uint64, error) { + return 0, nil +} + +func (fs *fakeSyncer) HighestBlockHeader() *core.Header { + return nil +} + +func (fs *fakeSyncer) Pending() (*sync.Pending, error) { + return nil, fmt.Errorf("not implemented") +} + +func (fs *fakeSyncer) PendingBlock() *core.Block { + return nil +} + +func (fs *fakeSyncer) PendingState() (core.StateReader, func() error, error) { + return nil, nil, fmt.Errorf("not implemented") +} + func TestSubscribeNewHeadsAndUnsubscribe(t *testing.T) { t.Parallel() log := utils.NewNopZapLogger() @@ -399,3 +450,261 @@ func TestMultipleSubscribeNewHeadsAndUnsubscribe(t *testing.T) { require.NoError(t, conn1.Write(ctx, websocket.MessageBinary, []byte(fmt.Sprintf(unsubMsg, firstID)))) require.NoError(t, conn2.Write(ctx, websocket.MessageBinary, []byte(fmt.Sprintf(unsubMsg, secondID)))) } + +func TestSubscribeTxStatusAndUnsubscribe(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + mockCtrl := gomock.NewController(t) + t.Cleanup(mockCtrl.Finish) + + mockReader := mocks.NewMockReader(mockCtrl) + handler, syncer, server := setupSubscriptionTest(t, ctx, mockReader) + + require.NoError(t, server.RegisterMethods(jsonrpc.Method{ + Name: "starknet_subscribeTransactionStatus", + Params: []jsonrpc.Parameter{{Name: "transaction_hash"}, {Name: "block", Optional: true}}, + Handler: handler.SubscribeTxnStatus, + }, jsonrpc.Method{ + Name: "juno_unsubscribe", + Params: []jsonrpc.Parameter{{Name: "id"}}, + Handler: handler.Unsubscribe, + })) + + ws := jsonrpc.NewWebsocket(server, utils.NewNopZapLogger()) + httpSrv := httptest.NewServer(ws) + + // default returns from mocks + txnHash := utils.HexToFelt(t, "0x111100000000111100000000111100000000111100000000111100000000111") + txn := &core.DeployTransaction{TransactionHash: txnHash, Version: (*core.TransactionVersion)(&felt.Zero)} + receipt := &core.TransactionReceipt{ + TransactionHash: txnHash, + Reverted: false, + } + mockReader.EXPECT().TransactionByHash(txnHash).Return(txn, nil).AnyTimes() + mockReader.EXPECT().Receipt(txnHash).Return(receipt, nil, uint64(1), nil).AnyTimes() + mockReader.EXPECT().TransactionByHash(&felt.Zero).Return(nil, db.ErrKeyNotFound).AnyTimes() + + firstID := uint64(1) + secondID := uint64(2) + + t.Run("simple subscribe and unsubscribe", func(t *testing.T) { + conn1, _, err := websocket.Dial(ctx, httpSrv.URL, nil) + require.NoError(t, err) + conn2, _, err := websocket.Dial(ctx, httpSrv.URL, nil) + require.NoError(t, err) + + handler.WithIDGen(func() uint64 { return firstID }) + firstWant := txStatusNotFoundResponse + // Notice we subscribe for non-existing tx, we expect automatic unsubscribe + firstGot := sendAndReceiveMessage(t, ctx, conn1, fmt.Sprintf(subscribeTxStatus, felt.Zero.String())) + require.NoError(t, err) + require.Equal(t, firstWant, firstGot) + + handler.WithIDGen(func() uint64 { return secondID }) + secondWant := fmt.Sprintf(subscribeResponse, secondID) + secondGot := sendAndReceiveMessage(t, ctx, conn2, fmt.Sprintf(subscribeTxStatus, txnHash)) + require.NoError(t, err) + require.Equal(t, secondWant, secondGot) + + // as expected the subscription is gone + firstUnsubGot := sendAndReceiveMessage(t, ctx, conn1, fmt.Sprintf(unsubscribeMsg, firstID)) + require.Equal(t, unsubscribeNotFoundResponse, firstUnsubGot) + + // Receive a block header. + secondWant = formatTxStatusResponse(t, txnHash, rpc.TxnStatusAcceptedOnL2, rpc.TxnSuccess, secondID) + _, secondHeaderGot, err := conn2.Read(ctx) + secondGot = string(secondHeaderGot) + require.NoError(t, err) + require.Equal(t, secondWant, secondGot) + + // Unsubscribe + require.NoError(t, conn2.Write(ctx, websocket.MessageBinary, []byte(fmt.Sprintf(unsubscribeMsg, secondID)))) + }) + + t.Run("no update is sent when status has not changed", func(t *testing.T) { + conn1, _, err := websocket.Dial(ctx, httpSrv.URL, nil) + require.NoError(t, err) + + handler.WithIDGen(func() uint64 { return firstID }) + firstWant := fmt.Sprintf(subscribeResponse, firstID) + firstGot := sendAndReceiveMessage(t, ctx, conn1, fmt.Sprintf(subscribeTxStatus, txnHash)) + require.NoError(t, err) + require.Equal(t, firstWant, firstGot) + + firstStatusWant := formatTxStatusResponse(t, txnHash, rpc.TxnStatusAcceptedOnL2, rpc.TxnSuccess, firstID) + _, firstStatusGot, err := conn1.Read(ctx) + require.NoError(t, err) + require.Equal(t, firstStatusWant, string(firstStatusGot)) + + // Simulate a new block + syncer.newHeads.Send(testHeader(t)) + + // expected no status is send + timeoutCtx, toCancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer toCancel() + _, _, err = conn1.Read(timeoutCtx) + require.Regexp(t, "failed to get reader: ", err.Error()) + + // at this time connection is closed + require.EqualError(t, + conn1.Write(ctx, websocket.MessageBinary, []byte(fmt.Sprintf(unsubscribeMsg, firstID))), + "failed to write msg: use of closed network connection") + }) + + t.Run("update is only sent when new status is different", func(t *testing.T) { + conn1, _, err := websocket.Dial(ctx, httpSrv.URL, nil) + require.NoError(t, err) + + otherTxn := utils.HexToFelt(t, "0x222200000000111100000000222200000000111100000000111100000000222") + someBlkHash := utils.HexToFelt(t, "0x333300000000111100000000222200000000333300000000111100000000fff") + txn := &core.DeployTransaction{TransactionHash: txnHash, Version: (*core.TransactionVersion)(&felt.Zero)} + receipt := &core.TransactionReceipt{ + TransactionHash: otherTxn, + Reverted: false, + } + mockReader.EXPECT().TransactionByHash(otherTxn).Return(txn, nil).Times(2) + mockReader.EXPECT().Receipt(otherTxn).Return(receipt, someBlkHash, uint64(1), nil).Times(2) + mockReader.EXPECT().L1Head().Return(&core.L1Head{BlockNumber: 0}, nil) + + handler.WithIDGen(func() uint64 { return firstID }) + firstWant := fmt.Sprintf(subscribeResponse, firstID) + firstGot := sendAndReceiveMessage(t, ctx, conn1, fmt.Sprintf(subscribeTxStatus, otherTxn)) + require.NoError(t, err) + require.Equal(t, firstWant, firstGot) + + firstStatusWant := formatTxStatusResponse(t, otherTxn, rpc.TxnStatusAcceptedOnL2, rpc.TxnSuccess, firstID) + _, firstStatusGot, err := conn1.Read(ctx) + require.NoError(t, err) + require.Equal(t, firstStatusWant, string(firstStatusGot)) + + mockReader.EXPECT().L1Head().Return(&core.L1Head{BlockNumber: 5}, nil).Times(1) + syncer.newHeads.Send(testHeader(t)) + + secondStatusWant := formatTxStatusResponse(t, otherTxn, rpc.TxnStatusAcceptedOnL1, rpc.TxnSuccess, firstID) + _, secondStatusGot, err := conn1.Read(ctx) + require.NoError(t, err) + require.Equal(t, secondStatusWant, string(secondStatusGot)) + + // second status is final - subcription should be automatically removed + thirdUnsubGot := sendAndReceiveMessage(t, ctx, conn1, fmt.Sprintf(unsubscribeMsg, firstID)) + require.Equal(t, unsubscribeNotFoundResponse, thirdUnsubGot) + }) + + t.Run("subscription ends when tx reaches final status", func(t *testing.T) { + conn1, _, err := websocket.Dial(ctx, httpSrv.URL, nil) + require.NoError(t, err) + + revertedTxn := utils.HexToFelt(t, "0x111100000000222200000000333300000000444400000000555500000000fff") + mockReader.EXPECT().TransactionByHash(revertedTxn).Return(nil, db.ErrKeyNotFound).Times(2) + + handler.WithIDGen(func() uint64 { return firstID }) + handler.WithFeeder(feeder.NewTestClient(t, &utils.Mainnet)) + defer handler.WithFeeder(nil) + + firstWant := fmt.Sprintf(subscribeResponse, firstID) + firstGot := sendAndReceiveMessage(t, ctx, conn1, fmt.Sprintf(subscribeTxStatus, revertedTxn)) + require.NoError(t, err) + require.Equal(t, firstWant, firstGot) + + firstStatusWant := formatTxStatusResponse(t, revertedTxn, rpc.TxnStatusRejected, rpc.TxnFailure, firstID, "This is hand-made transaction used for txStatus endpoint test") + _, firstStatusGot, err := conn1.Read(ctx) + require.NoError(t, err) + require.Equal(t, firstStatusWant, string(firstStatusGot)) + + // final status will be discovered after a new head is received + syncer.newHeads.Send(testHeader(t)) + // and wait a bit for the subscription to process the event + time.Sleep(50 * time.Millisecond) + + // second status is final - subcription should be automatically removed + thirdUnsubGot := sendAndReceiveMessage(t, ctx, conn1, fmt.Sprintf(unsubscribeMsg, firstID)) + require.Equal(t, unsubscribeNotFoundResponse, thirdUnsubGot) + }) +} + +func testHeader(t *testing.T) *core.Header { + t.Helper() + + header := &core.Header{ + Hash: utils.HexToFelt(t, "0x4e1f77f39545afe866ac151ac908bd1a347a2a8a7d58bef1276db4f06fdf2f6"), + ParentHash: utils.HexToFelt(t, "0x2a70fb03fe363a2d6be843343a1d81ce6abeda1e9bd5cc6ad8fa9f45e30fdeb"), + Number: 2, + GlobalStateRoot: utils.HexToFelt(t, "0x3ceee867d50b5926bb88c0ec7e0b9c20ae6b537e74aac44b8fcf6bb6da138d9"), + Timestamp: 1637084470, + SequencerAddress: utils.HexToFelt(t, "0x0"), + L1DataGasPrice: &core.GasPrice{ + PriceInFri: utils.HexToFelt(t, "0x0"), + PriceInWei: utils.HexToFelt(t, "0x0"), + }, + GasPrice: utils.HexToFelt(t, "0x0"), + GasPriceSTRK: utils.HexToFelt(t, "0x0"), + L1DAMode: core.Calldata, + ProtocolVersion: "", + } + return header +} + +func setupSubscriptionTest(t *testing.T, ctx context.Context, srvs ...any) (*rpc.Handler, *fakeSyncer, *jsonrpc.Server) { + t.Helper() + + var ( + log utils.Logger + chain blockchain.Reader + ) + + for _, srv := range srvs { + switch srv := srv.(type) { + case utils.Logger: + log = srv + case blockchain.Reader: + chain = srv + default: + t.Fatalf("unexpected option type: %T", srv) + } + } + + // provide good defaults + if log == nil { + log = utils.NewNopZapLogger() + } + if chain == nil { + chain = blockchain.New(pebble.NewMemTest(t), &utils.Mainnet, nil) + } + syncer := newFakeSyncer() + + handler := rpc.New(chain, syncer, nil, "", log) + go func() { + require.NoError(t, handler.Run(ctx)) + }() + time.Sleep(50 * time.Millisecond) + + server := jsonrpc.NewServer(1, log) + + return handler, syncer, server +} + +func sendAndReceiveMessage(t *testing.T, ctx context.Context, conn *websocket.Conn, message string) string { + t.Helper() + + require.NoError(t, conn.Write(ctx, websocket.MessageText, []byte(message))) + + _, response, err := conn.Read(ctx) + require.NoError(t, err) + return string(response) +} + +func formatTxStatusResponse(t *testing.T, txnHash *felt.Felt, finality rpc.TxnStatus, execution rpc.TxnExecutionStatus, id uint64, reason ...string) string { + t.Helper() + + finStatusB, err := finality.MarshalText() + require.NoError(t, err) + exeStatusB, err := execution.MarshalText() + require.NoError(t, err) + + statusBody := fmt.Sprintf(txStatusStatusBothStatuses, string(finStatusB), string(exeStatusB)) + if finality == rpc.TxnStatusRejected { + statusBody = fmt.Sprintf(txStatusStatusRejected, string(finStatusB), reason[0]) + } + return fmt.Sprintf(txStatusResponse, txnHash, statusBody, id) +} diff --git a/rpc/handlers.go b/rpc/handlers.go index 1cf96b0c21..d7a45faba4 100644 --- a/rpc/handlers.go +++ b/rpc/handlers.go @@ -66,6 +66,7 @@ var ( ErrUnsupportedTxVersion = &jsonrpc.Error{Code: 61, Message: "the transaction version is not supported"} ErrUnsupportedContractClassVersion = &jsonrpc.Error{Code: 62, Message: "the contract class version is not supported"} ErrUnexpectedError = &jsonrpc.Error{Code: 63, Message: "An unexpected error occurred"} + ErrTooManyAddressesInFilter = &jsonrpc.Error{Code: 67, Message: "Too many addresses in filter sender_address filter"} ErrTooManyBlocksBack = &jsonrpc.Error{Code: 68, Message: fmt.Sprintf("Cannot go back more than %v blocks", maxBlocksBack)} ErrCallOnPending = &jsonrpc.Error{Code: 69, Message: "This method does not support being called on the pending block"} @@ -93,8 +94,10 @@ type Handler struct { vm vm.VM log utils.Logger - version string - newHeads *feed.Feed[*core.Header] + version string + newHeads *feed.Feed[*core.Header] + pendingTxs *feed.Feed[[]core.Transaction] + txnStatus *feed.Feed[*NewTransactionStatus] idgen func() uint64 mu stdsync.Mutex // protects subscriptions. @@ -135,6 +138,8 @@ func New(bcReader blockchain.Reader, syncReader sync.Reader, virtualMachine vm.V }, version: version, newHeads: feed.New[*core.Header](), + pendingTxs: feed.New[[]core.Transaction](), + txnStatus: feed.New[*NewTransactionStatus](), subscriptions: make(map[uint64]*subscription), blockTraceCache: lru.NewCache[traceCacheKey, []TracedBlockTransaction](traceCacheSize), @@ -177,7 +182,8 @@ func (h *Handler) WithGateway(gatewayClient Gateway) *Handler { func (h *Handler) Run(ctx context.Context) error { newHeadsSub := h.syncReader.SubscribeNewHeads().Subscription defer newHeadsSub.Unsubscribe() - feed.Tee[*core.Header](newHeadsSub, h.newHeads) + feed.Tee(newHeadsSub, h.newHeads) + <-ctx.Done() for _, sub := range h.subscriptions { sub.wg.Wait() @@ -347,6 +353,11 @@ func (h *Handler) Methods() ([]jsonrpc.Method, string) { //nolint: funlen Name: "juno_subscribeNewHeads", Handler: h.SubscribeNewHeads, }, + { + Name: "starknet_subscribeTransactionStatus", + Params: []jsonrpc.Parameter{{Name: "transaction_hash"}, {Name: "block", Optional: true}}, + Handler: h.SubscribeTxnStatus, + }, { Name: "juno_unsubscribe", Params: []jsonrpc.Parameter{{Name: "id"}}, diff --git a/rpc/transaction.go b/rpc/transaction.go index 311955b890..e464a98aca 100644 --- a/rpc/transaction.go +++ b/rpc/transaction.go @@ -616,8 +616,9 @@ func (h *Handler) TransactionStatus(ctx context.Context, hash felt.Felt) (*Trans switch txErr { case nil: return &TransactionStatus{ - Finality: TxnStatus(receipt.FinalityStatus), - Execution: receipt.ExecutionStatus, + Finality: TxnStatus(receipt.FinalityStatus), + Execution: receipt.ExecutionStatus, + FailureReason: receipt.RevertReason, }, nil case ErrTxnHashNotFound: if h.feederClient == nil { diff --git a/sync/sync_test.go b/sync/sync_test.go index ab97fc322b..6b0498af29 100644 --- a/sync/sync_test.go +++ b/sync/sync_test.go @@ -196,103 +196,3 @@ func TestSubscribeNewHeads(t *testing.T) { require.Equal(t, want.Header, got) sub.Unsubscribe() } - -func TestPendingSync(t *testing.T) { - t.Parallel() - - client := feeder.NewTestClient(t, &utils.Mainnet) - gw := adaptfeeder.New(client) - - var synchronizer *sync.Synchronizer - testDB := pebble.NewMemTest(t) - log := utils.NewNopZapLogger() - bc := blockchain.New(testDB, &utils.Mainnet, synchronizer.PendingBlock) - synchronizer = sync.New(bc, gw, log, time.Millisecond*100, false, testDB) - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - - require.NoError(t, synchronizer.Run(ctx)) - cancel() - - head, err := bc.HeadsHeader() - require.NoError(t, err) - pending, err := synchronizer.Pending() - require.NoError(t, err) - assert.Equal(t, head.Hash, pending.Block.ParentHash) -} - -func TestPending(t *testing.T) { - client := feeder.NewTestClient(t, &utils.Mainnet) - gw := adaptfeeder.New(client) - - var synchronizer *sync.Synchronizer - testDB := pebble.NewMemTest(t) - chain := blockchain.New(testDB, &utils.Mainnet, synchronizer.PendingBlock) - synchronizer = sync.New(chain, gw, utils.NewNopZapLogger(), 0, false, testDB) - - b, err := gw.BlockByNumber(context.Background(), 0) - require.NoError(t, err) - su, err := gw.StateUpdate(context.Background(), 0) - require.NoError(t, err) - - t.Run("pending state shouldnt exist if no pending block", func(t *testing.T) { - _, _, err = synchronizer.PendingState() - require.Error(t, err) - }) - - t.Run("cannot store unsupported pending block version", func(t *testing.T) { - pending := &sync.Pending{Block: &core.Block{Header: &core.Header{ProtocolVersion: "1.9.0"}}} - require.Error(t, synchronizer.StorePending(pending)) - }) - - t.Run("store genesis as pending", func(t *testing.T) { - pendingGenesis := &sync.Pending{ - Block: b, - StateUpdate: su, - } - require.NoError(t, synchronizer.StorePending(pendingGenesis)) - - gotPending, pErr := synchronizer.Pending() - require.NoError(t, pErr) - assert.Equal(t, pendingGenesis, gotPending) - }) - - require.NoError(t, chain.Store(b, &core.BlockCommitments{}, su, nil)) - - t.Run("storing a pending too far into the future should fail", func(t *testing.T) { - b, err = gw.BlockByNumber(context.Background(), 2) - require.NoError(t, err) - su, err = gw.StateUpdate(context.Background(), 2) - require.NoError(t, err) - - notExpectedPending := sync.Pending{ - Block: b, - StateUpdate: su, - } - require.ErrorIs(t, synchronizer.StorePending(¬ExpectedPending), blockchain.ErrParentDoesNotMatchHead) - }) - - t.Run("store expected pending block", func(t *testing.T) { - b, err = gw.BlockByNumber(context.Background(), 1) - require.NoError(t, err) - su, err = gw.StateUpdate(context.Background(), 1) - require.NoError(t, err) - - expectedPending := &sync.Pending{ - Block: b, - StateUpdate: su, - } - require.NoError(t, synchronizer.StorePending(expectedPending)) - - gotPending, pErr := synchronizer.Pending() - require.NoError(t, pErr) - assert.Equal(t, expectedPending, gotPending) - }) - - t.Run("get pending state", func(t *testing.T) { - _, pendingStateCloser, pErr := synchronizer.PendingState() - t.Cleanup(func() { - require.NoError(t, pendingStateCloser()) - }) - require.NoError(t, pErr) - }) -} diff --git a/utils/nil.go b/utils/nil.go new file mode 100644 index 0000000000..56e5c89185 --- /dev/null +++ b/utils/nil.go @@ -0,0 +1,7 @@ +package utils + +import "reflect" + +func IsNil(i any) bool { + return i == nil || reflect.ValueOf(i).IsNil() +}