Skip to content

Commit

Permalink
Implement draft idea how reorg should be restructured
Browse files Browse the repository at this point in the history
  • Loading branch information
kirugan committed Apr 10, 2024
1 parent 4d43a8a commit d6d7607
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 7 deletions.
28 changes: 26 additions & 2 deletions clients/feeder/feeder.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ import (
"github.com/stretchr/testify/require"
)

var ErrDeprecatedCompiledClass = errors.New("deprecated compiled class")
var (
ErrDeprecatedCompiledClass = errors.New("deprecated compiled class")
ErrBlockNotFound = errors.New("block not found")
)

type Backoff func(wait time.Duration) time.Duration

Expand Down Expand Up @@ -254,7 +257,12 @@ func (c *Client) get(ctx context.Context, queryURL string) (io.ReadCloser, error
if res.StatusCode == http.StatusOK {
return res.Body, nil
} else {
err = errors.New(res.Status)
if c.isBlockNotFoundError(res) {
err = ErrBlockNotFound
break

Check warning on line 262 in clients/feeder/feeder.go

View check run for this annotation

Codecov / codecov/patch

clients/feeder/feeder.go#L261-L262

Added lines #L261 - L262 were not covered by tests
} else {
err = errors.New(res.Status)
}
}

res.Body.Close()
Expand All @@ -270,9 +278,25 @@ func (c *Client) get(ctx context.Context, queryURL string) (io.ReadCloser, error
c.log.Debugw("Failed query to feeder, retrying...", "req", req.URL.String(), "retryAfter", wait.String(), "err", err)
}
}

return nil, err
}

func (c *Client) isBlockNotFoundError(res *http.Response) bool {
var starknetError struct {
Code string `json:"code"`
Message string `json:"message"` // not used for now
}

err := json.NewDecoder(res.Body).Decode(&starknetError)
if err != nil {
c.log.Errorw("Failed to read response body", "err", err)
return false
}

return starknetError.Code == "StarknetErrorCode.BLOCK_NOT_FOUND"

Check warning on line 297 in clients/feeder/feeder.go

View check run for this annotation

Codecov / codecov/patch

clients/feeder/feeder.go#L297

Added line #L297 was not covered by tests
}

func (c *Client) StateUpdate(ctx context.Context, blockID string) (*starknet.StateUpdate, error) {
queryURL := c.buildQueryString("get_state_update", map[string]string{
"blockNumber": blockID,
Expand Down
28 changes: 23 additions & 5 deletions sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"sync/atomic"
"time"

"github.com/NethermindEth/juno/clients/feeder"

Check failure on line 10 in sync/sync.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gci`-ed with --skip-generated -s standard -s default (gci)

"github.com/NethermindEth/juno/blockchain"

Check failure on line 12 in sync/sync.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gci`-ed with --skip-generated -s standard -s default (gci)
"github.com/NethermindEth/juno/core"
"github.com/NethermindEth/juno/core/felt"
Expand Down Expand Up @@ -101,16 +103,23 @@ func (s *Synchronizer) Run(ctx context.Context) error {
return nil
}

func (s *Synchronizer) fetcherTask(ctx context.Context, height uint64, verifiers *stream.Stream,
resetStreams context.CancelFunc,
) stream.Callback {
func (s *Synchronizer) fetcherTask(ctx context.Context, height uint64, verifiers *stream.Stream, resetStreams context.CancelFunc, deepReorg chan struct{}) stream.Callback {

Check failure on line 106 in sync/sync.go

View workflow job for this annotation

GitHub Actions / lint

line is 172 characters (lll)
emptyCallback := func() {}

for {
select {
case <-ctx.Done():
return func() {}
return emptyCallback
default:
stateUpdate, block, err := s.starknetData.StateUpdateWithBlock(ctx, height)
if err != nil {
// in case block not found we should initiate deep reorg
if errors.Is(err, feeder.ErrBlockNotFound) {
resetStreams()
deepReorg <- struct{}{}
return emptyCallback

Check warning on line 120 in sync/sync.go

View check run for this annotation

Codecov / codecov/patch

sync/sync.go#L118-L120

Added lines #L118 - L120 were not covered by tests
}

continue
}

Expand Down Expand Up @@ -266,6 +275,8 @@ func (s *Synchronizer) syncBlocks(syncCtx context.Context) {
pendingSem := make(chan struct{}, 1)
go s.pollPending(syncCtx, pendingSem)

// rough marker that deep reorg in progress, todo replace with smth. else
deepReorg := make(chan struct{}, 1)
for {
select {
case <-streamCtx.Done():
Expand All @@ -278,6 +289,13 @@ func (s *Synchronizer) syncBlocks(syncCtx context.Context) {
pendingSem <- struct{}{}
latestSem <- struct{}{}
return
case <-deepReorg:

Check warning on line 292 in sync/sync.go

View check run for this annotation

Codecov / codecov/patch

sync/sync.go#L292

Added line #L292 was not covered by tests
// 1. fetch current height from feeder by using block=latest
// 2. revert all blocks till the block where it diverged
// 3. mark deep reorg as done
// 4. restart syncing process
// to discuss: do we need to remove "small" reorg in verifyier logic ?
// potential speedup: set catchUpMode before restarting sync
default:
streamCtx, streamCancel = context.WithCancel(syncCtx)
nextHeight = s.nextHeight()
Expand All @@ -288,7 +306,7 @@ func (s *Synchronizer) syncBlocks(syncCtx context.Context) {
curHeight, curStreamCtx, curCancel := nextHeight, streamCtx, streamCancel
fetchers.Go(func() stream.Callback {
fetchTimer := time.Now()
cb := s.fetcherTask(curStreamCtx, curHeight, verifiers, curCancel)
cb := s.fetcherTask(curStreamCtx, curHeight, verifiers, curCancel, deepReorg)
s.listener.OnSyncStepDone(OpFetch, curHeight, time.Since(fetchTimer))
return cb
})
Expand Down

0 comments on commit d6d7607

Please sign in to comment.