Skip to content

Commit f01671a

Browse files
committed
Implement starknet_subscriptionReorg
1 parent 31d1bb2 commit f01671a

File tree

6 files changed

+193
-12
lines changed

6 files changed

+193
-12
lines changed

mocks/mock_synchronizer.go

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rpc/events.go

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import (
99
"github.com/NethermindEth/juno/core/felt"
1010
"github.com/NethermindEth/juno/feed"
1111
"github.com/NethermindEth/juno/jsonrpc"
12+
"github.com/NethermindEth/juno/sync"
13+
"github.com/sourcegraph/conc"
1214
)
1315

1416
const (
@@ -80,15 +82,18 @@ func (h *Handler) SubscribeNewHeads(ctx context.Context, blockID *BlockID) (*Sub
8082
h.mu.Unlock()
8183

8284
headerSub := h.newHeads.Subscribe()
85+
reorgSub := h.reorgs.Subscribe() // as per the spec, reorgs are also sent in the new heads subscription
8386
sub.wg.Go(func() {
8487
defer func() {
8588
h.unsubscribe(sub, id)
8689
headerSub.Unsubscribe()
90+
reorgSub.Unsubscribe()
8791
}()
8892

89-
newHeadersChan := make(chan *core.Header, MaxBlocksBack)
93+
var wg conc.WaitGroup
9094

91-
sub.wg.Go(func() {
95+
newHeadersChan := make(chan *core.Header, MaxBlocksBack)
96+
wg.Go(func() {
9297
h.bufferNewHeaders(subscriptionCtx, headerSub, newHeadersChan)
9398
})
9499

@@ -97,7 +102,15 @@ func (h *Handler) SubscribeNewHeads(ctx context.Context, blockID *BlockID) (*Sub
97102
return
98103
}
99104

100-
h.processNewHeaders(subscriptionCtx, newHeadersChan, w, id)
105+
wg.Go(func() {
106+
h.processNewHeaders(subscriptionCtx, newHeadersChan, w, id)
107+
})
108+
109+
wg.Go(func() {
110+
h.processReorgs(subscriptionCtx, reorgSub, w, id)
111+
})
112+
113+
wg.Wait()
101114
})
102115

103116
return &SubscriptionID{ID: id}, nil
@@ -204,6 +217,36 @@ func (h *Handler) sendHeader(w jsonrpc.Conn, header *core.Header, id uint64) err
204217
return err
205218
}
206219

220+
func (h *Handler) processReorgs(ctx context.Context, reorgSub *feed.Subscription[*sync.ReorgData], w jsonrpc.Conn, id uint64) {
221+
for {
222+
select {
223+
case <-ctx.Done():
224+
return
225+
case reorg := <-reorgSub.Recv():
226+
if err := h.sendReorg(w, reorg, id); err != nil {
227+
h.log.Warnw("Error sending reorg", "err", err)
228+
return
229+
}
230+
}
231+
}
232+
}
233+
234+
func (h *Handler) sendReorg(w jsonrpc.Conn, reorg *sync.ReorgData, id uint64) error {
235+
resp, err := json.Marshal(jsonrpc.Request{
236+
Version: "2.0",
237+
Method: "starknet_subscriptionReorg",
238+
Params: map[string]any{
239+
"subscription_id": id,
240+
"result": reorg,
241+
},
242+
})
243+
if err != nil {
244+
return err
245+
}
246+
_, err = w.Write(resp)
247+
return err
248+
}
249+
207250
func (h *Handler) Unsubscribe(ctx context.Context, id uint64) (bool, *jsonrpc.Error) {
208251
w, ok := jsonrpc.ConnFromContext(ctx)
209252
if !ok {

rpc/events_test.go

Lines changed: 65 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import (
2828
var emptyCommitments = core.BlockCommitments{}
2929

3030
const (
31-
testResponse = `{"jsonrpc":"2.0","method":"starknet_subscriptionNewHeads","params":{"result":{"block_hash":"0x4e1f77f39545afe866ac151ac908bd1a347a2a8a7d58bef1276db4f06fdf2f6","parent_hash":"0x2a70fb03fe363a2d6be843343a1d81ce6abeda1e9bd5cc6ad8fa9f45e30fdeb","block_number":2,"new_root":"0x3ceee867d50b5926bb88c0ec7e0b9c20ae6b537e74aac44b8fcf6bb6da138d9","timestamp":1637084470,"sequencer_address":"0x0","l1_gas_price":{"price_in_fri":"0x0","price_in_wei":"0x0"},"l1_data_gas_price":{"price_in_fri":"0x0","price_in_wei":"0x0"},"l1_da_mode":"CALLDATA","starknet_version":""},"subscription_id":%d}}`
31+
newHeadsResponse = `{"jsonrpc":"2.0","method":"starknet_subscriptionNewHeads","params":{"result":{"block_hash":"0x4e1f77f39545afe866ac151ac908bd1a347a2a8a7d58bef1276db4f06fdf2f6","parent_hash":"0x2a70fb03fe363a2d6be843343a1d81ce6abeda1e9bd5cc6ad8fa9f45e30fdeb","block_number":2,"new_root":"0x3ceee867d50b5926bb88c0ec7e0b9c20ae6b537e74aac44b8fcf6bb6da138d9","timestamp":1637084470,"sequencer_address":"0x0","l1_gas_price":{"price_in_fri":"0x0","price_in_wei":"0x0"},"l1_data_gas_price":{"price_in_fri":"0x0","price_in_wei":"0x0"},"l1_da_mode":"CALLDATA","starknet_version":""},"subscription_id":%d}}`
3232
)
3333

3434
func TestEvents(t *testing.T) {
@@ -238,12 +238,24 @@ func (fc *fakeConn) Equal(other jsonrpc.Conn) bool {
238238

239239
type fakeSyncer struct {
240240
newHeads *feed.Feed[*core.Header]
241+
reorgs *feed.Feed[*sync.ReorgData]
242+
}
243+
244+
func newFakeSyncer() *fakeSyncer {
245+
return &fakeSyncer{
246+
newHeads: feed.New[*core.Header](),
247+
reorgs: feed.New[*sync.ReorgData](),
248+
}
241249
}
242250

243251
func (fs *fakeSyncer) SubscribeNewHeads() sync.HeaderSubscription {
244252
return sync.HeaderSubscription{Subscription: fs.newHeads.Subscribe()}
245253
}
246254

255+
func (fs *fakeSyncer) SubscribeReorg() sync.ReorgSubscription {
256+
return sync.ReorgSubscription{Subscription: fs.reorgs.Subscribe()}
257+
}
258+
247259
func (fs *fakeSyncer) StartingBlockNumber() (uint64, error) {
248260
return 0, nil
249261
}
@@ -256,7 +268,7 @@ func TestSubscribeNewHeadsAndUnsubscribe(t *testing.T) {
256268
t.Parallel()
257269

258270
chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet)
259-
syncer := &fakeSyncer{newHeads: feed.New[*core.Header]()}
271+
syncer := newFakeSyncer()
260272
handler := rpc.New(chain, syncer, nil, "", utils.NewNopZapLogger())
261273

262274
ctx, cancel := context.WithCancel(context.Background())
@@ -289,7 +301,7 @@ func TestSubscribeNewHeadsAndUnsubscribe(t *testing.T) {
289301
syncer.newHeads.Send(testHeader(t))
290302

291303
// Receive a block header.
292-
want := fmt.Sprintf(testResponse, id.ID)
304+
want := fmt.Sprintf(newHeadsResponse, id.ID)
293305
got := make([]byte, len(want))
294306
_, err := clientConn.Read(got)
295307
require.NoError(t, err)
@@ -323,7 +335,7 @@ func TestMultipleSubscribeNewHeadsAndUnsubscribe(t *testing.T) {
323335

324336
log := utils.NewNopZapLogger()
325337
chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet)
326-
syncer := &fakeSyncer{newHeads: feed.New[*core.Header]()}
338+
syncer := newFakeSyncer()
327339
handler := rpc.New(chain, syncer, nil, "", log)
328340

329341
ctx, cancel := context.WithCancel(context.Background())
@@ -377,11 +389,11 @@ func TestMultipleSubscribeNewHeadsAndUnsubscribe(t *testing.T) {
377389
syncer.newHeads.Send(testHeader(t))
378390

379391
// Receive a block header.
380-
firstWant = fmt.Sprintf(testResponse, firstID)
392+
firstWant = fmt.Sprintf(newHeadsResponse, firstID)
381393
_, firstGot, err = conn1.Read(ctx)
382394
require.NoError(t, err)
383395
require.Equal(t, firstWant, string(firstGot))
384-
secondWant = fmt.Sprintf(testResponse, secondID)
396+
secondWant = fmt.Sprintf(newHeadsResponse, secondID)
385397
_, secondGot, err = conn2.Read(ctx)
386398
require.NoError(t, err)
387399
require.Equal(t, secondWant, string(secondGot))
@@ -407,7 +419,7 @@ func TestSubscribeNewHeadsHistorical(t *testing.T) {
407419
assert.NoError(t, chain.Store(block0, &emptyCommitments, stateUpdate0, nil))
408420

409421
chain = blockchain.New(testDB, &utils.Mainnet)
410-
syncer := &fakeSyncer{newHeads: feed.New[*core.Header]()}
422+
syncer := newFakeSyncer()
411423
handler := rpc.New(chain, syncer, nil, "", utils.NewNopZapLogger())
412424

413425
ctx, cancel := context.WithCancel(context.Background())
@@ -450,7 +462,7 @@ func TestSubscribeNewHeadsHistorical(t *testing.T) {
450462
syncer.newHeads.Send(testHeader(t))
451463

452464
// Check new block content
453-
want = fmt.Sprintf(testResponse, id.ID)
465+
want = fmt.Sprintf(newHeadsResponse, id.ID)
454466
got = make([]byte, len(want))
455467
_, err = clientConn.Read(got)
456468
require.NoError(t, err)
@@ -478,3 +490,48 @@ func testHeader(t *testing.T) *core.Header {
478490
}
479491
return header
480492
}
493+
494+
func TestSubscriptionReorg(t *testing.T) {
495+
t.Parallel()
496+
497+
chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet)
498+
syncer := newFakeSyncer()
499+
handler := rpc.New(chain, syncer, nil, "", utils.NewNopZapLogger())
500+
501+
ctx, cancel := context.WithCancel(context.Background())
502+
t.Cleanup(cancel)
503+
504+
go func() {
505+
require.NoError(t, handler.Run(ctx))
506+
}()
507+
time.Sleep(50 * time.Millisecond)
508+
509+
serverConn, clientConn := net.Pipe()
510+
t.Cleanup(func() {
511+
require.NoError(t, serverConn.Close())
512+
require.NoError(t, clientConn.Close())
513+
})
514+
515+
subCtx := context.WithValue(ctx, jsonrpc.ConnKey{}, &fakeConn{w: serverConn})
516+
517+
// Subscribe to new heads which will send a
518+
id, rpcErr := handler.SubscribeNewHeads(subCtx, nil)
519+
require.Nil(t, rpcErr)
520+
require.NotZero(t, id)
521+
522+
// Simulate a reorg
523+
syncer.reorgs.Send(&sync.ReorgData{
524+
StartBlockHash: utils.HexToFelt(t, "0x4e1f77f39545afe866ac151ac908bd1a347a2a8a7d58bef1276db4f06fdf2f6"),
525+
StartBlockNum: 0,
526+
EndBlockHash: utils.HexToFelt(t, "0x34e815552e42c5eb5233b99de2d3d7fd396e575df2719bf98e7ed2794494f86"),
527+
EndBlockNum: 2,
528+
})
529+
530+
// Receive reorg event
531+
want := `{"jsonrpc":"2.0","method":"starknet_subscriptionReorg","params":{"result":{"starting_block_hash":"0x4e1f77f39545afe866ac151ac908bd1a347a2a8a7d58bef1276db4f06fdf2f6","starting_block_number":0,"ending_block_hash":"0x34e815552e42c5eb5233b99de2d3d7fd396e575df2719bf98e7ed2794494f86","ending_block_number":2},"subscription_id":%d}}`
532+
want = fmt.Sprintf(want, id.ID)
533+
got := make([]byte, len(want))
534+
_, err := clientConn.Read(got)
535+
require.NoError(t, err)
536+
require.Equal(t, want, string(got))
537+
}

rpc/handlers.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ type Handler struct {
8484

8585
version string
8686
newHeads *feed.Feed[*core.Header]
87+
reorgs *feed.Feed[*sync.ReorgData]
8788

8889
idgen func() uint64
8990
mu stdsync.Mutex // protects subscriptions.
@@ -117,6 +118,7 @@ func New(bcReader blockchain.Reader, syncReader sync.Reader, virtualMachine vm.V
117118
},
118119
version: version,
119120
newHeads: feed.New[*core.Header](),
121+
reorgs: feed.New[*sync.ReorgData](),
120122
subscriptions: make(map[uint64]*subscription),
121123

122124
blockTraceCache: lru.NewCache[traceCacheKey, []TracedBlockTransaction](traceCacheSize),
@@ -152,8 +154,12 @@ func (h *Handler) WithGateway(gatewayClient Gateway) *Handler {
152154

153155
func (h *Handler) Run(ctx context.Context) error {
154156
newHeadsSub := h.syncReader.SubscribeNewHeads().Subscription
157+
reorgsSub := h.syncReader.SubscribeReorg().Subscription
155158
defer newHeadsSub.Unsubscribe()
156-
feed.Tee[*core.Header](newHeadsSub, h.newHeads)
159+
defer reorgsSub.Unsubscribe()
160+
feed.Tee(newHeadsSub, h.newHeads)
161+
feed.Tee(reorgsSub, h.reorgs)
162+
157163
<-ctx.Done()
158164
for _, sub := range h.subscriptions {
159165
sub.wg.Wait()

sync/sync.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,18 @@ type HeaderSubscription struct {
3434
*feed.Subscription[*core.Header]
3535
}
3636

37+
type ReorgSubscription struct {
38+
*feed.Subscription[*ReorgData]
39+
}
40+
3741
// Todo: Since this is also going to be implemented by p2p package we should move this interface to node package
3842
//
3943
//go:generate mockgen -destination=../mocks/mock_synchronizer.go -package=mocks -mock_names Reader=MockSyncReader github.com/NethermindEth/juno/sync Reader
4044
type Reader interface {
4145
StartingBlockNumber() (uint64, error)
4246
HighestBlockHeader() *core.Header
4347
SubscribeNewHeads() HeaderSubscription
48+
SubscribeReorg() ReorgSubscription
4449
}
4550

4651
// This is temporary and will be removed once the p2p synchronizer implements this interface.
@@ -58,6 +63,22 @@ func (n *NoopSynchronizer) SubscribeNewHeads() HeaderSubscription {
5863
return HeaderSubscription{feed.New[*core.Header]().Subscribe()}
5964
}
6065

66+
func (n *NoopSynchronizer) SubscribeReorg() ReorgSubscription {
67+
return ReorgSubscription{feed.New[*ReorgData]().Subscribe()}
68+
}
69+
70+
// ReorgData represents data about reorganised blocks, starting and ending block number and hash
71+
type ReorgData struct {
72+
// StartBlockHash is the hash of the first known block of the orphaned chain
73+
StartBlockHash *felt.Felt `json:"starting_block_hash"`
74+
// StartBlockNum is the number of the first known block of the orphaned chain
75+
StartBlockNum uint64 `json:"starting_block_number"`
76+
// The last known block of the orphaned chain
77+
EndBlockHash *felt.Felt `json:"ending_block_hash"`
78+
// Number of the last known block of the orphaned chain
79+
EndBlockNum uint64 `json:"ending_block_number"`
80+
}
81+
6182
// Synchronizer manages a list of StarknetData to fetch the latest blockchain updates
6283
type Synchronizer struct {
6384
blockchain *blockchain.Blockchain
@@ -66,12 +87,15 @@ type Synchronizer struct {
6687
startingBlockNumber *uint64
6788
highestBlockHeader atomic.Pointer[core.Header]
6889
newHeads *feed.Feed[*core.Header]
90+
reorgFeed *feed.Feed[*ReorgData]
6991

7092
log utils.SimpleLogger
7193
listener EventListener
7294

7395
pendingPollInterval time.Duration
7496
catchUpMode bool
97+
98+
currReorg *ReorgData // If nil, no reorg is happening
7599
}
76100

77101
func New(bc *blockchain.Blockchain, starkNetData starknetdata.StarknetData,
@@ -82,6 +106,7 @@ func New(bc *blockchain.Blockchain, starkNetData starknetdata.StarknetData,
82106
starknetData: starkNetData,
83107
log: log,
84108
newHeads: feed.New[*core.Header](),
109+
reorgFeed: feed.New[*ReorgData](),
85110
pendingPollInterval: pendingPollInterval,
86111
listener: &SelectiveListener{},
87112
readOnlyBlockchain: readOnlyBlockchain,
@@ -228,6 +253,11 @@ func (s *Synchronizer) verifierTask(ctx context.Context, block *core.Block, stat
228253
s.highestBlockHeader.CompareAndSwap(highestBlockHeader, block.Header)
229254
}
230255

256+
if s.currReorg != nil {
257+
s.reorgFeed.Send(s.currReorg)
258+
s.currReorg = nil // reset the reorg data
259+
}
260+
231261
s.newHeads.Send(block.Header)
232262
s.log.Infow("Stored Block", "number", block.Number, "hash",
233263
block.Hash.ShortString(), "root", block.GlobalStateRoot.ShortString())
@@ -324,6 +354,19 @@ func (s *Synchronizer) revertHead(forkBlock *core.Block) {
324354
} else {
325355
s.log.Infow("Reverted HEAD", "reverted", localHead)
326356
}
357+
358+
if s.currReorg == nil { // first block of the reorg
359+
s.currReorg = &ReorgData{
360+
StartBlockHash: localHead,
361+
StartBlockNum: head.Number,
362+
EndBlockHash: localHead,
363+
EndBlockNum: head.Number,
364+
}
365+
} else { // not the first block of the reorg, adjust the starting block
366+
s.currReorg.StartBlockHash = localHead
367+
s.currReorg.StartBlockNum = head.Number
368+
}
369+
327370
s.listener.OnReorg(head.Number)
328371
}
329372

@@ -439,3 +482,9 @@ func (s *Synchronizer) SubscribeNewHeads() HeaderSubscription {
439482
Subscription: s.newHeads.Subscribe(),
440483
}
441484
}
485+
486+
func (s *Synchronizer) SubscribeReorg() ReorgSubscription {
487+
return ReorgSubscription{
488+
Subscription: s.reorgFeed.Subscribe(),
489+
}
490+
}

0 commit comments

Comments
 (0)