Skip to content

Commit

Permalink
Implement subscribeNewHeads,subscriptionReorg,`subscribePendingTr…
Browse files Browse the repository at this point in the history
…ansactions` (#2211)
  • Loading branch information
weiihann authored Dec 23, 2024
1 parent 81262fd commit ea728d9
Show file tree
Hide file tree
Showing 12 changed files with 1,114 additions and 366 deletions.
2 changes: 1 addition & 1 deletion docs/docs/faq.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ docker logs -f juno
<details>
<summary>How can I get real-time updates of new blocks?</summary>

The [WebSocket](websocket#subscribe-to-newly-created-blocks) interface provides a `juno_subscribeNewHeads` method that emits an event when new blocks are added to the blockchain.
The [WebSocket](websocket#subscribe-to-newly-created-blocks) interface provides a `starknet_subscribeNewHeads` method that emits an event when new blocks are added to the blockchain.

</details>

Expand Down
9 changes: 4 additions & 5 deletions docs/docs/websocket.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,16 +96,15 @@ Get the most recent accepted block hash and number with the `starknet_blockHashA

## Subscribe to newly created blocks

The WebSocket server provides a `juno_subscribeNewHeads` method that emits an event when new blocks are added to the blockchain:
The WebSocket server provides a `starknet_subscribeNewHeads` method that emits an event when new blocks are added to the blockchain:

<Tabs>
<TabItem value="request" label="Request">

```json
{
"jsonrpc": "2.0",
"method": "juno_subscribeNewHeads",
"params": [],
"method": "starknet_subscribeNewHeads",
"id": 1
}
```
Expand All @@ -129,7 +128,7 @@ When a new block is added, you will receive a message like this:
```json
{
"jsonrpc": "2.0",
"method": "juno_subscribeNewHeads",
"method": "starknet_subscriptionNewHeads",
"params": {
"result": {
"block_hash": "0x840660a07a17ae6a55d39fb6d366698ecda11e02280ca3e9ca4b4f1bad741c",
Expand All @@ -149,7 +148,7 @@ When a new block is added, you will receive a message like this:
"l1_da_mode": "BLOB",
"starknet_version": "0.13.1.1"
},
"subscription": 16570962336122680234
"subscription_id": 16570962336122680234
}
}
```
Expand Down
23 changes: 19 additions & 4 deletions jsonrpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,8 +422,17 @@ func isBatch(reader *bufio.Reader) bool {
return false
}

func isNil(i any) bool {
return i == nil || reflect.ValueOf(i).IsNil()
func isNilOrEmpty(i any) (bool, error) {
if utils.IsNil(i) {
return true, nil
}

switch reflect.TypeOf(i).Kind() {
case reflect.Slice, reflect.Array, reflect.Map:
return reflect.ValueOf(i).Len() == 0, nil
default:
return false, fmt.Errorf("impossible param type: check request.isSane")
}
}

func (s *Server) handleRequest(ctx context.Context, req *Request) (*response, http.Header, error) {
Expand Down Expand Up @@ -471,7 +480,7 @@ func (s *Server) handleRequest(ctx context.Context, req *Request) (*response, ht
header = (tuple[1].Interface()).(http.Header)
}

if errAny := tuple[errorIndex].Interface(); !isNil(errAny) {
if errAny := tuple[errorIndex].Interface(); !utils.IsNil(errAny) {
res.Error = errAny.(*Error)
if res.Error.Code == InternalError {
s.listener.OnRequestFailed(req.Method, res.Error)
Expand All @@ -486,6 +495,7 @@ func (s *Server) handleRequest(ctx context.Context, req *Request) (*response, ht
return res, header, nil
}

//nolint:gocyclo
func (s *Server) buildArguments(ctx context.Context, params any, method Method) ([]reflect.Value, error) {
handlerType := reflect.TypeOf(method.Handler)

Expand All @@ -498,7 +508,12 @@ func (s *Server) buildArguments(ctx context.Context, params any, method Method)
addContext = 1
}

if isNil(params) {
isNilOrEmpty, err := isNilOrEmpty(params)
if err != nil {
return nil, err
}

if isNilOrEmpty {
allParamsAreOptional := utils.All(method.Params, func(p Parameter) bool {
return p.Optional
})
Expand Down
15 changes: 15 additions & 0 deletions jsonrpc/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,13 @@ func TestHandle(t *testing.T) {
return 0, jsonrpc.Err(jsonrpc.InternalError, nil)
},
},
{
Name: "singleOptionalParam",
Params: []jsonrpc.Parameter{{Name: "param", Optional: true}},
Handler: func(param *int) (int, *jsonrpc.Error) {
return 0, nil
},
},
}

listener := CountingEventListener{}
Expand Down Expand Up @@ -475,6 +482,14 @@ func TestHandle(t *testing.T) {
res: `{"jsonrpc":"2.0","error":{"code":-32603,"message":"Internal error"},"id":1}`,
checkFailedEvent: true,
},
"empty optional param": {
req: `{"jsonrpc": "2.0", "method": "singleOptionalParam", "params": {}, "id": 1}`,
res: `{"jsonrpc":"2.0","result":0,"id":1}`,
},
"null optional param": {
req: `{"jsonrpc": "2.0", "method": "singleOptionalParam", "id": 1}`,
res: `{"jsonrpc":"2.0","result":0,"id":1}`,
},
}

for desc, test := range tests {
Expand Down
28 changes: 28 additions & 0 deletions mocks/mock_synchronizer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

68 changes: 0 additions & 68 deletions rpc/events.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package rpc

import (
"context"
"encoding/json"

"github.com/NethermindEth/juno/blockchain"
"github.com/NethermindEth/juno/core/felt"
"github.com/NethermindEth/juno/jsonrpc"
Expand Down Expand Up @@ -51,71 +48,6 @@ type SubscriptionID struct {
/****************************************************
Events Handlers
*****************************************************/

func (h *Handler) SubscribeNewHeads(ctx context.Context) (uint64, *jsonrpc.Error) {
w, ok := jsonrpc.ConnFromContext(ctx)
if !ok {
return 0, jsonrpc.Err(jsonrpc.MethodNotFound, nil)
}

id := h.idgen()
subscriptionCtx, subscriptionCtxCancel := context.WithCancel(ctx)
sub := &subscription{
cancel: subscriptionCtxCancel,
conn: w,
}
h.mu.Lock()
h.subscriptions[id] = sub
h.mu.Unlock()
headerSub := h.newHeads.Subscribe()
sub.wg.Go(func() {
defer func() {
headerSub.Unsubscribe()
h.unsubscribe(sub, id)
}()
for {
select {
case <-subscriptionCtx.Done():
return
case header := <-headerSub.Recv():
resp, err := json.Marshal(SubscriptionResponse{
Version: "2.0",
Method: "juno_subscribeNewHeads",
Params: map[string]any{
"result": adaptBlockHeader(header),
"subscription": id,
},
})
if err != nil {
h.log.Warnw("Error marshalling a subscription reply", "err", err)
return
}
if _, err = w.Write(resp); err != nil {
h.log.Warnw("Error writing a subscription reply", "err", err)
return
}
}
}
})
return id, nil
}

func (h *Handler) Unsubscribe(ctx context.Context, id uint64) (bool, *jsonrpc.Error) {
w, ok := jsonrpc.ConnFromContext(ctx)
if !ok {
return false, jsonrpc.Err(jsonrpc.MethodNotFound, nil)
}
h.mu.Lock()
sub, ok := h.subscriptions[id]
h.mu.Unlock() // Don't defer since h.unsubscribe acquires the lock.
if !ok || !sub.conn.Equal(w) {
return false, ErrSubscriptionNotFound
}
sub.cancel()
sub.wg.Wait() // Let the subscription finish before responding.
return true, nil
}

// Events gets the events matching a filter
//
// It follows the specification defined here:
Expand Down
Loading

0 comments on commit ea728d9

Please sign in to comment.