Skip to content

Commit

Permalink
Merge pull request #480 from onflow/gregor/remove-restartable-engine
Browse files Browse the repository at this point in the history
Remove restartable engine
  • Loading branch information
sideninja authored Aug 27, 2024
2 parents 999d066 + e23e8ae commit 57cfe88
Show file tree
Hide file tree
Showing 3 changed files with 2 additions and 187 deletions.
6 changes: 2 additions & 4 deletions bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,19 +233,17 @@ func startIngestion(
logger,
collector,
)
const retries = 15
restartableEventEngine := models.NewRestartableEngine(eventEngine, retries, logger)

go func() {
err = restartableEventEngine.Run(ctx)
err = eventEngine.Run(ctx)
if err != nil {
logger.Error().Err(err).Msg("event ingestion engine failed to run")
panic(err)
}
}()

// wait for ingestion engines to be ready
<-restartableEventEngine.Ready()
<-eventEngine.Ready()

logger.Info().Msg("ingestion start up successful")
return nil
Expand Down
87 changes: 0 additions & 87 deletions models/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,6 @@ package models

import (
"context"
"errors"
"fmt"
"time"

"github.com/rs/zerolog"

errs "github.com/onflow/flow-evm-gateway/models/errors"
)

// Engine defines a processing unit
Expand All @@ -23,86 +16,6 @@ type Engine interface {
Ready() <-chan struct{}
}

var _ Engine = &RestartableEngine{}

// NewRestartableEngine wraps the provided engine so that recoverable run
// failures are retried, with Fibonacci backoff delays between attempts.
//
// retries bounds the total number of run attempts. The first schedule
// entry is 0 so the initial run is never delayed; later entries grow as
// 1s, 1s, 2s, 3s, 5s, ...
func NewRestartableEngine(engine Engine, retries uint, logger zerolog.Logger) *RestartableEngine {
	// build a Fibonacci sequence, we could support more strategies in future
	// we use 0 as first run shouldn't be delayed
	backoff := []time.Duration{0, time.Second, time.Second}
	for i := len(backoff); i < int(retries); i++ {
		backoff = append(backoff, backoff[i-2]+backoff[i-1])
	}
	if int(retries) < len(backoff) {
		backoff = backoff[:retries]
	}
	// guard against retries == 0: an empty schedule would make Run's loop
	// never execute, so the engine would never start at all and Run would
	// return nil as if it had succeeded. Always keep the immediate attempt.
	if len(backoff) == 0 {
		backoff = []time.Duration{0}
	}

	logger = logger.With().Str("component", "restartable-engine").Logger()

	return &RestartableEngine{
		engine:  engine,
		backoff: backoff,
		logger:  logger,
	}
}

// RestartableEngine is an engine wrapper that tries to restart
// the engine in case of starting errors.
//
// The strategy of the restarts contains Fibonacci backoff time and
// limited number of retries that can be configured.
// The first attempt runs with no delay (the schedule starts at 0); the
// delays before the subsequent attempts are:
// 1s 1s 2s 3s 5s 8s 13s 21s 34s 55s 1m29s 2m24s 3m53s 6m17s 10m10s 16m27s 26m37s 43m4s 1h9m41s
type RestartableEngine struct {
	logger  zerolog.Logger
	engine  Engine
	backoff []time.Duration // per-attempt delays; length bounds the number of run attempts
}

// Stop forwards the stop request to the wrapped engine.
func (r *RestartableEngine) Stop() {
	r.engine.Stop()
}

// Done returns the wrapped engine's done channel, closed once the
// engine has fully shut down.
func (r *RestartableEngine) Done() <-chan struct{} {
	return r.engine.Done()
}

// Ready returns the wrapped engine's ready channel, closed once the
// engine has started up.
func (r *RestartableEngine) Ready() <-chan struct{} {
	return r.engine.Ready()
}

// Run starts the wrapped engine and, when it fails with a recoverable
// error, restarts it according to the configured backoff schedule.
//
// It returns nil when the engine stops normally, ctx.Err() when the
// context is cancelled while waiting to (re)start, the first
// unrecoverable error encountered, or the last recoverable error once
// all retries are exhausted.
func (r *RestartableEngine) Run(ctx context.Context) error {
	var err error
	for i, b := range r.backoff {
		select {
		case <-time.After(b): // wait for the backoff duration
			if b > 0 {
				r.logger.Warn().Msg("restarting the engine now")
			}
		case <-ctx.Done():
			// context cancellation is a deliberate shutdown; surface it to
			// the caller so it can distinguish cancellation from engine errors
			r.logger.Warn().Msg("context cancelled, stopping the engine")
			return ctx.Err()
		}

		err = r.engine.Run(ctx)
		if err == nil {
			// don't restart if no error is returned, normal after stop procedure is done
			return nil
		}
		if !errors.Is(err, errs.ErrRecoverable) {
			r.logger.Error().Err(err).Msg("received unrecoverable error")
			// if error is not recoverable just die
			return err
		}

		// i is zero-based, so the attempt that just failed is number i+1;
		// use Msgf instead of Msg(fmt.Sprintf(...)) per zerolog idiom.
		r.logger.Error().Err(err).Msgf("received recoverable error, restarting for the %d time after backoff time", i+1)
	}

	r.logger.Error().Msg("failed to recover and restart the engine, stop retrying")
	// if after retries we still get an error it's time to stop
	return err
}

type EngineStatus struct {
done chan struct{}
ready chan struct{}
Expand Down
96 changes: 0 additions & 96 deletions models/engine_test.go

This file was deleted.

0 comments on commit 57cfe88

Please sign in to comment.