Skip to content

Commit b8a2459

Browse files
authored
refactor: Separate storeTask from verifierTask (#3099)
1 parent 92b20ed commit b8a2459

File tree

1 file changed

+101
-76
lines changed

1 file changed

+101
-76
lines changed

sync/sync.go

Lines changed: 101 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -201,14 +201,7 @@ func (s *Synchronizer) fetcherTask(ctx context.Context, height uint64, verifiers
201201

202202
return func() {
203203
verifiers.Go(func() stream.Callback {
204-
return s.verifierTask(
205-
ctx,
206-
committedBlock.Block,
207-
committedBlock.StateUpdate,
208-
committedBlock.NewClasses,
209-
resetStreams,
210-
committedBlock.Persisted,
211-
)
204+
return s.verifierTask(ctx, &committedBlock, resetStreams)
212205
})
213206
}
214207
}
@@ -258,90 +251,122 @@ func (s *Synchronizer) handlePluginRevertBlock() {
258251
}
259252
}
260253

261-
//nolint:gocyclo
262-
func (s *Synchronizer) verifierTask(ctx context.Context, block *core.Block, stateUpdate *core.StateUpdate,
263-
newClasses map[felt.Felt]core.Class, resetStreams context.CancelFunc, persisted chan struct{},
254+
func (s *Synchronizer) verifierTask(
255+
ctx context.Context,
256+
committedBlock *CommittedBlock,
257+
resetStreams context.CancelFunc,
264258
) stream.Callback {
265259
verifyTimer := time.Now()
266-
commitments, err := s.blockchain.SanityCheckNewHeight(block, stateUpdate, newClasses)
267-
if err == nil {
268-
s.listener.OnSyncStepDone(OpVerify, block.Number, time.Since(verifyTimer))
260+
commitments, err := s.blockchain.SanityCheckNewHeight(
261+
committedBlock.Block,
262+
committedBlock.StateUpdate,
263+
committedBlock.NewClasses,
264+
)
265+
if err != nil {
266+
return func() {
267+
defer close(committedBlock.Persisted)
268+
s.log.Warnw(
269+
"Sanity checks failed",
270+
"number",
271+
committedBlock.Block.Number,
272+
"hash",
273+
committedBlock.Block.Hash.ShortString(),
274+
"err",
275+
err,
276+
)
277+
resetStreams()
278+
}
269279
}
280+
281+
s.listener.OnSyncStepDone(OpVerify, committedBlock.Block.Number, time.Since(verifyTimer))
270282
return func() {
271-
select {
272-
case <-ctx.Done():
273-
return
274-
default:
275-
defer close(persisted)
283+
s.storeTask(ctx, committedBlock, resetStreams, commitments)
284+
}
285+
}
276286

277-
if err != nil {
278-
s.log.Warnw("Sanity checks failed", "number", block.Number, "hash", block.Hash.ShortString(), "err", err)
279-
resetStreams()
280-
return
281-
}
282-
storeTimer := time.Now()
283-
err = s.blockchain.Store(block, commitments, stateUpdate, newClasses)
284-
if err != nil {
285-
if errors.Is(err, blockchain.ErrParentDoesNotMatchHead) {
286-
// revert the head and restart the sync process, hoping that the reorg is not deep
287-
// if the reorg is deeper, we will end up here again and again until we fully revert reorged
288-
// blocks
289-
if s.plugin != nil {
290-
s.handlePluginRevertBlock()
291-
}
292-
s.revertHead(block)
287+
func (s *Synchronizer) storeTask(
288+
ctx context.Context,
289+
committedBlock *CommittedBlock,
290+
resetStreams context.CancelFunc,
291+
commitments *core.BlockCommitments,
292+
) {
293+
defer close(committedBlock.Persisted)
294+
select {
295+
case <-ctx.Done():
296+
return
297+
default:
298+
}
293299

294-
// The previous head has been reverted, hence, get the current head and store empty pending block
295-
head, err := s.blockchain.HeadsHeader()
296-
if err != nil {
297-
s.log.Errorw("Failed to retrieve the head header", "err", err)
298-
}
300+
storeTimer := time.Now()
301+
block := committedBlock.Block
302+
stateUpdate := committedBlock.StateUpdate
303+
newClasses := committedBlock.NewClasses
304+
if err := s.blockchain.Store(block, commitments, stateUpdate, newClasses); err != nil {
305+
if errors.Is(err, blockchain.ErrParentDoesNotMatchHead) {
306+
s.revertTask(block, resetStreams)
307+
return
308+
}
299309

300-
if head != nil {
301-
s.storeEmptyPendingData(head)
302-
}
303-
} else {
304-
s.log.Warnw("Failed storing Block", "number", block.Number,
305-
"hash", block.Hash.ShortString(), "err", err)
306-
}
307-
resetStreams()
308-
return
309-
}
310+
s.log.Warnw("Failed storing Block", "number", block.Number,
311+
"hash", block.Hash.ShortString(), "err", err)
312+
resetStreams()
313+
return
314+
}
310315

311-
s.storeEmptyPendingData(block.Header)
312-
s.listener.OnSyncStepDone(OpStore, block.Number, time.Since(storeTimer))
316+
s.storeEmptyPendingData(block.Header)
317+
s.listener.OnSyncStepDone(OpStore, block.Number, time.Since(storeTimer))
313318

314-
highestBlockHeader := s.highestBlockHeader.Load()
315-
if highestBlockHeader != nil {
316-
isBehind := highestBlockHeader.Number > block.Number+uint64(maxWorkers())
317-
if s.catchUpMode != isBehind {
318-
resetStreams()
319-
}
320-
s.catchUpMode = isBehind
321-
}
319+
highestBlockHeader := s.highestBlockHeader.Load()
320+
if highestBlockHeader != nil {
321+
isBehind := highestBlockHeader.Number > block.Number+uint64(maxWorkers())
322+
if s.catchUpMode != isBehind {
323+
resetStreams()
324+
s.catchUpMode = isBehind
325+
}
326+
}
322327

323-
if highestBlockHeader == nil || highestBlockHeader.Number < block.Number {
324-
s.highestBlockHeader.CompareAndSwap(highestBlockHeader, block.Header)
325-
}
328+
if highestBlockHeader == nil || highestBlockHeader.Number < block.Number {
329+
s.highestBlockHeader.CompareAndSwap(highestBlockHeader, block.Header)
330+
}
326331

327-
if s.currReorg != nil {
328-
s.reorgFeed.Send(s.currReorg)
329-
s.currReorg = nil // reset the reorg data
330-
}
332+
if s.currReorg != nil {
333+
s.reorgFeed.Send(s.currReorg)
334+
s.currReorg = nil // reset the reorg data
335+
}
331336

332-
s.newHeads.Send(block)
333-
s.log.Infow("Stored Block", "number", block.Number, "hash",
334-
block.Hash.ShortString(), "root", block.GlobalStateRoot.ShortString())
335-
if s.plugin != nil {
336-
err := s.plugin.NewBlock(block, stateUpdate, newClasses)
337-
if err != nil {
338-
s.log.Errorw("Plugin NewBlock failure:", err)
339-
}
340-
}
337+
s.newHeads.Send(block)
338+
s.log.Infow("Stored Block", "number", block.Number, "hash",
339+
block.Hash.ShortString(), "root", block.GlobalStateRoot.ShortString())
340+
if s.plugin != nil {
341+
err := s.plugin.NewBlock(block, stateUpdate, newClasses)
342+
if err != nil {
343+
s.log.Errorw("Plugin NewBlock failure:", err)
341344
}
342345
}
343346
}
344347

348+
func (s *Synchronizer) revertTask(forkedBlock *core.Block, resetStreams context.CancelFunc) {
349+
// revert the head and restart the sync process, hoping that the reorg is not deep
350+
// if the reorg is deeper, we will end up here again and again until we fully revert reorged
351+
// blocks
352+
if s.plugin != nil {
353+
s.handlePluginRevertBlock()
354+
}
355+
s.revertHead(forkedBlock)
356+
357+
// The previous head has been reverted, hence, get the current head and store empty pending block
358+
head, err := s.blockchain.HeadsHeader()
359+
if err != nil {
360+
s.log.Errorw("Failed to retrieve the head header", "err", err)
361+
}
362+
363+
if head != nil {
364+
s.storeEmptyPendingData(head)
365+
}
366+
367+
resetStreams()
368+
}
369+
345370
func (s *Synchronizer) nextHeight() uint64 {
346371
nextHeight := uint64(0)
347372
if h, err := s.blockchain.Height(); err == nil {

0 commit comments

Comments
 (0)