Skip to content

Commit ee6eb39

Browse files
authored
feat: add /live and /ready endpoints with support during DB migration (#2923)
- Add /live and /ready endpoints for Kubernetes liveness and readiness probes - Enable HTTP status server during database migrations to serve health checks - Include descriptive response messages for migration and sync status This allows Kubernetes to monitor node health even during database migrations, preventing unnecessary pod restarts during migration processes.
1 parent dfe7d63 commit ee6eb39

File tree

5 files changed

+81
-11
lines changed

5 files changed

+81
-11
lines changed

migration/migration.go

Lines changed: 59 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,12 @@ import (
88
"errors"
99
"fmt"
1010
"maps"
11+
"net"
12+
"net/http"
1113
"runtime"
14+
"strconv"
1215
"sync"
16+
"time"
1317

1418
"github.com/NethermindEth/juno/adapters/sn2core"
1519
"github.com/NethermindEth/juno/core"
@@ -29,6 +33,12 @@ type schemaMetadata struct {
2933
IntermediateState []byte
3034
}
3135

36+
type HTTPConfig struct {
37+
Enabled bool
38+
Host string
39+
Port uint16
40+
}
41+
3242
type Migration interface {
3343
Before(intermediateState []byte) error
3444
// Migration should return intermediate state whenever it requests new txn or detects cancelled ctx.
@@ -76,8 +86,10 @@ var defaultMigrations = []Migration{
7686

7787
var ErrCallWithNewTransaction = errors.New("call with new transaction")
7888

79-
func MigrateIfNeeded(ctx context.Context, targetDB db.KeyValueStore, network *utils.Network, log utils.SimpleLogger) error {
80-
return migrateIfNeeded(ctx, targetDB, network, log, defaultMigrations)
89+
func MigrateIfNeeded(ctx context.Context, targetDB db.KeyValueStore, network *utils.Network,
90+
log utils.SimpleLogger, httpConfig *HTTPConfig,
91+
) error {
92+
return migrateIfNeeded(ctx, targetDB, network, log, defaultMigrations, httpConfig)
8193
}
8294

8395
func migrateIfNeeded(
@@ -86,6 +98,7 @@ func migrateIfNeeded(
8698
network *utils.Network,
8799
log utils.SimpleLogger,
88100
migrations []Migration,
101+
httpConfig *HTTPConfig,
89102
) error {
90103
/*
91104
Schema metadata of the targetDB determines which set of migrations need to be applied to the database.
@@ -113,6 +126,11 @@ func migrateIfNeeded(
113126
return errors.New("db is from a newer, incompatible version of Juno; upgrade to use this database")
114127
}
115128

129+
if httpConfig.Enabled {
130+
migrationSrv := startMigrationStatusServer(log, httpConfig.Host, httpConfig.Port)
131+
defer closeMigrationServer(migrationSrv, log)
132+
}
133+
116134
for i := metadata.Version; i < currentVersion; i++ {
117135
if err = ctx.Err(); err != nil {
118136
return err
@@ -155,6 +173,13 @@ func migrateIfNeeded(
155173
return nil
156174
}
157175

176+
func closeMigrationServer(srv *http.Server, log utils.SimpleLogger) {
177+
log.Debugw("Closing migration status server immediately...")
178+
if err := srv.Close(); err != nil {
179+
log.Errorw("Migration status server close failed", "err", err)
180+
}
181+
}
182+
158183
// SchemaMetadata retrieves metadata about a database schema from the given database.
159184
func SchemaMetadata(targetDB db.KeyValueStore) (schemaMetadata, error) {
160185
metadata := schemaMetadata{}
@@ -869,3 +894,35 @@ func reconstructAggregatedBloomFilters(txn db.IndexedBatch, network *utils.Netwo
869894

870895
return core.WriteRunningEventFilter(txn, runningFilter)
871896
}
897+
898+
func startMigrationStatusServer(log utils.SimpleLogger, host string, port uint16) *http.Server {
899+
mux := http.NewServeMux()
900+
901+
mux.HandleFunc("/live", func(w http.ResponseWriter, r *http.Request) {
902+
w.WriteHeader(http.StatusOK)
903+
})
904+
905+
mux.HandleFunc("/ready", func(w http.ResponseWriter, r *http.Request) {
906+
w.WriteHeader(http.StatusServiceUnavailable)
907+
_, err := w.Write([]byte("Database migration in progress."))
908+
if err != nil {
909+
log.Errorw("Failed to write migration status response", "err", err)
910+
}
911+
})
912+
913+
portStr := strconv.FormatUint(uint64(port), 10)
914+
addr := net.JoinHostPort(host, portStr)
915+
srv := &http.Server{
916+
Addr: addr,
917+
Handler: mux,
918+
ReadHeaderTimeout: 30 * time.Second,
919+
}
920+
921+
go func() {
922+
log.Debugw("Starting migration status server on " + addr)
923+
if err := srv.ListenAndServe(); err != http.ErrServerClosed {
924+
log.Errorw("Migration status server failed", "err", err)
925+
}
926+
}()
927+
return srv
928+
}

migration/migration_pkg_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -502,7 +502,7 @@ func TestMigrateIfNeeded(t *testing.T) {
502502
},
503503
},
504504
}
505-
require.ErrorContains(t, migrateIfNeeded(t.Context(), testDB, &utils.Mainnet, utils.NewNopZapLogger(), migrations), "bar")
505+
require.ErrorContains(t, migrateIfNeeded(t.Context(), testDB, &utils.Mainnet, utils.NewNopZapLogger(), migrations, &HTTPConfig{}), "bar")
506506
})
507507

508508
t.Run("call with new tx", func(t *testing.T) {
@@ -522,7 +522,7 @@ func TestMigrateIfNeeded(t *testing.T) {
522522
},
523523
},
524524
}
525-
require.NoError(t, migrateIfNeeded(t.Context(), testDB, &utils.Mainnet, utils.NewNopZapLogger(), migrations))
525+
require.NoError(t, migrateIfNeeded(t.Context(), testDB, &utils.Mainnet, utils.NewNopZapLogger(), migrations, &HTTPConfig{}))
526526
})
527527

528528
t.Run("error during migration", func(t *testing.T) {
@@ -537,7 +537,7 @@ func TestMigrateIfNeeded(t *testing.T) {
537537
},
538538
},
539539
}
540-
require.ErrorContains(t, migrateIfNeeded(t.Context(), testDB, &utils.Mainnet, utils.NewNopZapLogger(), migrations), "foo")
540+
require.ErrorContains(t, migrateIfNeeded(t.Context(), testDB, &utils.Mainnet, utils.NewNopZapLogger(), migrations, &HTTPConfig{}), "foo")
541541
})
542542

543543
t.Run("error if using new db on old version of juno", func(t *testing.T) {
@@ -552,9 +552,9 @@ func TestMigrateIfNeeded(t *testing.T) {
552552
},
553553
},
554554
}
555-
require.NoError(t, migrateIfNeeded(t.Context(), testDB, &utils.Mainnet, utils.NewNopZapLogger(), migrations))
555+
require.NoError(t, migrateIfNeeded(t.Context(), testDB, &utils.Mainnet, utils.NewNopZapLogger(), migrations, &HTTPConfig{}))
556556
want := "db is from a newer, incompatible version of Juno"
557-
require.ErrorContains(t, migrateIfNeeded(t.Context(), testDB, &utils.Mainnet, utils.NewNopZapLogger(), []Migration{}), want)
557+
require.ErrorContains(t, migrateIfNeeded(t.Context(), testDB, &utils.Mainnet, utils.NewNopZapLogger(), []Migration{}, &HTTPConfig{}), want)
558558
})
559559
}
560560

migration/migration_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ func TestMigrateIfNeeded(t *testing.T) {
1616
ctx, cancel := context.WithCancel(t.Context())
1717
cancel()
1818
t.Run("Migration should not happen on cancelled ctx", func(t *testing.T) {
19-
require.ErrorIs(t, migration.MigrateIfNeeded(ctx, testDB, &utils.Mainnet, utils.NewNopZapLogger()), ctx.Err())
19+
require.ErrorIs(t, migration.MigrateIfNeeded(ctx, testDB, &utils.Mainnet, utils.NewNopZapLogger(), &migration.HTTPConfig{}), ctx.Err())
2020
})
2121

2222
meta, err := migration.SchemaMetadata(testDB)
@@ -25,7 +25,7 @@ func TestMigrateIfNeeded(t *testing.T) {
2525
require.Nil(t, meta.IntermediateState)
2626

2727
t.Run("Migration should happen on empty DB", func(t *testing.T) {
28-
require.NoError(t, migration.MigrateIfNeeded(t.Context(), testDB, &utils.Mainnet, utils.NewNopZapLogger()))
28+
require.NoError(t, migration.MigrateIfNeeded(t.Context(), testDB, &utils.Mainnet, utils.NewNopZapLogger(), &migration.HTTPConfig{}))
2929
})
3030

3131
meta, err = migration.SchemaMetadata(testDB)
@@ -34,7 +34,7 @@ func TestMigrateIfNeeded(t *testing.T) {
3434
require.Nil(t, meta.IntermediateState)
3535

3636
t.Run("subsequent calls to MigrateIfNeeded should not change the DB version", func(t *testing.T) {
37-
require.NoError(t, migration.MigrateIfNeeded(t.Context(), testDB, &utils.Mainnet, utils.NewNopZapLogger()))
37+
require.NoError(t, migration.MigrateIfNeeded(t.Context(), testDB, &utils.Mainnet, utils.NewNopZapLogger(), &migration.HTTPConfig{}))
3838
postVersion, postErr := migration.SchemaMetadata(testDB)
3939
require.NoError(t, postErr)
4040
require.Equal(t, meta, postVersion)

node/http.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,7 @@ func NewReadinessHandlers(bcReader blockchain.Reader, syncReader sync.Reader) *r
239239
func (h *readinessHandlers) HandleReadySync(w http.ResponseWriter, r *http.Request) {
240240
if !h.isSynced() {
241241
w.WriteHeader(http.StatusServiceUnavailable)
242+
w.Write([]byte("Node not synced yet.")) //nolint:errcheck
242243
return
243244
}
244245

@@ -261,3 +262,7 @@ func (h *readinessHandlers) isSynced() bool {
261262

262263
return head.Number+SyncBlockRange >= highestBlockHeader.Number
263264
}
265+
266+
func (h *readinessHandlers) HandleLive(w http.ResponseWriter, r *http.Request) {
267+
w.WriteHeader(http.StatusOK)
268+
}

node/node.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,8 @@ func New(cfg *Config, version string, logLevel *utils.LogLevel) (*Node, error) {
285285
if cfg.HTTP {
286286
readinessHandlers := NewReadinessHandlers(chain, synchronizer)
287287
httpHandlers := map[string]http.HandlerFunc{
288+
"/live": readinessHandlers.HandleLive,
289+
"/ready": readinessHandlers.HandleReadySync,
288290
"/ready/sync": readinessHandlers.HandleReadySync,
289291
}
290292
services = append(services, makeRPCOverHTTP(cfg.HTTPHost, cfg.HTTPPort, rpcServers, httpHandlers, log, cfg.Metrics, cfg.RPCCorsEnable))
@@ -428,7 +430,13 @@ func (n *Node) Run(ctx context.Context) {
428430
n.StartService(wg, ctx, cancel, s)
429431
}
430432

431-
if err := migration.MigrateIfNeeded(ctx, n.db, &n.cfg.Network, n.log); err != nil {
433+
migrationHTTPConfig := migration.HTTPConfig{
434+
Enabled: n.cfg.HTTP,
435+
Host: n.cfg.HTTPHost,
436+
Port: n.cfg.HTTPPort,
437+
}
438+
439+
if err := migration.MigrateIfNeeded(ctx, n.db, &n.cfg.Network, n.log, &migrationHTTPConfig); err != nil {
432440
if errors.Is(err, context.Canceled) {
433441
n.log.Infow("DB Migration cancelled")
434442
return

0 commit comments

Comments
 (0)